【开源中国 APP 全新上线】“动弹” 回归、集成大模型对话、畅读技术报告”
version : netty-3.2.1.final
location : org.jboss.netty.channel.socket.nio.NioWorker.read(SelectionKey)
代码片段如下:
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); ... if (readBytes > 0) { bb.flip(); final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); final ChannelBuffer buffer = bufferFactory.getBuffer( bb.order(bufferFactory.getDefaultOrder())); recvBufferPool.release(bb); // Update the predictor. predictor.previousReceiveBufferSize(readBytes); // Fire the event. fireMessageReceived(channel, buffer); } else { recvBufferPool.release(bb); }
fireMessageReceived会最终触发handler的messageReceived方法的调用, 但接收数据的buffer已在此前就归还给了缓存池(recvBufferPool). 其实, bb的回收放在fireMessageReceived之前或之后, 在一般情况下对执行的结果不产生影响, 毕竟在fireMessageReceived执行完毕时, bb也就使用完了. 但是, 若有三个前提条件同时满足时, 就会出现bb被覆写问题.
- 使用ExecutionHandler异步化messageReceived的执行, 那么bb就有可能在(异步线程)读取之前, 被(当前线程)用后来的数据覆写, 除非buffer是bb的副本;
- 使用DirectChannelBufferFactory实现zero-copy, 那么buffer肯定是对bb的wrapped, 但bb并不是直接服用的, 而是由recvBufferPool管理, 除非bb刚好满足了后来数据的需要;
- 使用FixedReceiveBufferSizePredictor令每次对缓存大小需求都一样, 那么bb就会被覆写.
重现这一陷阱的代码如下:
package cn.cafusic.netty.execution.bug; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.DirectChannelBufferFactory; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.FixedReceiveBufferSizePredictor; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.jboss.netty.handler.execution.ExecutionHandler; /** * Bug * * @author <a href=mailto:jushi@taobao.com>jushi</a> * @created 2010-8-17 * */ public class Bug { private static class ClientHandler extends SimpleChannelHandler { @Override public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); while (buf.readable()) { System.out.print((char) buf.readByte()); } System.out.println(); e.getChannel().close(); } } private static class ServerHandler extends SimpleChannelHandler { @Override public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception { TimeUnit.SECONDS.sleep(1); // delay for buffer rewrite. e.getChannel().write(e.getMessage()); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("connected : " + e.getChannel()); NioSocketChannelConfig config = (NioSocketChannelConfig) e.getChannel().getConfig(); config.setBufferFactory(new DirectChannelBufferFactory()); // zero-copy config.setReceiveBufferSizePredictor(new FixedReceiveBufferSizePredictor(10)); // fix buffer size requirement } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("closed : " + e.getChannel()); } } static void serve() { final ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), 1); final ServerBootstrap bootstrap = new ServerBootstrap(factory); ChannelPipeline pipeline = bootstrap.getPipeline(); pipeline.addLast("execution", new ExecutionHandler(Executors.newCachedThreadPool())); // async message received pipeline.addLast("handler", new ServerHandler()); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); final Channel bind = bootstrap.bind(new InetSocketAddress(8080)); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.out.println("shutdown"); bind.close().awaitUninterruptibly(); bootstrap.releaseExternalResources(); } }); } static void connect() { final ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), 1); final ClientBootstrap bootstrap = new ClientBootstrap(factory); ChannelPipeline pipeline = bootstrap.getPipeline(); pipeline.addLast("handler", new ClientHandler()); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)); Channel channel = future.awaitUninterruptibly().getChannel(); channel.write(ChannelBuffers.wrappedBuffer("++++++++++".getBytes())) .awaitUninterruptibly(); channel.write(ChannelBuffers.wrappedBuffer("----------".getBytes())) .awaitUninterruptibly(); channel.write(ChannelBuffers.wrappedBuffer("==========".getBytes())) .awaitUninterruptibly(); channel.getCloseFuture().awaitUninterruptibly(); bootstrap.releaseExternalResources(); } public static void main(String[] args) { serve(); connect(); System.exit(0); } }