博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码解解析(4.0)-13 Channel NIO实现: 关闭和清理
阅读量:4963 次
发布时间:2019-06-12

本文共 9783 字,大约阅读时间需要 32 分钟。

  Channel提供了3个方法用来实现关闭清理功能:disconnect,close,deregister。本章重点分析这个3个方法的功能的NIO实现。

 

  disconnect实现: 断开连接

  disconnect方法的调用栈如下:

1 io.netty.channel.AbstractChannel#disconnect()2 io.netty.channel.DefaultChannelPipeline#disconnect()3 io.netty.channel.AbstractChannelHandlerContext#disconnect()4 io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)5 io.netty.channel.AbstractChannelHandlerContext#invokeDisconnect6 io.netty.channel.DefaultChannelPipeline.HeadContext#disconnect7 io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect8 io.netty.channel.socket.nio.NioSocketChannel#doDisconnect9 io.netty.channel.socket.nio.NioSocketChannel#doClose

  disconnect稍微复杂一些, 在io.netty.channel.AbstractChannelHandlerContext#disconnect(io.netty.channel.ChannelPromise)实现中,会根据channel是否支持disconnect操作来决定下一步动作:

if (!channel().metadata().hasDisconnect()) {    next.invokeClose(promise);} else {    next.invokeDisconnect(promise);}   

  之所以这样设计,是因为TCP和UDP的disconnect含义是不一样的,对TCP来说disconnect就是关闭socket;对UDP来说,它没有连接的概念,默认情况下通过udp socket发送数据需要指定远程地址,但如果调用connect之后,就不需指定这个地址,数据报会被发送到connect指定的地址上,disconnect含义是删除connect指定的地址,发送数据时必须指定地址。所以在NIO的Channel实现中,TCP的disconnect是调用socket的close方法,UDP的disconnect是调用socket的disconnect方法,下面是两种不同的disconnect实现。

//TCP io.netty.channel.socket.nio.NioSocketChannel#doDisconnect@Overrideprotected void doDisconnect() throws Exception {    doClose();}@Overrideprotected void doClose() throws Exception {    super.doClose();    javaChannel().close();}//UDP io.netty.channel.socket.nio.NioDatagramChannel#doDisconnect@Overrideprotected void doDisconnect() throws Exception {    javaChannel().disconnect();}

  io.netty.channel.AbstractChannel.AbstractUnsafe#disconnect实现了disconnect的逻辑,先调用doDisconnect方法,这个方法是io.netty.channel.AbstractChannel定义的的抽象方法。如果channel的状态从active变成inactive,就调用pipeline的fireChannelInactive方法触发channelInactive事件。

  

  close实现: 关闭channel

  close方法的调用栈:

io.netty.channel.AbstractChannel#close()io.netty.channel.DefaultChannelPipeline#close()io.netty.channel.AbstractChannelHandlerContext#close()io.netty.channel.AbstractChannelHandlerContext#close(io.netty.channel.ChannelPromise)io.netty.channel.AbstractChannelHandlerContext#invokeCloseio.netty.channel.DefaultChannelPipeline.HeadContext#closeio.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise)io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0io.netty.channel.socket.nio.NioSocketChannel#doClose  

  close的逻辑实现在io.netty.channel.AbstractChannel.AbstractUnsafe#close(final ChannelPromise promise, final Throwable cause ,final ClosedChannelException closeCause, final boolean notify)中,这个close方法主要实现了一下几个功能:

  1. 确保在多线程环境下,多次调用close和一次调用的影响一致,并且可以通过promis得到同样的结果。
  2. 保证在执行close的过程中,不能向channel写数据。
  3. 调用doClose0执行执真正的close操作。
  4. 调用deregister对channel做最后的清理工作,并触发channelInactive, channelUnregistered事件。

  以下是这个方法的代码:

