目录

Netty 源码解析

netty 最核心的就是 reactor 线程,对应项目中使用广泛的 NioEventLoop。

netty的Channel是对原始Channel的一层封装。其中所有的nio的操作封装在了Unsafe中,并进行了一定的增强,例如回调之类的。从AbstractNioChannel可以更加直观的看出,netty对Channel SelectionKey的封装,并添加了自己的回调ChannelPromise从而使方法更加易于使用。

ChannelPipeline & ChannelHandler

ChannelPipeline 处理或拦截 Channel 的入站(inbound)事件出站(outbound)操作ChannelHandler 列表。 ChannelPipeline 实现了拦截过滤器模式(责任链)的高级形式,让用户可以完全控制事件的处理方式以及管道中的 ChannelHandlers 如何相互交互。

创建管道 每个通道都有自己的管道,并在创建新通道时自动创建。

事件如何在管道中流动 下图描述了 ChannelPipeline 中的 ChannelHandler 通常如何处理 I/O 事件。 I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法(例如 ChannelHandlerContext.fireChannelRead(Object) 和 ChannelHandlerContext.write(Object))转发到其最近的处理程序。

DefaultChannelPipeline 内部双向链表

1
2
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

ChannelHandler 添加进ChannelPipeline后会被封装成ChannelHandlerContext,会判断是ChannelInboundHandler还是ChannelOutboundHandler的子类,对inbound和outbound这两个属性进行赋值,ChannelInboundHandler的子类inbound为true,outbound为false,ChannelOutboundHandler反之。ChannelPipeline内部调用方法时,会使用fireXXXXX()的方法,会利用责任链模式进行调用,这时候会用到这个属性进行判断,是否有对应方法,从而进行调用

                                                   |
    +---------------------------------------------------+---------------+
    |                           ChannelPipeline         |               |
    |                                                  \|/              |
    |    +---------------------+            +-----------+----------+    |
    |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    |               |                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  .               |
    |               .                                   .               |
    | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
    |        [ method call]                       [method call]         |
    |               .                                   .               |
    |               .                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    |               |                                  \|/              |
    |    +----------+----------+            +-----------+----------+    |
    |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
    |    +----------+----------+            +-----------+----------+    |
    |              /|\                                  |               |
    +---------------+-----------------------------------+---------------+
                    |                                  \|/
    +---------------+-----------------------------------+---------------+
    |               |                                   |               |
    |       [ Socket.read() ]                    [ Socket.write() ]     |
    |                                                                   |
    |  Netty Internal I/O Threads (Transport Implementation)            |
    +-------------------------------------------------------------------+

EventLoopGroup 与 EventLoop

EventLoopGroup 初始化创建的时候,会创建对应数量的 EventLoop,如果没有指定,默认创建cpu核心数量*2个EventLoop

reactor 线程启动

NioEventLoop 父类 SingleThreadEventExecutor 的execute方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

外部线程在往任务队列里面添加任务的时候执行 startThread() ,netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法,将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            ...
                SingleThreadEventExecutor.this.run();
            ...
        }
    }
}

该线程就是executor创建,对应netty的reactor线程实体。executor 默认是ThreadPerTaskExecutor

默认情况下,ThreadPerTaskExecutor 在每次执行execute 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体

reactor 线程的执行

NioEventLoop 的 run 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }

/images/network/netty/reactor-thread.webp

reactor 线程大概做的事情分为对三个步骤不断循环

  • 轮询出IO事件 首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件
  • 处理IO事件 处理产生网络IO事件的channel processSelectedKeys()
  • 处理任务队列 runAllTasks(...)

netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法

  • 当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行

  • netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue

  • netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue

  • netty每隔64个任务检查一下是否该退出任务循环

  • 从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)

  • 计算本次任务循环的截止时间

  • 循环执行任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
for (;;) {
    safeExecute(task);
    runTasks ++;
    if ((runTasks & 0x3F) == 0) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        if (lastExecutionTime >= deadline) {
            break;
        }
    }

    task = pollTask();
    if (task == null) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        break;
    }
}

这一步便是netty里面执行所有任务的核心代码了。 首先调用safeExecute来确保任务安全执行,忽略任何异常

然后将已运行任务 runTasks 加一,每隔0x3F任务,即每执行完64个任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,如果超过,那就break掉,如果没有超过,那就继续执行。可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的

Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程’生产’的任务.属于单消费者多生产者模式.通过类的名称 MpscUnboundedArrayQueue 可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的。

  • 收尾
1
2
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;