netty 学习 FrameDecoder

系统 1991 0

我们接下来就看和业务息息相关的解码器,首先我们来看FrameDecoder,这个东西应该是所有的解码器都会实现这个,所以我们来重点看一下。

        FrameDecoder产生的根源就是TCP/IP数据包的传输方式决定的,包在传输的过程中会分片和重组,

正如javadoc里面所说的:

    客户端在发送的时候的序列如下:

+-----+-----+-----+

| ABC | DEF | GHI |

+-----+-----+-----+

服务器端在接受到后可能会变成下面的序列:

+----+-------+---+---+

| AB | CDEFG | H | I |

+----+-------+---+---+

FrameDecoder帮助我们将接受到的数据包整理成有意义的数据帧,例如,可以帮助我们将数据包整理成

下面的数据格式:

+-----+-----+-----+

| ABC | DEF | GHI |

+-----+-----+-----+

我们接下来就看看FrameDecoder的实现吧:

FrameDecoder是继承SimpleChannelUpstreamHandler的,我们首先来看一下messageReceived的实现:

        

Java代码   收藏代码
  1. @Override  
  2.     public  void messageReceived(  
  3.            ChannelHandlerContext ctx, MessageEvent e)  throws Exception {  
  4.   
  5.        Object m = e.getMessage();  
  6.         if (!(m  instanceof ChannelBuffer)) {  
  7.            ctx.sendUpstream(e);  
  8.             return;  
  9.        }  
  10.   
  11.        ChannelBuffer input = (ChannelBuffer) m;  
  12.         if (!input.readable()) {  
  13.             return;  
  14.        }  
  15.   
  16.        ChannelBuffer cumulation = cumulation(ctx);  
  17.         if (cumulation.readable()) {  
  18.            cumulation.discardReadBytes();  
  19.            cumulation.writeBytes(input);  
  20.            callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());  
  21.        }  else {  
  22.            callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());  
  23.             if (input.readable()) {  
  24.                cumulation.writeBytes(input);  
  25.            }  
  26.        }  
  27.    }  

        这个里面首先会检查input的可读性,这个比较好理解,关键是cumulation,

我们首先来看一下cumulation的实现吧:

Java代码   收藏代码
  1. private ChannelBuffer cumulation(ChannelHandlerContext ctx) {  
  2.         ChannelBuffer c = cumulation;  
  3.          if (c ==  null) {  
  4.             c = ChannelBuffers.dynamicBuffer(  
  5.                     ctx.getChannel().getConfig().getBufferFactory());  
  6.             cumulation = c;  
  7.         }  
  8.          return c;  
  9.     }  

        这个函数很简单,就是如果cumulation为空的时候初始化一下,如果不为空,就返回。我们得思考一下什么时候cumulation为空,什么时候不为空。我们再回过头来看一下上面的实现吧。如果cumulation可读,cumulation.discardReadBytes函数的作用是将0到readIndex之间的空间释放掉,将readIndex和writeIndex都重新标记一下。然后将读到的数据写到buffer里面。如果cumulation不可读,在调callDecode,如果发现从不可读状态到可读状态,则将读到的数据写到缓存区里面。

        我们再来看callDecode的实现:

Java代码   收藏代码
  1. private  void callDecode(  
  2.             ChannelHandlerContext context, Channel channel,  
  3.             ChannelBuffer cumulation, SocketAddress remoteAddress)  throws Exception {  
  4.   
  5.          while (cumulation.readable()) {  
  6.              int oldReaderIndex = cumulation.readerIndex();  
  7.             Object frame = decode(context, channel, cumulation);  
  8.              if (frame ==  null) {  
  9.                  if (oldReaderIndex == cumulation.readerIndex()) {  
  10.                      // Seems like more data is required.  
  11.                      // Let us wait for the next notification.  
  12.                      break;  
  13.                 }  else {  
  14.                      // Previous data has been discarded.  
  15.                      // Probably it is reading on.  
  16.                      continue;  
  17.                 }  
  18.             }  else  if (oldReaderIndex == cumulation.readerIndex()) {  
  19.                  throw  new IllegalStateException(  
  20.                          "decode() method must read at least one byte " +  
  21.                          "if it returned a frame (caused by: " + getClass() +  ")");  
  22.             }  
  23.   
  24.             unfoldAndFireMessageReceived(context, remoteAddress, frame);  
  25.         }  
  26.   
  27.          if (!cumulation.readable()) {  
  28.            this.cumulation =  null;  
  29.         }  
  30.     }  
  31.       

 

 

       这个里面上面是一个循环,首先将读指针备份一下,decode方法是交个子类实现的一个抽象方这个用来实现具体数据分帧的算法,从这个里面看到如果子类没有读到一帧数据,则返回null所以下面有一个判断,是一点数据没有读呢,还是读了一点,如果一点都没有读,就不需要再检测了等下一次messageRecieved进行通知,如果发现读了一点数据,就调用下一次分帧。如果读了一帧数据就发送一个通知,unfold是针对读到的循环数据要不要打开的意思。到最后如果发现不是可读状态,

cumulation将会被设置成null。

 

最后来看一下cleanup的实现

Java代码   收藏代码
  1. private  void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)  
  2.              throws Exception {  
  3.          try {  
  4.             ChannelBuffer cumulation =  this.cumulation;  
  5.              if (cumulation ==  null) {  
  6.                  return;  
  7.             }  else {  
  8.                  this.cumulation =  null;  
  9.             }  
  10.   
  11.              if (cumulation.readable()) {  
  12.                  // Make sure all frames are read before notifying a closed channel.  
  13.                 callDecode(ctx, ctx.getChannel(), cumulation,  null);  
  14.             }  
  15.   
  16.              // Call decodeLast() finally.  Please note that decodeLast() is  
  17.              // called even if there's nothing more to read from the buffer to  
  18.              // notify a user that the connection was closed explicitly.  
  19.             Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);  
  20.              if (partialFrame !=  null) {  
  21.                 unfoldAndFireMessageReceived(ctx,  null, partialFrame);  
  22.             }  
  23.         }  finally {  
  24.             ctx.sendUpstream(e);  
  25.         }  
  26.     }  

    在这个里面一般来说是在socket断开的时候调用,这个时候如果发现buffer还是可读状态,还会努力的确保所有的数据已经被分帧,然后调用decodeLast

 

===========================================================================================

转自http://blog.csdn.net/xiaolang85/article/details/12621663

netty 学习 FrameDecoder


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论