Netty 4.0 新的特性及需要注意的地方

这篇文章和你一起过下Netty的主发行版本的一些显著的改变和新特性,让你在把你的应用程序转换到新版本的时候有个概念。

项目结构改变

Netty的包名从org.jboss.netty改为io.netty,因为我们不在是JBoss.org的一部分了

二进制JAR包被分为了多个子模块以便用户能够从类路径中去掉非必需的特性。当前的结构如下:

模块 描述
netty project parent
common utility and logging
buffer buffer API
transport channel API and its core implementations
transport-rtrx RTRX transport implementation
transport-sctp SCTP transport implementation
transport-udt UDT transport implementation
handler channel handlers
codec codec framework
codec-http HTTP, Web Sockets, SPDY, and RTSP codec
codec-socks Socks codec
example examples
all generates an all-in-one JAR
tarball generates a tarball distribution

所有的Netty的Jar(除了netty-all外)包现在都是OSGI的bundle,能够用在你喜欢的OSGI容器上。

常用API的变化

Buffer API变化

ChannelBuffer ByteBuf

由于上文所提到的结构上的变化,buffer API现在可以作为一个单独的包被使用。为此,ChannelBuffer这个类型名也不那么讲得通了,而应该变更为ByteBuf。

用来创建新buffer的功能类ChannelBuffers被拆分为两个功能类:Unpooled和BufUtil。就像这个名字所暗示的,4.0引入了一个新的池化的ByteBufs,它可以通过ByteBuf的分配器(Allocator)的对应实现ByteBufAllocator来获得。

大多数的buffer变成了动态的,具备可配置的最大容量

在3.x时期,buffer分为固定和动态两种类型。一个固定buffer的容量在创建之后就无法改变,而动态buffer的容量在write*(译者按:writeByte,writeInt,writeLong...)方法需要更多空间时自动扩容。

从4.0开始,所有buffer都变成了动态的。但是,相对于之前的动态进行了优化。你可以更容易也更安全的对一个buffer的容量进行扩大和缩小。之所以说它容易是因为有一个新的ByteBuf.capacity(int newCapacity)的方法。说它安全是因为你可以设置一个容量的最大值,以防止容量没有限制的增大。

// 不要再使用 dynamicBuffer() - 使用 buffer().
ByteBuf buf = ByteBuf.buffer();

// 增加buffer的容量
buf.capacity(1024);
...

// 缩减buffer的容量 (最后的512个byte被丢弃)
buf.capacity(512);

唯一的例外是那些使用wrappedBuffer方法创建的,包装(warp)了一个buffer或一个byte数组的buffer。你无法扩大它的容量,因为这样会使包装一个已有buffer的目的是去意义——减少内存的复制。如果你想要在包装了一个buffer之后改变它的容量,你应该重新创建一个拥有足够容量的buffer,然后将你想要包装的那个buffer的内容复制过来。

新接口: CompositeByteBuf

一个新的名叫CompositeByteBuf的接口为组合buffer(composite buffer)的实现定义了多种高级的操作。一个用户可以使用组合buffer,以只比随机访问大一点的代价达到一个批量内存复制的目的。要创建一个新的组合buffer,可以像以前一样使用Unpooled.wrappedBuffer(... 译者注:此处省略号应该是指省略方法参数,下同)或Unpooled.compositeBuffer(...)。

可预知的NIO buffer转型

在3.x中,ChannelBuffer.toByteBuffer()以及它的其他变体所提供的约定并不那么明确。用户无法确定这些方法会返回一个拥有共享数据的视图buffer还是一个拥有独立数据的通过复制得到的buffer。4.0将toByteBuffer()替换为ByteBuf.nioBufferCount(),nioBuffer(),以及nioBUffers()。如果调用nioBufferCount()返回0,用户总是可以通过调用copy().nioBuffer()来获得一个复制的buffer。

对小字节序变更的支持

对小字节序的支持经历了重大变化。在之前的版本中,一个用户为了得到一个小字节序的buffer有两种选择:特别指定一个LittleEndianHeapChannelBufferFactory;用目标字节序将已存在的buffer包装起来。4.0添加了一个新方法,ByteBuf.order(ByteOrder)。这个方法返回当前buffer对象的一个具有指定字节序的视图buffer:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteOrder;
 
ByteBuf buf = Unpooled.buffer(4);
buf.setInt(0, 1);
// 打印出 '00000001'
System.out.format("%08x%n", buf.getInt(0)); 
 
