netty源码之ChannelHandlerContext
源码之ChannelHandlerContext
在netty中,ChannelHandlerContext代表了⼀个ChannelHandler和ChannelPipeline之间的关系,Netty会把ChannelHandler包装进ChannelHandlerContext的实例DefaultChannelHandlerContext,然后把ChannelHandlerContext作为元素来组成链表,所以ChannelHandlerContext中⼀定有获得ChannelHandler和ChannelPipeline相关的⽅法。
ChannelHandler handler();
ChannelPipeline pipeline();
ChannelHandlerContext主要功能是管理在同⼀ChannelPipeline中各个ChannelHandler的交互,所以同时ChannelHandlerContext也有很多触发事件传播相关的⽅法。
ChannelHandlerContext的具体实现就三个,都继承⾃AbstractChannelHandlerContext。
AbstractChannelHandlerContext
AbstractChannelHandlerContext⾥⾯包含⼏乎所有与⽹络通信相关的⽅法,套路都⼀样,⽐如fireChannelRead⽅法,这是个inbound 类型的⽅法,当调⽤ctx.fireChannelRead⽅法时,⾸先找到链表中下⼀个inbound 类型的handler,然后进⾏线程本地性判断,是则直接执⾏,不是则投⼊线程的队列异步执⾏。
fireChannelRead()⽅法的源码如下:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg){
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg){
final Object m = uch(ObjectUtil.checkNotNull(msg,"msg"), next);
EventExecutor executor = utor();
if(executor.inEventLoop()){
next.invokeChannelRead(m);
}el{
@Override
public void run(){
next.invokeChannelRead(m);
}
});
}
}
// 寻找双向链表中下⼀个⼊站context,也就是下⼀个⼊站handler
private AbstractChannelHandlerContext findContextInbound(int mask){
AbstractChannelHandlerContext ctx =this;
EventExecutor currentExecutor =executor();
do{
ctx = ;
}while(skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
HeadContext
HeadContext的继承关系如下:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
可以看见,HeadContext既是ChannelHandlerContext,⼜是ChannelHandler,⽽且是同时处理inbound类型和outbound类型的ChannelHandler。
在具体的⽅法上,outbound类型事件的⽅法都是直接调⽤unsafe中的相关⽅法,⽽inbound类型事件的⽅法则会触发事件在ChannelPipeline中的传播。
TailContext
TailContext的继承关系如下:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
可以看见,TailContext既是ChannelHandlerContext,⼜是ChannelHandler,⽽且是处理inbound类型ChannelHandler。
在具体的⽅法上,TailContext⾥的⽅法基本上都是空实现,有具体代码实现的是:
protected void onUnhandledInboundUrEventTriggered(Object evt){
// This may not be a configuration error and so don't log anything.
// The event may be superfluous for the current pipeline configuration.
}
protected void onUnhandledInboundException(Throwable cau){
try{
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. "+
"It usually means the last handler in the pipeline did not handle the exception.",
cau);
}finally{
}
}
protected void onUnhandledInboundMessage(Object msg){
try{
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. "+
"Plea check your pipeline configuration.", msg);
}finally{
}
}
这三个⽅法⽆⼀例外都是进⾏了相关资源的释放⼯作。
这⾥总结下平时在⼊站Handler中,释放资源的⽅式:
1. 继承SimpleChannelInboundHandler,实现channelRead0()⽅法。
我们来看下SimpleChannelInboundHandler的源码:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
boolean relea =true;
try{
if(acceptInboundMessage(msg)){
@SuppressWarnings("unchecked")
I imsg =(I) msg;
channelRead0(ctx, imsg);// 模板⽅法模式
}el{
relea =fal;
ctx.fireChannelRead(msg);
}
}finally{
if(autoRelea && relea){
}
}
}
2. 可以将资源向后传递给Tail进⾏释放。DefaultChannelHandlerContext
DefaultChannelHandlerContext这个⾥⾯的代码就⾮常简单了,⼤家⾃⾏查阅。
ctx.write()、pipeline.write()、channel.write()的区别
假设现在配置的Handler为:
p.addLast("1",new OutboundHandlerA());
p.addLast("2",new InboundHandlerA());
p.addLast("3",new InboundHandlerB());// 在这个⼊站handler中分别调⽤三个write⽅法
p.addLast("4",new OutboundHandlerB());
那么实际上出站Handler的个数为3个,分别如下:
HeadHandler --> OutboundHandlerA --> OutboundHandlerB
在⼊站InboundHandlerB中调⽤ctx.write(),所经过的出站Handler的个数为2个,分别如下:OutboundHandlerA --> HeadHandler
在⼊站InboundHandlerB中调⽤pipeline.write(),所经过的出站Handler的个数为3个,分别如下:OutboundHandlerB --> OutboundHandlerA -->HeadHandler
channel.write()与pipeline.write()的结果⼀致,实际上channel.write()底层调⽤了pipeline.write()。下⾯是channel.write()的源码:
@Override
public ChannelFuture writeAndFlush(Object msg){
return pipeline.writeAndFlush(msg);
}
pipeline.write()的源码:
// ioty.channel.AbstractChannel#writeAndFlush(java.lang.Object)
public final ChannelFuture writeAndFlush(Object msg){
return tail.writeAndFlush(msg);
}
// ioty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)
public ChannelFuture writeAndFlush(Object msg){
return writeAndFlush(msg,newPromi());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromi promi){
write(msg,true, promi);
return promi;
}
private void write(Object msg,boolean flush, ChannelPromi promi){
ObjectUtil.checkNotNull(msg,"msg");
try{
if(isNotValidPromi(promi,true)){
// cancelled
return;
}
}catch(RuntimeException e){
throw e;
}
// 往前找最近的出站handler
final AbstractChannelHandlerContext next =findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH): MASK_WRITE);
final Object m = uch(msg, next);
EventExecutor executor = utor();
if(executor.inEventLoop()){
if(flush){
next.invokeWriteAndFlush(m, promi);
}el{
next.invokeWrite(m, promi);
}
}el{
final WriteTask task = wInstance(next, m, promi, flush);
if(!safeExecute(executor, task, promi, m,!flush)){
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-u later.
//
// See /netty/netty/issues/8343.
task.cancel();
}
}
}
private AbstractChannelHandlerContext findContextOutbound(int mask){
AbstractChannelHandlerContext ctx =this;
EventExecutor currentExecutor =executor();
do{
ctx = ctx.prev;
}while(skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
从源码中可以看出pipeline.write()是从最后⼀个⼊站TailHandler往前开始遍历找出最近的出站Handler(这⾥是OutboundHandlerB),调⽤它的write⽅法,然后依次往前触发write⽅法。
下⾯是ctx.write()的源码,ctx.write()还是跟上⾯的⼀样,只不过上⾯的代码中this是TailHandler,所以会从TailHandler开始往前找最近的出站Handler(这⾥是OutboundHandlerB),⽽这⾥的this是InboundHandlerB,往前找会找到最近的出站Handler(这⾥是OutboundHandlerA)。
// ioty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)
public ChannelFuture writeAndFlush(Object msg, ChannelPromi promi){
write(msg,true, promi);
return promi;
}
下⾯⽤⼀张图来总结下3个write的区别:
注意Tail是⼀个⼊站Handler,不是出站Handler。