Netty关闭连接后引发的空指针异常

lingengbin 发布于 2014/09/28 16:55
阅读 9K+
收藏 2

我用netty做一个文件传输,当客户端下载文件时,服务端传文件。服务端传文件的代码如下:

final ChunkedStream chunkedStream = new ChunkedStream(fis);
ctx.channel().writeAndFlush(chunkedStream).addListener(new ChannelFutureListener() {
	@Override
	public void operationComplete(ChannelFuture future) throws Exception {
		if(!future.isSuccess()) {
			logger.debug("File send timeout");
		}
		if (ctx != null ) {
		        if (ctx.channel().isActive()) {
			    ctx.close();
			}									
		}
							
		if (chunkedStream != null) {
			chunkedStream.close();
		}
		if (fis != null) {
			try {
			    fis.close();
			} catch (IOException e) {
			}
		}
							
	}
});



问题出现在以下场景:我写了一个下载超时控制的handler,当超过指定时间后,会触发一个我自定义的Exception。然后我在exceptionCaught里捕获它,并关闭连接。这时,服务端还继续着向客户端传文件,连接关闭后,上面代码的operationComplete方法会被触发,并且future.isSuccess()返回false。operationComplete方法里面的代码执行完后,就开始出错了。错误信息栈如下所示:


16:29:55.637-[WARN ] Lsr-FileTransferService-IoProcessor-73 DefaultPromise - An exception was thrown by io.netty.handler.stream.ChunkedWriteHandler$5.operationComplete()

java.lang.IllegalStateException: complete already: DefaultChannelPromise@82acef(failure(java.nio.channels.ClosedChannelException)

at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:401) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:87) ~[netty-all-4.0.10.Final.jar:na]

at io.netty.handler.stream.ChunkedWriteHandler$PendingWrite.fail(ChunkedWriteHandler.java:354) ~[netty-all-4.0.10.Final.jar:na]

at io.netty.handler.stream.ChunkedWriteHandler$5.operationComplete(ChunkedWriteHandler.java:306) ~[netty-all-4.0.10.Final.jar:na]

at io.netty.handler.stream.ChunkedWriteHandler$5.operationComplete(ChunkedWriteHandler.java:301) ~[netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:621) [netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:548) [netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:407) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:508) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:296) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:440) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:550) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelPipeline$HeadHandler.close(DefaultChannelPipeline.java:1018) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelHandlerContext.invokeClose(DefaultChannelHandlerContext.java:560) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelHandlerContext.close(DefaultChannelHandlerContext.java:545) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelHandlerContext.close(DefaultChannelHandlerContext.java:423) [netty-all-4.0.10.Final.jar:na]

at cn.com.agree.afa.lsr.service.aft.codec.service.GetFile$2.exceptionCaught(GetFile.java:124) [classes/:na]

at io.netty.channel.DefaultChannelHandlerContext.invokeExceptionCaught(DefaultChannelHandlerContext.java:275) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.DefaultChannelHandlerContext.fireExceptionCaught(DefaultChannelHandlerContext.java:253) [netty-all-4.0.10.Final.jar:na]

at cn.com.agree.afa.lsr.service.aft.codec.FileReadTimeoutHandler.readTimeout(FileReadTimeoutHandler.java:147) [classes/:na]

at cn.com.agree.afa.lsr.service.aft.codec.FileReadTimeoutHandler.access$2(FileReadTimeoutHandler.java:145) [classes/:na]

at cn.com.agree.afa.lsr.service.aft.codec.FileReadTimeoutHandler$FileReadTimeoutTask.run(FileReadTimeoutHandler.java:171) [classes/:na]

at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) [netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:123) [netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:354) [netty-all-4.0.10.Final.jar:na]

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:348) [netty-all-4.0.10.Final.jar:na]

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101) [netty-all-4.0.10.Final.jar:na]

at java.lang.Thread.run(Thread.java:662) [na:1.6.0_43]

java.nio.channels.ClosedChannelException: null

 

 

我的猜测是这样的,ctx.close()调用一次setFailure方法,而operationComplete执行完后也会调用ctx的setFailure方法,但是这时它调用的ctx已经被关闭了。所以有了上面的空指针异常。

 

我的问题是:
1,上面的错误是什么原因呢?

2,有没有更好的方法来实现下载超时?

 

任何意见或看法都感激不尽,谢谢!!


加载中
0
Grrrr
Grrrr

很明显是setFailure调用了2次.

你的这个状况和下面的代码一样:

package netty;


import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.util.concurrent.Promise;


import java.nio.channels.ClosedChannelException;


public class FutureTest {


