Nginxstream模块阅读!
Nginx stream(UDP)模块分析
ngx_stream_handler.c
ngx_stream_init_connection函数
代码解读:
在ngx_stream_optimize_rvers⾥设置有连接发⽣时的回调函数ngx_stream_init_connection.
创建⼀个处理tcp的会话对象.
创建ctx数组,⽤于存储模块的ctx数据,调⽤handler,处理tcp数据,收发等等,读事件处理函数,执⾏处理引擎.
按阶段执⾏处理引擎ngx_stream_core_run_phas,调⽤各个模块的handler.
ngx_stream_proxy_module.c
ngx_stream_proxy_pass函数
代码解读:
解析proxy_pass指令,设置处理handler=ngx_stream_proxy_handler,在init建⽴连接之后会调⽤.
获取⼀个upstream{}块的配置信息.
ngx_stream_proxy_handler函数
核⼼代码解读:
ngx_stream_init_connection->ngx_stream_init_ssion之后调⽤,处理请求.
static void
ngx_stream_proxy_handler(ngx_stream_ssion_t *s)
{
u_char *p;
ngx_str_t *host;
ngx_uint_t i;
ngx_connection_t *c;
ngx_resolver_ctx_t *ctx, temp;
ngx_stream_upstream_t *u;
ngx_stream_core_srv_conf_t *cscf;
ngx_stream_proxy_srv_conf_t *pscf;
ngx_stream_upstream_srv_conf_t *uscf, **uscfp;
ngx_stream_upstream_main_conf_t *umcf;
// 获取连接对象
c = s->connection;
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
"proxy connection handler");
// 创建连接上游的结构体
// ⾥⾯有如何获取负载均衡rver、上下游buf等
u = ngx_pcalloc(c->pool, sizeof(ngx_stream_upstream_t));电脑看电视
if (u == NULL) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
return;
}
// 关联到会话对象
s->upstream = u;
s->log_handler = ngx_stream_proxy_log_error;
u->peer.log = c->log;
u->peer.log_error = NGX_ERROR_ERR;
if (ngx_stream_proxy_t_local(s, u, pscf->local) != NGX_OK) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
// 连接的类型,tcp or udp
u-&pe = c->type;
// 开始连接后端的时间
// 准备开始连接,设置开始时间,秒数,没有毫秒
u->start_c = ngx_time();
/
/ 连接的读写事件都设置为ngx_stream_proxy_downstream_handler
// 注意这个连接是客户端发起的连接,即下游
// 当客户端连接可读或可写时就会调⽤ngx_stream_proxy_downstream_handler
c->write->handler = ngx_stream_proxy_downstream_handler;
c->read->handler = ngx_stream_proxy_downstream_handler;
小白船音乐教案// 使⽤数组,可能会连接多个上游服务器
s->upstream_states = ngx_array_create(c->pool, 1,
sizeof(ngx_stream_upstream_state_t));
if (s->upstream_states == NULL) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
/
/ 如果是tcp连接,那么创建⼀个缓冲区,⽤来接收数据
if (c->type == SOCK_STREAM) {
p = ngx_pnalloc(c->pool, pscf->buffer_size);
if (p == NULL) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
return;
}
// 注意是给下游使⽤的缓冲区
u->downstream_buf.start = p;
u->d = p + pscf->buffer_size;
u->downstream_buf.pos = p;
u->downstream_buf.last = p;
// 连接可读,表⽰客户端有数据发过来
// 加⼊到&ngx_posted_events
// 稍后由ngx_stream_proxy_downstream_handler来处理
if (c->read->ready) {
ngx_post_event(c->read, &ngx_posted_events);
}
}
// udp不需要,始终⽤⼀个固定⼤⼩的数组接收数据
// proxy_pass⽀持复杂变量
// 如果使⽤了"proxy_pass $xxx",那么就要解析复杂变量
if (pscf->upstream_value) {
if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
}
// 检查proxy_pass的⽬标地址
if (u->resolved == NULL) {
uscf = pscf->upstream;
} el {
#if (NGX_STREAM_SSL)
u->ssl_name = u->resolved->host;
#endif
host = &u->resolved->host;沈文秀
// 获取上游的配置结构体
// 在ngx_stream_proxy_pass⾥设置的
//uscf = pscf->upstream;
umcf = ngx_stream_get_module_main_conf(s, ngx_stream_upstream_module);
杀毒打不开怎么办uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf-<s; i++) {
uscf = uscfp[i];
if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_strncacmp(uscf->host.data, host->data, host->len) == 0)
小鸟作文{
goto found;
}
}
if (u->resolved->sockaddr) {
if (u->resolved->port == 0
&& u->resolved->sockaddr->sa_family != AF_UNIX)
{
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"no port in upstream \"%V\"", host);
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
if (ngx_stream_upstream_create_round_robin_peer(s, u->resolved)
!= NGX_OK)
{
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
ngx_stream_proxy_connect(s);
return;
}
if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"no port in upstream \"%V\"", host);
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
temp.name = *host;
cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
ctx = ngx_resolve_start(cscf->resolver, &temp);
if (ctx == NULL) {
雷锋简报
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"no resolver defined to resolve %V", host);
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
ctx->name = *host;
ctx->handler = ngx_stream_proxy_resolve_handler;
ctx->data = s;
ctx->timeout = cscf->resolver_timeout;
u->resolved->ctx = ctx;
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
return;
}
found:
if (uscf == NULL) {
ngx_log_error(NGX_LOG_ALERT, c->log, 0, "no upstream configuration");
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
u->upstream = uscf;
#if (NGX_STREAM_SSL)
u->ssl_name = uscf->host;
#endif
// 负载均衡算法初始化
if (uscf->peer.init(s, uscf) != NGX_OK) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
// 准备开始连接,设置开始时间,毫秒
u->peer.start_time = ngx_current_mc;
// 设置负载均衡的重试次数
if (pscf->next_upstream_tries
&& u-&ies > pscf->next_upstream_tries)
{
角边角
u-&ies = pscf->next_upstream_tries;
}
//u->proxy_protocol = pscf->proxy_protocol;
// 最后启动连接
// 使⽤ngx_peer_connection_t连接上游服务器
// 连接失败,需要尝试下⼀个上游rver
// 连接成功要调⽤init初始化上游
ngx_stream_proxy_connect(s);
}
// 连接上游
// 使⽤ngx_peer_connection_t连接上游服务器
/
/ 连接失败,需要尝试下⼀个上游rver
// 连接成功要调⽤init初始化上游
static void
ngx_stream_proxy_connect(ngx_stream_ssion_t *s)
{
普者黑在哪里
ngx_int_t rc;
ngx_connection_t *c, *pc;
ngx_stream_upstream_t *u;
ngx_stream_proxy_srv_conf_t *pscf;
// 获取连接对象
c = s->connection;
c->log->action = "connecting to upstream";
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
// 连接上游的结构体
// ⾥⾯有如何获取负载均衡rver、上下游buf等
u = s->upstream;
u->connected = 0;
u->proxy_protocol = pscf->proxy_protocol;
// 何时会执⾏这个?
if (u->state) {
u->state->respon_time = ngx_current_mc - u->state->respon_time;
}
/
/ 把⼀个上游的状态添加进会话的数组
u->state = ngx_array_push(s->upstream_states);
if (u->state == NULL) {
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); return;
}
ngx_memzero(u->state, sizeof(ngx_stream_upstream_state_t));
// 这两个值置为-1,表⽰未初始化
u->state->connect_time = (ngx_mc_t) -1;
u->state->first_byte_time = (ngx_mc_t) -1;
// ⽤来计算响应时间,保存当前的毫秒值
// 之后连接成功后就会两者相减
u->state->respon_time = ngx_current_mc;
// 连接上游
// 使⽤ngx_peer_connection_t连接上游服务器
// 从upstream{}⾥获取⼀个上游rver地址
// 从cycle的连接池获取⼀个空闲连接
// 设置连接的数据收发接⼝函数
// 向epoll添加连接,即同时添加读写事件
// 当与上游服务器有任何数据收发时都会触发epoll
// socket api调⽤连接上游服务器