ByteBuf leBuf = buf.order(ByteOrder.LITTLE_ENDIAN);
// 打印出 '01000000'
System.out.format("%08x%n", leBuf.getInt(0));
 
assert buf != leBuf;
assert buf == buf.order(ByteOrder.BIG_ENDIAN);

Pooled ByteBuf

前面已经提到Netty引入了pooledByteBufinstances。这在很多方面都很实用,举列如下:

public interface ByteBufAllocator {
 
    ByteBuf buffer();
    ByteBuf buffer(int initialCapacity);
    ByteBuf buffer(int initialCapacity, int maxCapacity);
    ByteBuf heapBuffer();
    ByteBuf heapBuffer(int initialCapacity);
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);
    ByteBuf directBuffer();
    ByteBuf directBuffer(int initialCapacity);
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);
    ByteBuf ioBuffer();
 
    CompositeByteBuf compositeBuffer();
    CompositeByteBuf compositeBuffer(int maxNumComponents);
    CompositeByteBuf compositeHeapBuffer();
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
    CompositeByteBuf compositeDirectBuffer();
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);
}
要想从一个handler那里获取当前的 ByteBufAllocator,可以使用ChannelHandlerContext.alloc()或Channel.alloc()方法:
Channel channel = ...;
ByteBuf buf = channel.alloc().buffer(512);
....
channel.write(buf);
 
ChannelHandlerContext ctx = ...
ByteBuf buf2 = ctx.alloc().buffer(512);
....
channel.write(buf2)

一旦一个ByteBuf被写入远程节点,它会再次自动的释放进入释放到池(the pool)里。

默认的ByteBufAllocator为PooledByteBufAllocator.如果你不希望使用buffer pooling或使用你自己的allocator,你可以运用Channel.config().setAllocator(..),以及一个可供选择的 allocator,比如UnpooledByteBufAllocator。

Channel API的变化

在4.0中,许多io.netty.channel包中的类都经历大量修改,因此文本上的简单搜索-替换是无法让你基于3.x的程序迁移到4.0上。这个部分会尝试将这些重大变更背后的思考过程展示出来,而不只是简单地作为展示所有变更。

翻新后的ChannelHandler接口

Upstream → Inbound, Downstream → Outbound

对于初学者来说,术语'upstream'(译者注:直译为向上游,有点像TCP/IP协议栈中从下往上,从物理层最终到达应用层这么一个流程)和'downstream'有点让人迷惑。在4.0中,只要可能,都会使用'inbound'(译者注:直译为开往内地的,相对于upstream确实更贴切,即指数据从外部网络经历层层filter到达我们的处理逻辑)和'outbound'来替换他们。

新的ChannelHandler继承层次

在3.x时代,ChannelHandler只是一个标记接口,而在ChannelUpstreamHandler、ChannelDownstreamHandler、LifeCycleAwareChannelHandler定义了具体的处理器方法。在Netty 4中,ChannelHandler将LifeCycleAwareChannelHandler接口和一堆实现辅助方法融合到了一起,具体见代码:

public interface ChannelHandler {
 
    void beforeAdd(ChannelHandlerContext ctx) throws Exception;
    void afterAdd(ChannelHandlerContext ctx) throws Exception;
    void beforeRemove(ChannelHandlerContext ctx) throws Exception;
    void afterRemove(ChannelHandlerContext ctx) throws Exception;
 
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    ...
}

下方的图表描述了这个新的类型集成层次:

fixme(原文中还没有插入此图)

事件对象从ChannelHandler中消失了

在3.x时代,所有的I/O操作都会创建一个新的ChannelEvent对象。对每个读或写的操作,还会额外创建一个新的ChannelBuffer对象。由于将资源管理和buffer的池化交给了JVM,这实际上极大地简化了Netty的内部实现。但是,基于Netty开发的应用在高负载下运行时,有时会观察到GC(Garbage Collection)的压力增大或变化不定,这些问题的根源也来自于这里。

4.0通过把事件对象替换为直接与类型相对应(译者注:原文为strongly typed,但是我觉得直译为强类型不太容易理解)的方法调用,几乎完全避免了事件对象的创建。3.x中,有类似于handleUpstream()和handleDownstream()这种能够捕获所有相关类型事件的处理器方法,4.0中你将不会再看到它们的身影了。所有的事件类型现在都有各自对应的处理器方法:

// 3.x时代:
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception;
void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception;
 
// 4.0:
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
 
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception;
void connect(
        ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelFuture future) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void read(ChannelHandlerContext ctx);
