这篇文章和你一起过下Netty的主发行版本的一些显著的改变和新特性,让你在把你的应用程序转换到新版本的时候有个概念。
Netty的包名从org.jboss.netty改为io.netty,因为我们不在是JBoss.org的一部分了。
二进制JAR包被分为了多个子模块以便用户能够从类路径中去掉非必需的特性。当前的结构如下:
模块 | 描述 |
---|---|
netty | project parent |
common | utility and logging |
buffer | buffer API |
transport | channel API and its core implementations |
transport-rtrx | RTRX transport implementation |
transport-sctp | SCTP transport implementation |
transport-udt | UDT transport implementation |
handler | channel handlers |
codec | codec framework |
codec-http | HTTP, Web Sockets, SPDY, and RTSP codec |
codec-socks | Socks codec |
example | examples |
all | generates an all-in-one JAR |
tarball | generates a tarball distribution |
所有的Netty的Jar(除了netty-all外)包现在都是OSGI的bundle,能够用在你喜欢的OSGI容器上。
由于上文所提到的结构上的变化,buffer API现在可以作为一个单独的包被使用。为此,ChannelBuffer这个类型名也不那么讲得通了,而应该变更为ByteBuf。
用来创建新buffer的功能类ChannelBuffers被拆分为两个功能类:Unpooled和BufUtil。就像这个名字所暗示的,4.0引入了一个新的池化的ByteBufs,它可以通过ByteBuf的分配器(Allocator)的对应实现ByteBufAllocator来获得。
在3.x时期,buffer分为固定和动态两种类型。一个固定buffer的容量在创建之后就无法改变,而动态buffer的容量在write*(译者按:writeByte,writeInt,writeLong...)方法需要更多空间时自动扩容。
从4.0开始,所有buffer都变成了动态的。但是,相对于之前的动态进行了优化。你可以更容易也更安全的对一个buffer的容量进行扩大和缩小。之所以说它容易是因为有一个新的ByteBuf.capacity(int newCapacity)的方法。说它安全是因为你可以设置一个容量的最大值,以防止容量没有限制的增大。
// 不要再使用 dynamicBuffer() - 使用 buffer(). ByteBuf buf = ByteBuf.buffer(); // 增加buffer的容量 buf.capacity(1024); ... // 缩减buffer的容量 (最后的512个byte被丢弃) buf.capacity(512);
唯一的例外是那些使用wrappedBuffer方法创建的,包装(warp)了一个buffer或一个byte数组的buffer。你无法扩大它的容量,因为这样会使包装一个已有buffer的目的是去意义——减少内存的复制。如果你想要在包装了一个buffer之后改变它的容量,你应该重新创建一个拥有足够容量的buffer,然后将你想要包装的那个buffer的内容复制过来。
一个新的名叫CompositeByteBuf的接口为组合buffer(composite buffer)的实现定义了多种高级的操作。一个用户可以使用组合buffer,以只比随机访问大一点的代价达到一个批量内存复制的目的。要创建一个新的组合buffer,可以像以前一样使用Unpooled.wrappedBuffer(... 译者注:此处省略号应该是指省略方法参数,下同)或Unpooled.compositeBuffer(...)。
在3.x中,ChannelBuffer.toByteBuffer()以及它的其他变体所提供的约定并不那么明确。用户无法确定这些方法会返回一个拥有共享数据的视图buffer还是一个拥有独立数据的通过复制得到的buffer。4.0将toByteBuffer()替换为ByteBuf.nioBufferCount(),nioBuffer(),以及nioBUffers()。如果调用nioBufferCount()返回0,用户总是可以通过调用copy().nioBuffer()来获得一个复制的buffer。
对小字节序的支持经历了重大变化。在之前的版本中,一个用户为了得到一个小字节序的buffer有两种选择:特别指定一个LittleEndianHeapChannelBufferFactory;用目标字节序将已存在的buffer包装起来。4.0添加了一个新方法,ByteBuf.order(ByteOrder)。这个方法返回当前buffer对象的一个具有指定字节序的视图buffer:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.ByteOrder; ByteBuf buf = Unpooled.buffer(4); buf.setInt(0, 1); // 打印出 '00000001' System.out.format("%08x%n", buf.getInt(0)); ByteBuf leBuf = buf.order(ByteOrder.LITTLE_ENDIAN); // 打印出 '01000000' System.out.format("%08x%n", leBuf.getInt(0)); assert buf != leBuf; assert buf == buf.order(ByteOrder.BIG_ENDIAN);
前面已经提到Netty引入了pooledByteBufinstances。这在很多方面都很实用,举列如下:
public interface ByteBufAllocator { ByteBuf buffer(); ByteBuf buffer(int initialCapacity); ByteBuf buffer(int initialCapacity, int maxCapacity); ByteBuf heapBuffer(); ByteBuf heapBuffer(int initialCapacity); ByteBuf heapBuffer(int initialCapacity, int maxCapacity); ByteBuf directBuffer(); ByteBuf directBuffer(int initialCapacity); ByteBuf directBuffer(int initialCapacity, int maxCapacity); ByteBuf ioBuffer(); CompositeByteBuf compositeBuffer(); CompositeByteBuf compositeBuffer(int maxNumComponents); CompositeByteBuf compositeHeapBuffer(); CompositeByteBuf compositeHeapBuffer(int maxNumComponents); CompositeByteBuf compositeDirectBuffer(); CompositeByteBuf compositeDirectBuffer(int maxNumComponents); }要想从一个handler那里获取当前的 ByteBufAllocator,可以使用ChannelHandlerContext.alloc()或Channel.alloc()方法:
Channel channel = ...; ByteBuf buf = channel.alloc().buffer(512); .... channel.write(buf); ChannelHandlerContext ctx = ... ByteBuf buf2 = ctx.alloc().buffer(512); .... channel.write(buf2)
一旦一个ByteBuf被写入远程节点,它会再次自动的释放进入释放到池(the pool)里。
默认的ByteBufAllocator为PooledByteBufAllocator.如果你不希望使用buffer pooling或使用你自己的allocator,你可以运用Channel.config().setAllocator(..),以及一个可供选择的 allocator,比如UnpooledByteBufAllocator。
在4.0中,许多io.netty.channel包中的类都经历大量修改,因此文本上的简单搜索-替换是无法让你基于3.x的程序迁移到4.0上。这个部分会尝试将这些重大变更背后的思考过程展示出来,而不只是简单地作为展示所有变更。
对于初学者来说,术语'upstream'(译者注:直译为向上游,有点像TCP/IP协议栈中从下往上,从物理层最终到达应用层这么一个流程)和'downstream'有点让人迷惑。在4.0中,只要可能,都会使用'inbound'(译者注:直译为开往内地的,相对于upstream确实更贴切,即指数据从外部网络经历层层filter到达我们的处理逻辑)和'outbound'来替换他们。
在3.x时代,ChannelHandler只是一个标记接口,而在ChannelUpstreamHandler、ChannelDownstreamHandler、LifeCycleAwareChannelHandler定义了具体的处理器方法。在Netty 4中,ChannelHandler将LifeCycleAwareChannelHandler接口和一堆实现辅助方法融合到了一起,具体见代码:
public interface ChannelHandler { void beforeAdd(ChannelHandlerContext ctx) throws Exception; void afterAdd(ChannelHandlerContext ctx) throws Exception; void beforeRemove(ChannelHandlerContext ctx) throws Exception; void afterRemove(ChannelHandlerContext ctx) throws Exception; void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; ... }
下方的图表描述了这个新的类型集成层次:
fixme(原文中还没有插入此图)
在3.x时代,所有的I/O操作都会创建一个新的ChannelEvent对象。对每个读或写的操作,还会额外创建一个新的ChannelBuffer对象。由于将资源管理和buffer的池化交给了JVM,这实际上极大地简化了Netty的内部实现。但是,基于Netty开发的应用在高负载下运行时,有时会观察到GC(Garbage Collection)的压力增大或变化不定,这些问题的根源也来自于这里。
4.0通过把事件对象替换为直接与类型相对应(译者注:原文为strongly typed,但是我觉得直译为强类型不太容易理解)的方法调用,几乎完全避免了事件对象的创建。3.x中,有类似于handleUpstream()和handleDownstream()这种能够捕获所有相关类型事件的处理器方法,4.0中你将不会再看到它们的身影了。所有的事件类型现在都有各自对应的处理器方法:
// 3.x时代: void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception; void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception; // 4.0: void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception; void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception; void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception; void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; void read(ChannelHandlerContext ctx); void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception;
ChannelHandlerContext类也被修改来反映上述提到的变化:
// Before: ctx.sendUpstream(evt); // After: ctx.fireInboundBufferUpdated();
所有这些变化意味着用户无法去扩展ChannelEvent这个已经不存在的接口了。那用户要怎样才能定义他或她自己的事件类型呢,就像IdleStateEvent?4.0中的ChannelHandler有一个处理器方法叫做userEventTriggered(),它就是被设计用来满足这种特殊的用户需求。
在3.x中,当一个新的Channel被创建并连接成功,至少三个ChannelStateEvent会被触发:channelOpen、channelBound以及channelConnected。当一个Channel关闭,则对应channelDisconnected、channelUnbound以及channelClosed三个事件。
fixme
但是,触发这么多事件的意义并不那么明显。如果在一个Channel进入可读或可写的状态时通知用户,想来会更有帮助。
fixme
channelOpen、channelBound和channelConnected被合并为channelActive。channelDisconnected、channelUnbound和channelClosed被合并为channelInactive。类似的,Channel.isBound()和Channel.isConnected()也被合并为了Channel.isActive()。
需要注意的是,channelRegistered和channelUnregistered这两个事件与channelOpen和channelClosed具有的意义是不一样的。它们(channelRegistered和channelUnregistered)是在支持Channel的动态注册、注销以及再注册时被引入的,就像下图所示:
fixme
在创建出站缓存时也是差不多的(不会新建)。用户的ChannelOutBoundBYteHandler和ChannelOutboundMessageHandler来操作。
4.0里不再有了messageReceived或writeRequested处理器方法。它们被inboundBufferUpdated和flush代替了。用户的入队一个或多个消息到一个入站(或出站)缓存同时会出发一个inboundBUfferUpdated(或flush)事件。
public void inboundBufferUpdated(ChannelHandlerContext ctx) { Queue<MyMessage> in = ctx.inboundMessageBuffer(); Queue<MyNewMessage> out = ctx.nextInboundMessageBuffer(); for (;;) { MyMessage m = in.poll(); if (m == null) { break; } MyNewMessage decoded = decode(m); out.add(decoded); } ctx.fireInboundBufferUpdated(); } public void flush(ChannelHandlerContext ctx, ChannelFuture future) { Queue<MyNewMessage> in = ctx.outboundMessageBuffer(); Queue<MyMessage> out = ctx.nextOutboundMessageBuffer(); for (;;) { MyNewMessage m = in.poll(); if (m == null) { break; } MyMessage encoded = encode(m); out.add(encoded); } ctx.flush(future); }作为选择,用户能够在每个单独的入站(或出站)消息中触发这样的事件来模拟老的行为,尽管相对新方法来说效率更低。
在3.x里一个MessageEvent持有一个任意的对象。它能够是一个ChannelBuffer或是一个用户自定义的对象,它们都是同样对待的:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) { Object msg = evt.getMessage(); if (msg instanceof ChannelBuffer) { ChannelBuffer buf = (ChannelBuffer) msg; ... } else { MyMessage myMsg = (MyMessage) msg; ... } }在4.0里,它们就分别对待了,因为一个处理器不再处理一个独立的消息,而是处理多种多样的消息:
public void inboundBufferUpdated(ChannelHandlerContext ctx) { if (ctx.hasInboundByteBuffer()) { ByteBuf buf = ctx.inboundByteBuffer(); ... } else { Queue<MyMessage> buf = ctx.inboundMessageBuffer(); for (;;) { MyMessage msg = buf.poll(); if (buf == null) { break; } ... } } }你可能发现一个ServerChannel的处理器是一个入站缓存是Queue<Channel>的入站处理器是较为有趣的。
大多数用户都发现创建和管理它的生命周期是繁琐的,因此它支持用户扩展预定义好的适配器类来使得更方便:
评论删除后,数据将无法恢复
评论(22)
ctx.sendUpstream(evt);
// After:
ctx.fireInboundBufferUpdated();
这个地方原文的After内容应为
// After:
ctx.fireChannelRead(receivedMessage);
引用来自“媛媛小译”的评论
引用来自“Gelopa”的评论
@媛媛小译 如果有时间 翻译点这种文章 , 对你也是很有帮助的
引用来自“Gelopa”的评论
@媛媛小译 如果有时间 翻译点这种文章 , 对你也是很有帮助的
引用来自“石头哥哥”的评论
个人认为还是最好看英文的原版较好