Netty 源码解析
netty 最核心的就是 reactor 线程,对应项目中使用广泛的 NioEventLoop。
netty的Channel是对原始Channel的一层封装。其中所有的nio的操作封装在了Unsafe中,并进行了一定的增强,例如回调之类的。从AbstractNioChannel可以更加直观的看出,netty对Channel SelectionKey的封装,并添加了自己的回调ChannelPromise从而使方法更加易于使用。
ChannelPipeline & ChannelHandler
ChannelPipeline
处理或拦截 Channel 的入站(inbound)事件
和出站(outbound)操作
的 ChannelHandler
列表。 ChannelPipeline 实现了拦截过滤器模式(责任链)的高级形式,让用户可以完全控制事件的处理方式以及管道中的 ChannelHandlers 如何相互交互。
创建管道 每个通道都有自己的管道,并在创建新通道时自动创建。
事件如何在管道中流动 下图描述了 ChannelPipeline 中的 ChannelHandler 通常如何处理 I/O 事件。 I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法(例如 ChannelHandlerContext.fireChannelRead(Object) 和 ChannelHandlerContext.write(Object))转发到其最近的处理程序。
DefaultChannelPipeline 内部双向链表
|
|
ChannelHandler 添加进ChannelPipeline后会被封装成ChannelHandlerContext,会判断是ChannelInboundHandler还是ChannelOutboundHandler的子类,对inbound和outbound这两个属性进行赋值,ChannelInboundHandler的子类inbound为true,outbound为false,ChannelOutboundHandler反之。ChannelPipeline内部调用方法时,会使用fireXXXXX()的方法,会利用责任链模式进行调用,这时候会用到这个属性进行判断,是否有对应方法,从而进行调用
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
EventLoopGroup 与 EventLoop
EventLoopGroup 初始化创建的时候,会创建对应数量的 EventLoop,如果没有指定,默认创建cpu核心数量*2个EventLoop
reactor 线程启动
NioEventLoop 父类 SingleThreadEventExecutor 的execute方法
|
|
外部线程在往任务队列里面添加任务的时候执行 startThread() ,netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务
|
|
SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法,将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行
|
|
该线程就是executor创建,对应netty的reactor线程实体。executor 默认是ThreadPerTaskExecutor
默认情况下,ThreadPerTaskExecutor 在每次执行execute 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体
reactor 线程的执行
NioEventLoop 的 run 方法
|
|
reactor 线程大概做的事情分为对三个步骤不断循环
- 轮询出IO事件 首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件
- 处理IO事件 处理产生网络IO事件的channel
processSelectedKeys()
- 处理任务队列
runAllTasks(...)
netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法
-
当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行
-
netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue
-
netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue
-
netty每隔64个任务检查一下是否该退出任务循环
-
从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)
-
计算本次任务循环的截止时间
-
循环执行任务
|
|
这一步便是netty里面执行所有任务的核心代码了。 首先调用safeExecute来确保任务安全执行,忽略任何异常
然后将已运行任务 runTasks 加一,每隔0x3F任务,即每执行完64个任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,如果超过,那就break掉,如果没有超过,那就继续执行。可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的
Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程’生产’的任务.属于单消费者多生产者模式.通过类的名称 MpscUnboundedArrayQueue 可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的。
- 收尾
|
|