void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception;

ChannelHandlerContext类也被修改来反映上述提到的变化:

// Before:
ctx.sendUpstream(evt);
 
// After:
ctx.fireInboundBufferUpdated();

所有这些变化意味着用户无法去扩展ChannelEvent这个已经不存在的接口了。那用户要怎样才能定义他或她自己的事件类型呢,就像IdleStateEvent?4.0中的ChannelHandler有一个处理器方法叫做userEventTriggered(),它就是被设计用来满足这种特殊的用户需求。

Simplified channel state model

在3.x中,当一个新的Channel被创建并连接成功,至少三个ChannelStateEvent会被触发:channelOpen、channelBound以及channelConnected。当一个Channel关闭,则对应channelDisconnected、channelUnbound以及channelClosed三个事件。

fixme

但是,触发这么多事件的意义并不那么明显。如果在一个Channel进入可读或可写的状态时通知用户,想来会更有帮助。

fixme

channelOpen、channelBoundchannelConnected被合并为channelActive。channelDisconnected、channelUnboundchannelClosed被合并为channelInactive。类似的,Channel.isBound()和Channel.isConnected()也被合并为了Channel.isActive()。

需要注意的是,channelRegistered和channelUnregistered这两个事件与channelOpen和channelClosed具有的意义是不一样的。它们(channelRegistered和channelUnregistered)是在支持Channel的动态注册、注销以及再注册时被引入的,就像下图所示:

fixme

每个处理器的缓存

不像3.x那样在每次读操作都简历一个新堆里的缓存来触发上游的MessageEvent,4.0不会每次都创建新的 缓存。它直接从socket中读取数据到由用户的ChannelInboundByteHandler和ChannelInboundMessageHandler实现创建的入站缓存。

因为由上述处理器创建的入站缓存直到关联的通道关闭前都会重用,所以在上面的GC和内存带宽消耗都能保持较小。同样,当接收到的数据被销毁时用户已经完成操作,codec的实现就变得更简单和有效了

在创建出站缓存时也是差不多的(不会新建)。用户的ChannelOutBoundBYteHandler和ChannelOutboundMessageHandler来操作。

不需要每条消息都有一个事件

4.0里不再有了messageReceived或writeRequested处理器方法。它们被inboundBufferUpdated和flush代替了。用户的入队一个或多个消息到一个入站(或出站)缓存同时会出发一个inboundBUfferUpdated(或flush)事件。

public void inboundBufferUpdated(ChannelHandlerContext ctx) {
    Queue<MyMessage> in = ctx.inboundMessageBuffer();
    Queue<MyNewMessage> out = ctx.nextInboundMessageBuffer();
    for (;;) {
        MyMessage m = in.poll();
        if (m == null) {
            break;
        }
        MyNewMessage decoded = decode(m);
        out.add(decoded);
    }
    ctx.fireInboundBufferUpdated();
}

public void flush(ChannelHandlerContext ctx, ChannelFuture future) {
    Queue<MyNewMessage> in = ctx.outboundMessageBuffer();
    Queue<MyMessage> out = ctx.nextOutboundMessageBuffer();
    for (;;) {
        MyNewMessage m = in.poll();
        if (m == null) {
            break;
        }
        MyMessage encoded = encode(m);
        out.add(encoded);
    }
    ctx.flush(future);
}
作为选择,用户能够在每个单独的入站(或出站)消息中触发这样的事件来模拟老的行为,尽管相对新方法来说效率更低。

消息处理器 vs. 字节处理器

在3.x里一个MessageEvent持有一个任意的对象。它能够是一个ChannelBuffer或是一个用户自定义的对象,它们都是同样对待的:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
    Object msg = evt.getMessage();
    if (msg instanceof ChannelBuffer) {
        ChannelBuffer buf = (ChannelBuffer) msg;
        ...
    } else {
        MyMessage myMsg = (MyMessage) msg;
        ...
    }
}
在4.0里,它们就分别对待了,因为一个处理器不再处理一个独立的消息,而是处理多种多样的消息:
public void inboundBufferUpdated(ChannelHandlerContext ctx) {
    if (ctx.hasInboundByteBuffer()) {
        ByteBuf buf = ctx.inboundByteBuffer();
        ...
    } else {
        Queue<MyMessage> buf = ctx.inboundMessageBuffer();
        for (;;) {
            MyMessage msg = buf.poll();
            if (buf == null) {
                break;
            }
            ...
        }
    }
}
你可能发现一个ServerChannel的处理器是一个入站缓存是Queue<Channel>的入站处理器是较为有趣的。

