io.netty.channel.ChannelPipeline
Interface ChannelPipeline
1 | public interface ChannelPipeline |
ChannelPipeline是个ChannelHandler的集合,用来处理拦截Channel中的inbound事件和outbound操作。ChannelPipeline实现了一个高级的拦截过滤模式,给用户全面的控制一个事件是如何被处理的,以及多个ChannelHandlers在一个管道里是如何交互的。
创建一个pipeline
每个channel都有自己的pipeline,在创建channel时就自动创建pipeline。
事件在pipeline中是怎么流通的
I/O事件由ChannelInboundHandler或ChannelOutboundHandler处理的,并调用ChannelHandlerContext的事件传播方法(比如ChannelHandlerContext.fireChannelRead(Object), ChannelHandlerContext.write(Object))传到最近的handler。
一个inbound事件由inbound handlers(图中左边自下而上的方式)处理。一个inbound handler通常处理由图表底部的I/O线程生成的inbound数据。inbound数据通常由远程端点读取,通常是一个实际的输入操作,例如SocketChannel.read(ByteBuffer)。如果inbound事件继续向上超过了顶部界限,就会被忽略掉,或者日志记录下来。
一个outbound事件由outbound handler(图中右边自上而下的方式) 处理。一个outbound handler通常生成或转换outbound信息,比如写请求。如果outbound事件继续向下超过了底部的界限,就会被与Channel相关的I/O线程处理。I/O线程通常执行实际的操作,比如SocketChannel.write(ByteBuffer)。
举个例子,我们创建如下的pipeline:
1 | ChannelPipeline p = ...; |
上面的例子中,Inbound意味着是inbound handler,outbound意味着是outbound handler。当事件为inbound时,执行流程是1,2,3,4,5。当事件是outbound时,执行流程是5,4,3,2,1。在此基础上,ChannelPipeline自动跳过不相干的handlers来简化过程:
- 3和4没有实现ChannelInboundHandler,因此inbound事件只会执行1,2,5。
- 1和2没有实现ChannelOutboundHandler,因此outbound事件只会执行5,4,3。
- 如果5实现了ChannelInboundHandler和ChannelOutboundHandler,那么inbound和outbound事件都会包括5。
将一个事件传递到下一个handler
一个handler需要调用ChannelHandlerContext中的事件传递方法来将事件传给下一个handler。这些方法包括:
- Inbound事件传递方法:
- ChannelHandlerContext.fireChannelRegistered()
- ChannelHandlerContext.fireChannelActive()
- ChannelHandlerContext.fireChannelRead(Object)
- ChannelHandlerContext.fireChannelReadComplete()
- ChannelHandlerContext.fireExceptionCaught(Throwable)
- ChannelHandlerContext.fireUserEventTriggered(Object)
- ChannelHandlerContext.fireChannelWritabilityChanged()
- ChannelHandlerContext.fireChannelInactive()
- ChannelHandlerContext.fireChannelUnregistered()
- Outbound事件传递方法:
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
- ChannelHandlerContext.write(Object, ChannelPromise)
- ChannelHandlerContext.flush()
- ChannelHandlerContext.read()
- ChannelHandlerContext.disconnect(ChannelPromise)
- ChannelHandlerContext.close(ChannelPromise)
- ChannelHandlerContext.deregister(ChannelPromise)
例如:
1 | public class MyInboundHandler extends ChannelInboundHandlerAdapter { |
创建一个pipeline
用户在一个pipeline中可以有一个或多个ChannelHandlers来接收I/O事件(比如读)以及请求I/O操作(比如写和关闭)。例如一般的channel pipeline中通常有一下几个handlers:
- ProtocolDecoder - 将二进制数据(比如ByteBuf)转换成Java对象。
- ProtocolEncoder - 将Java对象转换为二进制数据。
- 业务逻辑handler - 实际的业务相关的处理。
举个例子:
1 | static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); |
线程安全
一个ChannelHandler可以在任何时候被添加或移除,因为ChannelPipeline是线程安全的。比如你可以在敏感信息需要加密时添加加密handler,在转换完成后移除这个handler。