 public static void main(String[] args) {
 EventLoopGroup group = new NioEventLoopGroup(0x1);
 Promise<String> p = group.next().newPromise();
 p.setFailure(new ClosedChannelException());//超时,你主动关闭了连接
 p.setFailure(WriteTimeoutException.INSTANCE);//writeAndFlush又抛了一次
 group.shutdownGracefully();
 }
}

 
 
 
 


0
char1st
char1st
   if(!future.isSuccess()) {
            logger.debug("File send timeout");
        }else{
            ..............
        }

先这样行不行?

char1st
char1st
能贴一下FileReadTimeoutHandler的代码么?
l
lingengbin
因为无论future成功或者失败,我都得关闭资源。所以没有用if{}else{}。之前也像你说的这样写过,还是一样有错误
0
Grrrr
Grrrr
我建议楼主先看下netty的事件驱动模型。再把java里面的Future & Promise看一下,以后处理这种问题就太轻松了。
Grrrr
Grrrr
回复 @lingengbin : 顺便说一句,一般在handler里面主动抛出异常,在exceptionCause()方法里面不建议关闭channel。一般都是返回一个错误消息,除非你确切知道你关闭channel不会引起当前io线程的其他操作,因为你是在netty异步的环境下进行的。比如你这个关闭操作就影响了writeAndFlush的操作。
l
lingengbin
十分感谢,对我十分有用,谢谢你!!
0
l
lingengbin

引用来自“char1st”的评论

   if(!future.isSuccess()) {
            logger.debug("File send timeout");
        }else{
            ..............
        }

先这样行不行?

回复 @char1st
谢谢你的回答,下面是我实现的FileReadTimeoutHandler源代码,这个Handler参考了netty自带的ReadTimeoutHandler,这两个不一样的是Netty自带的ReadTimeoutHandler的timeout指的是两次读写之间的空闲时间,而我的FileReadTimeoutHandler里面的timeout指的是从handler被加到pipeline后开始计算的时间。
public class FileReadTimeoutHandler extends ChannelInboundHandlerAdapter {


private final long timeoutMillis;
private volatile ScheduledFuture<?> timeout;


private volatile long firstReadTime;


private volatile int state;// 0 - none, 1 - Initialized, 2 - Destroyed;


private volatile boolean closed;


public FileReadTimeoutHandler(int timeoutMillis) {
if (timeoutMillis < 0) {
this.timeoutMillis = 0;
} else {
this.timeoutMillis = Math.max(timeoutMillis, 1);
}
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelInboundHandlerAdapter#channelRegistered(io.netty
* .channel.ChannelHandlerContext)
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) {
initialize(ctx);
}
super.channelRegistered(ctx);
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.
* channel.ChannelHandlerContext)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty
* .channel.ChannelHandlerContext)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy();
super.channelInactive(ctx);
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel
* .ChannelHandlerContext, java.lang.Object)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 不能减少msg的引用计数
System.out.println("file read timeout reveice");
ctx.fireChannelRead(msg);
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.
* ChannelHandlerContext)
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
initialize(ctx);
}
}


/*
* (non-Javadoc)
* 
* @see
* io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel
* .ChannelHandlerContext)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy();
}


private void initialize(ChannelHandlerContext ctx) {
System.out.println("file read timeout initialize");
if (state == 0) {
state = 1;
firstReadTime = System.currentTimeMillis();
if (timeoutMillis > 0) {
timeout = ctx.executor().schedule(new FileReadTimeoutTask(ctx),
timeoutMillis, TimeUnit.MILLISECONDS);
}
}
}

private void destroy() {
        state = 2;
        if (timeout != null) {
            timeout.cancel(false);
            timeout = null;
        }
    }

private void readTimeout(ChannelHandlerContext ctx) throws Exception {
if(!closed) {
ctx.fireExceptionCaught(FileReadTimeoutException.INSTANCE);
//	ctx.close();
closed = true;
}
}


private final class FileReadTimeoutTask implements Runnable {


private final ChannelHandlerContext ctx;


FileReadTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}


@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}


long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - firstReadTime);
if (nextDelay <= 0) {
try {
readTimeout(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
timeout = ctx.executor().schedule(this, nextDelay,
TimeUnit.MILLISECONDS);
}
}
}


}





0
Grrrr
Grrrr
顺便说一句,一般在handler里面主动抛出异常,在exceptionCause()方法里面不建议关闭channel。一般都是返回一个错误消息,除非你确切知道你关闭channel不会引起当前io线程的其他操作,因为你是在netty异步的环境下进行的。比如你这个关闭操作就影响了writeAndFlush的操作。
l
lingengbin
也许我应该试试自定义事件去触发userEventTriggered
返回顶部
顶部