处理器适配器

大多数用户都发现创建和管理它的生命周期是繁琐的,因此它支持用户扩展预定义好的适配器类来使得更方便:

明智的和不易出错的入站流量挂起

3.x有一个由Channel.setReadable(boolean)提供的不是很明显的入站流量挂起机制。它引入了在ChannelHandler之间的复杂交互操作,同时处理器由于不正确实现而很容易互相干扰。

4.0里,新的名为read()的出站操作增加了。如果你使用Channel.config().setAutoRead(false)来关闭默认的auto-read标志,Netty不会读入任何东西,直到你显式地调用read()操作。一旦你发布的read()操作完成,同时通道再次停止读,一个名为channelReadSuspended()的入站事件会触发一遍你能够重新发布另外的read()操作。你同样也可以拦截read()操作来执行更多高级的流量控制。

暂停接收传入的连接

在3.x里,没有方法让一个用户告诉Netty来厅子接收传入连接,除非是阻塞I/O线程或者关闭服务器socket。在aotu-read标志没有设置的时候,4.0涉及到的read()操作就像一个普通的通道。

半关闭socket

TCP和SCTP允许用户关闭一个socket的出站流量而不用完全关闭它。这样的socket被称为“半关闭socket”,同时用户能够通过调用SocketChannel.shutdownOutput()方法来获取一个半关闭socket。如果一个远端关闭了出站通道,SocketChannel.read(..)会返回-1,这看上去并没有和一个关闭了的链接有什么区别。

3.x没有shutdownOutput()操作。同样,它总是在SocketChannel.read(..)返回-1的时候关闭链接。

要支持半关闭socket,4.0增加了SocketChannel.shutdownOutput()方法,同时用户能设置“ALLOW_HALF_CLOSURE”的ChanneOption来阻止Netty在SocketChannel.read(..)返回-1的时候自动关闭链接。

灵活的I/O线程分配

在3.x里,一个Channel是由ChannelFactory创建的,同时新创建的Channel会自动注册到一个隐藏的I/O线程。4.0使用新的名为EventLoopGroup的接口来替换ChannelFactory,它由一个或多个EventLoop来构成。同样,一个新的Channel不会自动注册到EventLoopGroup,但用户可以显式调用EventLoopGroup.register()来注册。

感谢这个变化(举例来说,分离了ChannelFactory和I/O线程),用户可以注册不同的Channel实现到同一个EventLoopGroup,或者同一个Channel实现到不同的EventLoopGroup。例如,你可以运行一个NIO服务器socket,NIO UDP socket,以及虚拟机内部的通道在同一个I/O线程里。在编写一个需要最小延迟的代理服务器时这确实很有用。

能够从一个已存在的jdk套接字上创建一个Channel

3.x没提供方法从已存在的jdk套接字(如java.nio.channels.SocketChannel)创建一个新的通道。现在你可以用4.0这样做了。

取消注册和重新注册一个Channel从/到一个I/O线程

一旦一个新的Channel在3.x里创建,它完全绑定到一个单一的I/O线程上,直到它底层的socket关闭。在4.0里,用户能够从I/O线程里取消注册一个Channel来完全控制它底层jdk套接字。例如,你能够利用Netty提供的高层次无阻塞I/O的优势来解决复杂的协议,然后取消注册Channel并且切换到阻塞模式来在可能的最大吞吐量下传输一个文件。当然,它能够再次注册已经取消了注册的Channel。

java.nio.channels.FileChannel myFile = ...;
java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open();
 
// Perform some blocking operation here.
...
 
// Netty takes over.
SocketChannel ch = new NioSocketChannel(mySocket);
EventLoopGroup group = ...;
group.register(ch);
...
 
// Deregister from Netty.
ch.deregister().sync();
 
// Perform some blocking operation here.
mySocket.configureBlocking(false);
myFile.transferFrom(mySocket, ...);
 
// Register back again to another event loop group.
EventLoopGroup anotherGroup = ...;
anotherGroup.register(ch);

调度任意的任务到一个I/O线程里运行

当一个Channel被注册到EventLoopGroup时,Channel实际上是注册到由EventLoopGroup管理EventLoop中的一个。EventLoop实现了java.utilconcurrent.ScheduledExecutorService接口。这意味着用户可以在一个用户通道归属的I/O线程里执行或调度一个任意的Runnable或Callable。随着新的娘好定义的线程模型的到来(稍后会介绍),它变得极其容易地编写一个线程安全的处理器。

