Yanyg - Software Engineer

NGINX EVENTS 机制

目录

1 源码分析

auto/configure 生成 obj/ngx_modules.c:

char *ngx_module_names[] = "ngx_http_upstream_module{
    "ngx_core_module",
    "ngx_errlog_module",
    "ngx_conf_module",
    "ngx_regex_module",
    "ngx_events_module",
    "ngx_event_core_module",
    "ngx_epoll_module",
    "ngx_http_module",
    "ngx_http_core_module",
    "ngx_http_log_module",
    ,
    "ngx_http_static_module",
    /* ... */
}";

nginx.c main function 调用 ngx_preinit_modules() 设置模块数组.

ngx_int_t
ngx_preinit_modules(void)
{
    ngx_uint_t  i;

    for (i = 0; ngx_modules[i]; i++) {
        ngx_modules[i]->index = i;
        ngx_modules[i]->name = ngx_module_names[i];
    }

    ngx_modules_n = i;
    ngx_max_module = ngx_modules_n + NGX_MAX_DYNAMIC_MODULES;

    return NGX_OK;
}

ngx_init_cycle 函数调用 ngx_init_cycle(cycle) 初始化相关内存.

ngx_int_t
ngx_cycle_modules(ngx_cycle_t *cycle)
{
    /*
     * create a list of modules to be used for this cycle,
     * copy static modules to it
     */

    cycle->modules = ngx_pcalloc(cycle->pool, (ngx_max_module + 1)
                                              * sizeof(ngx_module_t *));
    if (cycle->modules == NULL) {
        return NGX_ERROR;
    }

    ngx_memcpy(cycle->modules, ngx_modules,
               ngx_modules_n * sizeof(ngx_module_t *));

    cycle->modules_n = ngx_modules_n;

    return NGX_OK;
}

检测是否需要调用create_conf:

for (i = 0; cycle->modules[i]; i++) {
    if (cycle->modules[i]->type != NGX_CORE_MODULE) {
        continue;
    }

    module = cycle->modules[i]->ctx;

    if (module->create_conf) {
        rv = module->create_conf(cycle);
        if (rv == NULL) {
            ngx_destroy_pool(pool);
            return NULL;
        }
        cycle->conf_ctx[cycle->modules[i]->index] = rv;
    }
}

ngx_events_module, create_conf 是 null, init_conf 是 ngx_event_init_conf.

ngx_event_core_module, create_conf 是 ngx_event_core_create_conf, init_conf 是 ngx_event_core_init_conf.

ngx_epoll_module_ctx, create_conf 是 ngx_epoll_create_conf, init_conf 是 ngx_epoll_init_conf.

nginx 单进程模式下, 进入ngx_single_process_cycle, 初始化各模块, 之后进入事件处理:

void
ngx_single_process_cycle(ngx_cycle_t *cycle)
{
    // ...
    for (i = 0; cycle->modules[i]; i++) {
        if (cycle->modules[i]->init_process) {
            if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
                /* fatal */
                exit(2);
            }
        }
    }

    for ( ;; ) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

        ngx_process_events_and_timers(cycle);
        // ...
    }
}

nginx 多进程模式下, 进入 ngx_master_process_cycle, ngx_start_worker_processes, ngx_worker_process_cycle,

static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
    // ...
    for (i = 0; cycle->modules[i]; i++) {
        if (cycle->modules[i]->init_process) {
            if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
                /* fatal */
                exit(2);
            }
        }
    }
    // ...
}

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    // ...
    ngx_int_t worker = (intptr_t) data;
    ngx_worker_process_init(cycle, worker);

    for ( ;; ) {
        // ...
        ngx_process_events_and_timers(cycle);
        // ...
    }
}

ngx_process_events_and_timers 获取 accept 锁, 处理各类event事件.

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    if (ngx_use_accept_mutex) {
        // ...
    }

    (void) ngx_process_events(cycle, timer, flags);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    ngx_event_expire_timers();

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

ngx_process_events是个宏定义, 桥接到 ngx_event_actions.process_events:

#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done

#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn

#define ngx_notify           ngx_event_actions.notify

#define ngx_add_timer        ngx_event_add_timer
#define ngx_del_timer        ngx_event_del_timer

ngx_event_actions在ngx_epoll_init内部, 初始化为 ngx_epoll_module_ctx.actions.

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }
        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
    }

    nevents = epcf->events;

    ngx_io = ngx_os_io;

    ngx_event_actions = ngx_epoll_module_ctx.actions;

    return NGX_OK;
}

每个模块都有commands, 处理特定event, 这是在conf_parse里完成的:

ngx_conf_handler

至此, 主循环就和 event 处理函数接驳到一起.

1.1 ngx_events_module

类型 NGX_CORE_MODULE. 处理配置相关的命令.

ngx_events_block 处理 events 命令:

  • 计算 NGX_EVENT_MODULE 模块的数量;
  • 给 NGX_EVENT_MODULE 模块执行 create_conf 操作;
  • 调用 ngx_conf_parse 解析配置;
  • 给 NGX_EVENT_MODULE 模块执行 init_conf 操作;

1.2 ngx_event_core_module

ngx_event_core_create_conf 初始化 ngx_event_conf_t, 例如 connections, use 等.

ngx_event_core_init_conf 识别具体的异步IO模块, 例如 epoll/poll/select 等. 并根据配置确定 connections 等配置.

1.3 ngx_epoll_module

处理异步IO.

2 侦听流程

ngx_init_cycle 函数里:

  • 模块初始化时候在cycle里生成了listening port信息. 在ngx_open_listening_sockets侦听所有的端口;
  • ngx_event_process_init 时, 把侦听设置好的fd加到epoll中.
  • 调用者在侦听之前已经设置好了accept之后connection handler, 这个handler会设置rpc context, 同时把rev/wev handler设置为预期值.
  • accept时还会设置connection recv/send 函数. handler调用这两个函数获取数据.

设置accept rev = ngx_event_acceptex for IOCP, others use ngx_event_accept or ngx_event_recvmsg.

3 RPC实现思考

3.1 Async IO Layer

用epoll/select/poll等实现异步IO, 处理IO事件. 提供事件处理句柄, 提供accept处理句柄. Connection处理函数由上层设置.

RpcClient/RpcServer通用.

线程唤醒, Timer唤醒的Eventfd加入到Epoll.

3.2 IO Handler Layer

数据分片, 协议完整相关逻辑, 处理IO收发, 组包, 编解码. 最终生成完成的数据包(请求/应答).

RpcClient/RpcServer通用.

编解码可由上层提供函数接入.

3.3 应用层

发送请求, 等待应答. 或者处理请求, 发送应答.

4 References