1 private void close(final ChannelPromise promise, final Throwable cause, 2                    final ClosedChannelException closeCause, final boolean notify) { 3     if (!promise.setUncancellable()) { 4         return; 5     } 6  7     if (closeInitiated) { 8         if (closeFuture.isDone()) { 9             // Closed already.10             safeSetSuccess(promise);11         } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.12             // This means close() was called before so we just register a listener and return13             closeFuture.addListener(new ChannelFutureListener() {14                 @Override15                 public void operationComplete(ChannelFuture future) throws Exception {16                     promise.setSuccess();17                 }18             });19         }20         return;21     }22 23     closeInitiated = true;24 25     final boolean wasActive = isActive();26     final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;27     this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.28     Executor closeExecutor = prepareToClose();29     if (closeExecutor != null) {30         closeExecutor.execute(new Runnable() {31             @Override32             public void run() {33                 try {34                     // Execute the close.35                     doClose0(promise);36                 } finally {37                     // Call invokeLater so closeAndDeregister is executed in the EventLoop again!38                     invokeLater(new Runnable() {39                         @Override40                         public void run() {41                             if (outboundBuffer != null) {42                                 // Fail all the queued messages43                                 outboundBuffer.failFlushed(cause, notify);44                                 outboundBuffer.close(closeCause);45                             }46                             fireChannelInactiveAndDeregister(wasActive);47                         }48                     });49                 }50             }51         });52     } else {53         try {54             // Close the channel and fail the queued messages in all cases.55             doClose0(promise);56         } finally {57             if (outboundBuffer != null) {58                 // Fail all the queued messages.59                 outboundBuffer.failFlushed(cause, notify);60                 outboundBuffer.close(closeCause);61             }62         }63         if (inFlush0) {64             invokeLater(new Runnable() {65                 @Override66                 public void run() {67                     fireChannelInactiveAndDeregister(wasActive);68                 }69             });70         } else {71             fireChannelInactiveAndDeregister(wasActive);72         }73     }74 }

   7-23行,在这个方法被多次调用的时候,只有一次可以执行的21行以后的代码。从代码看,这一点是用closeInitiated属性来保证的,但它是一个普通boolean类型的属性,在多线程情况下存在可见性问题。事实上一个channel unsafe实例的close方法,只会在一个线程中执行,closeInitiated只在这个方法中使用,因此不存在多线程间的可见性问题。虽然可能在多个不同的线程中多次调用Channel的close方法,但是这个close方法,只会在channel的eventLoop线程中执行。凡是通过io.netty.channel.DefaultChannelPipeline.HeadContext调用的channel unsafe方法,都一定在channel的eventLoop线程中执行。

  26,27行,把channel unsafe的outboundBuffer设置为null,  这样,在close的过程中,所有channel的write方法都会通过promise返回错误。

  28行,prepareToClose默认实现是返回null, 它是一个protected方法,可以根据需要覆盖它,用来在关闭之前做一些准备工作,同时指定一个executor,让接下来的关闭动作都在这个executor中执行。

  33-49行,53-72行,这两段代码实现的都是功能都是一样的,不同的是33-49行在prepareToClose提供的executor中执行。调用doClose0执行关闭操作,清理outboundBuffer(43,44),  调用fireChannelInactiveAndDeregister(46)触发channelInactive和channelDeregister事件。63-72行,通过inFlush0属性检查当前是否正在进程flush操作,如果是,使用invokerLater确保在当前方法和flush操作完成之后再触发事件。

  doClose0中是真正的关闭操作,它先调用doClose,然后设置promise的返回值:

1 //io.netty.channel.AbstractChannel.AbstractUnsafe#doClose0 2 private void doClose0(ChannelPromise promise) { 3     try { 4         doClose(); 5         closeFuture.setClosed(); 6         safeSetSuccess(promise); 7     } catch (Throwable t) { 8         closeFuture.setClosed(); 9         safeSetFailure(promise, t);10     }11 }12 //io.netty.channel.socket.nio.NioSocketChannel#doClose13 @Override14 protected void doClose() throws Exception {15     super.doClose();16     javaChannel().close();17 }

   fireChannelInactiveAndDeregister是调用deregister实现,也就是说,正常情况下,调用Channel的close方法之后就会自动完成一个channel最后的清理工作,不需要再调用deregister方法。

1 private void fireChannelInactiveAndDeregister(final boolean wasActive) {2     deregister(voidPromise(), wasActive && !isActive());3 }

 

  deregister实现:从eventLoop中注销channel

  deregister的调用栈:

1 io.netty.channel.AbstractChannel#deregister()2 io.netty.channel.DefaultChannelPipeline#deregister()3 io.netty.channel.AbstractChannelHandlerContext#deregister()4 io.netty.channel.AbstractChannelHandlerContext#deregister(io.netty.channel.ChannelPromise)5 io.netty.channel.AbstractChannelHandlerContext#invokeDeregister6 io.netty.channel.DefaultChannelPipeline.HeadContext#deregister7 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise)8 io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(io.netty.channel.ChannelPromise, boolean)9 io.netty.channel.nio.AbstractNioChannel#doDeregister

   deregister的逻辑在中实现io.netty.channel.AbstractChannel.AbstractUnsafe#deregister(final ChannelPromise promise, final boolean fireChannelInactive),这个方法的实现比较简单,主要就是调用doDeregister方法执行deregister操作,然后触发channelInactive事件(如果fireChannelInactive参数是true)和channelUnregistered事件。

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {    if (!promise.setUncancellable()) {        return;    }    if (!registered) {        safeSetSuccess(promise);        return;    }    invokeLater(new Runnable() {        @Override        public void run() {            try {                doDeregister();            } catch (Throwable t) {                logger.warn("Unexpected exception occurred while deregistering a channel.", t);            } finally {                if (fireChannelInactive) {                    pipeline.fireChannelInactive();                }                if (registered) {                    registered = false;                    pipeline.fireChannelUnregistered();                }                safeSetSuccess(promise);            }        }    });}

  这里使用invokeLater执行主要逻辑的目的是为了保证把当前正在eventLoop队列中所有任何都执行完之后再执行真正的deregister操作。

  doDeregister默认实现是空,什么都没做,它是个protected方法。真正的实现在io.netty.channel.nio.AbstractNioChannel中,它只是简单地调用eventLoop的cancel方法把SocketChannel对应的SelectionKey从Selector中删除,这样selector就不会监听到这个socket上的任何事件了。

1 @Override2 protected void doDeregister() throws Exception {3     eventLoop().cancel(selectionKey());4 }

 

转载于:https://www.cnblogs.com/brandonli/p/10240998.html

你可能感兴趣的文章
js 根据身份证获取出生日期及性别
查看>>
PHPstorm+XDebug+Chrome/Firefox超详细教程(图文)
查看>>
织梦常用标签汇总-------未完待续
查看>>
VMWare ESX/ESXi 虚拟机硬盘的厚置备(Thick Provision)与精简置备(Thin Provision)的转换
查看>>
(译文)Python中的staticmethod与classmethod
查看>>
表单验证
查看>>
总结-SQL注入
查看>>
pytorch-tensor处理速查表(cat stack squeeze unsqueeze permute等)
查看>>
今天的谈话
查看>>
最大熵模型---关毅老师的课件
查看>>
javaScript 简单的时间格式转换【转】
查看>>
解析AFNetWorking 网络框架(二)
查看>>
ThinkPHP5的模板替换__STATIC__
查看>>
Time Complexity of Loop with Powers
查看>>
python高级特性
查看>>
Linux编程之PING的实现
查看>>
[iOS开发]ShareSDK
查看>>
上一家公司倒闭,为什么我又来了创业公司?
查看>>
最大正方形 · Maximal Square
查看>>
746. Min Cost Climbing Stairs 最不费力的加权爬楼梯
查看>>