public class MyHandler extends ChannelOutboundMessageHandlerAdapter {
    ...
    public void flush(ChannelHandlerContext ctx, ChannelFuture f) {
        ...
        ctx.flush(f);
 
        // Schedule a write timeout.
        ctx.executor().schedule(new MyWriteTimeoutTask(), 30, TimeUnit.SECONDS);
        ...
    }
}
 
public class Main {
    public static void main(String[] args) throws Exception {
        // Run an arbitrary task from an I/O thread.
        Channel ch = ...;
        ch.executor().execute(new Runnable() { ... });
    }
}

简化的关闭

releaseExternalResources()不必再用了。你可以通过调用EventLoopGroup.shutdown()直接地关闭所有打开的连接同时使所有I/O线程停止,就像你使用java.util.concurrent.ExecutorService.shutdown()关闭你的线程池一样。

类型安全的ChannelOptions

有两个方法来配置Netty的Channel的socket参数。第一个是明确地调用ChannelConfig的setter,例如SocketChannelConfig.setTcpNoDelay(true)。这是最为类型安全的方法。另外一个是调用ChannelConfig.setOption()方法。有时候你不得不决定在运行时的时候socket要配置什么选项,同时这个方法在这种情况下有点不切实际。然而,在3.x里它是容易出错的,因为一个用户必需用一对字符串和对象来指定选项。如果用户调用了错误的选项名或者值,他或她将会赵宇到一个ClassCastException或指定的选项甚至可能会默默地忽略了。

4.0引入了名为ChannelOption的新的类型,它提供了类型安全地访问socket选项。

ChannelConfig cfg = ...;
 
// Before:
cfg.setOption("tcpNoDelay", true);
cfg.setOption("tcpNoDelay", 0);  // Runtime ClassCastException
cfg.setOption("tcpNoDelays", true); // Typo in the option name - ignored silently
 
// After:
cfg.setOption(ChannelOption.TCP_NODELAY, true);
cfg.setOption(ChannelOption.TCP_NODELAY, 0); // Compile error

AttributeMap

在回应用户指令里,你可以附加任意的对象到Channel和ChannelHandlerContext。一个名为AttributeMap的新接口被加入了,它被Channel和ChannelHandlerContext继承。作为替代,ChannelLocal和Channel.attachment被移除。这些属性会在他们关联的Channel被垃圾回收的同时回收。

public class MyHandler extends ChannelInboundMessageHandlerAdapter<MyMessage> {
 
    private static final AttributeKey<MyState> STATE =
            new AttributeKey<MyState>("MyHandler.state");
 
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        ctx.attr(STATE).set(new MyState());
        ctx.fireChannelRegistered();
    }
 
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MyMessage msg) {
        MyState state = ctx.attr(STATE).get();
    }
    ...
}


新的bootstrap API

bootstrap API已经重头重写,尽管它的目的还是一样;它执行需要使服务器或客户端运行的典型步骤,通常能在样板代码里找到。新的bootstrap同样采取了流畅的接口。

public static void main(String[] args) throws Exception {
    // Configure the server.
    ServerBootstrap b = new ServerBootstrap();
    try {
        b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
         .channel(new NioServerSocketChannel())
         .option(ChannelOption.SO_BACKLOG, 100)
         .localAddress(8080)
         .childOption(ChannelOption.TCP_NODELAY, true)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(handler1, handler2, ...);
             }
         });
 
        // Start the server.
        ChannelFuture f = b.bind().sync();
 
        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        b.shutdown();
    }
}

ChannelPipelineFactory → ChannelInitializer

和你在上面的例子注意到的一样,ChannelPipelineFactory不再存在了。而是由ChannelInitializer来替换,它给予了在Channel和ChannelPipeline的配置的更多控制。

请注意,你不能自己创建一个新的ChannelPipeline。通过观察目前为止的用例报告,Netty项目队伍总结到让用户去创建自己的管道实现或者是继承默认的实现是没有好处的。因此,ChannelPipeline不再让用户创建。ChannelPipeline由Channel自动创建。

ChannelFuture拆分为ChannelFuture和ChannelPromise

ChannelFuture已经被拆分为ChannelFuture和ChannelPromise了。这不仅仅是让异步操作里的生产者和消费者间的约定更明显,同样也是得在使用从链中返回的ChannelFuture更加安全,因为ChannelFuture的状态是不能改变的。

由于这个编号,一些方法现在都采用ChannelPromiser而不是ChannelFuture来改变它的状态。

良好定义的线程模型

在3.x里并没有良好设计的线程模型,尽管曾经要修复线程模型在3.5的不一致性。4.0定义的一个严格的线程模型来帮助用户编写ChannelHandler而不必担心太多关于线程安全的东西。

不再有ExecutionHandler ——它包含到核心里

在你加入一个ChannelHandler到一个ChannelPipeline来告诉管道总是通过指定的EventExecutor调用加入的ChannelHander处理器的方法的时候,你可以指定一个EventExecutor。

Channel ch = ...;
ChannelPipeline p = ch.pipeline();
EventExecutor e1 = new DefaultEventExecutor(16);
EventExecutor e2 = new DefaultEventExecutor(8);
 
p.addLast(new MyProtocolCodec());
p.addLast(e1, new MyDatabaseAccessingHandler());
p.addLast(e2, new MyHardDiskAccessingHandler());

EventExecutor是EventLoop的超类,同时也继承了ScheduledExecutorService。

fixme

编码解码器框架变化

在编码解码器框架里有实质性的内部改变,因为4.0需要一个处理器来创建和管理它的缓存(看这篇文章的每个处理器缓存部分。)然而,从用户角度来看这些变化都不是很大的。

编码解码器嵌入器→ EmbeddedChannel

编码解码器嵌入器已经被 io.netty.channel.embedded.EmbeddedByteChannel和EmbeddedMessageChannel替换了。EmbeddedChannel允许用户对任何包含编码解码器的管道进行单元测试。

HTTP编码解码器

HTTP解码器现在在每个HTTP消息中总生成多个消息对象:

1       * HttpRequest / HttpResponse
0 - n   * HttpContent
1       * LastHttpContent

要看更多的细节,请到转到已更新了的HttpSnoopServer例子。如果你希望为一个单一的HTTP消息处理多个消息,你可以把HttpObjectAggregator放入管道里。HttpObjectAggregator会把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse。

传输实现的变化

下面是传输协议新加入的东西:

用例学习:移植示例Factorial

这部分粗略地展示把示例Factorial从3.0移植到4.0的步骤。示例Factorial已经移植到4.0了,它放在io.netty.example.factorial包里。请浏览示例的源代码来看下每一处的变化。

移植服务端

  1. 重写FactorialServer.run()方法来使用新的 bootstrap API。
    1. 不再有ChannelFactory了。 由你自己去实例化NioEventLoop(一个是用来接收接入的链接,另外的就用来处理接收到的链接)。
  2. 从命名FactorialServerPipelineFactory为FactorialServerInitializer。
    1. 让它继承ChannelInitializer<Channel>。
    2. 取代创建新的ChannelPipeline,通过Channel.pipeline()来获取。
  3. 让FactorialServerHandler继承sChannelInboundMessageHandlerAdapter<BigInteger>。
    1. 用channelInactive()来替换channelDisconnected()。
    2. handleUpstream()不能再使用了。
  4. 让BigIntegerDecoder继承ByteToMessageDecoder<BigInteger>。
  5. 让NumberEncoder继承MessageToByteEncoder<Number>。
    1. encode()不在返回一个缓存了。由ByteToMessageDecoder来提供填充编码好的数据到缓存里。

移植客户端

大部分和移植服务端差不多,但你要在你编写一个潜在的大数据流时要多注意下。

  1. 重写FactorialClient.run()方法来使用新的bootstrap API。
  2. 重命名FactorialClientPipelineFactory为FactorialClientInitializer。
  3. 使FactorialClientHandler继承ChannelInboundMessageHandlerAdapter<BigInteger>
    1. 在这一点,你发现在4.0里没有了Channel.isWritable()或者channelInterestChanged()。作为代替,你自己来管理那些未决定的写操作。新的sendNumbers()看起来如下:
      private void sendNumbers() {
          // Do not send more than 4096 numbers.
          boolean finished = false;
          MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
          while (out.size() < 4096) {
              if (i <= count) {
                  out.add(Integer.valueOf(i));
                  i ++;
              } else {
                  finished = true;
                  break;
              }
          }
    
          ChannelFuture f = ctx.flush();
          if (!finished) {
              f.addListener(numberSender);
          }
      }
    
      private final ChannelFutureListener numberSender = new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
              if (future.isSuccess()) {
                  sendNumbers();
              }
          }
      };