Netty传输文件出现java.io.IOException: Connection reset by peer

开源为了谁 发布于 2015/07/24 12:40
阅读 5K+
收藏 1

@冠超杨 你好,想跟你请教个问题:

这是错误信息

java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:744)

然后我的代码是:

fileServer.java

public class FileServer {

static final int PORT = ConfigUtils.getInteger("trans.server.file.port");
static final int BOSSTHREAD_COUNT = ConfigUtils.getInteger("trans.server.bossThread.size");
static final int WORKERTHREAD_COUNT = ConfigUtils.getInteger("trans.server.workerThread.size");


public static void main(String[] args) {
try {
System.out.println("进入main函数");
runServer();
System.out.println("netty server started...");
} catch (Exception e) {
System.out.println("netty server start failed!");
e.printStackTrace();
}
}


/**
* 启动服务
*/
public static void runServer() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(BOSSTHREAD_COUNT); //开启n个boss线程接受传入的请求
EventLoopGroup workerGroup = new NioEventLoopGroup(WORKERTHREAD_COUNT); //开启n个worker线程处理boss接受的请求的具体操作
try {
ServerBootstrap bootstrap = new ServerBootstrap(); //服务器启动相关操作工具类
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为通道接受到来的请求
.option(ChannelOption.SO_BACKLOG, 128) // option()接受请求的NioServerSocketChannel的选项设置
.childOption(ChannelOption.SO_KEEPALIVE, true) //
.childHandler(new ChannelInitializer<SocketChannel>() { //帮助用户配置一个新的通道,可以添加更多的处理程序实现复杂逻辑
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
                    new ByteArrayEncoder(),
new FileServerHandler()
);
 
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = bootstrap.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();

} catch (Exception e) {
e.printStackTrace();
}finally{
// Shut down all event loops to terminate all threads. 
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

fileServerHander.java


public class FileServerHandler extends ChannelInboundHandlerAdapter {


private static String receiveDir = ConfigUtils.getString("trans.server.file.dir");
private boolean first = true;
private FileOutputStream fos;
private BufferedOutputStream bufferedOutputStream;
private String filePath;
private long fileLength;
private long readLength;
private String fileId;


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
....
//此处代码省略

//保存文件并更新数据库文件路径
String directory = receiveDir+FileUtils.getFolderByDate();
fileId = jsonInfo.getString("file_id");//数据库中的表示该文件数据的Id
String filetxt = jsonInfo.getString("file_name");
String fileName = "";
if(filetxt.contains(".")){
fileName = filetxt.substring(filetxt.lastIndexOf("."));
}
File file_dir = new File(directory);
if (!file_dir.exists()) {
file_dir.mkdirs();//创建目录
}
filePath = directory + ConstUtils.FILE_RECEIVED_PREFIX + fileId+UUIDGenerator.getId()+fileName;//EN_表示未解密
fileLength = jsonInfo.getLong("file_size");
File file = new File(filePath);
if (!file.exists()) {
file.createNewFile();//创建文件
}
fos =  new FileOutputStream(file);
bufferedOutputStream = new BufferedOutputStream(fos);
first = false;
jsonInfo.put("server_path", filePath);
CheckInfo.update(jsonInfo);
JSONObject response = new JSONObject();
response.put("msgType", ConstUtils.TRANSFER_DATATYPE_C);

response.put("msg", fileId);
ctx.writeAndFlush(response.toString().getBytes());

}else{
bufferedOutputStream.write(bytes, 0, bytes.length);
// buf.release();
readLength += bytes.length;
System.out.println(readLength+"=="+fileLength+"="+(readLength == fileLength));
if (readLength >= fileLength) {
CheckInfo.updateFileCatchState(fileId);//修改文件获取状态
JSONObject response = new JSONObject();
System.out.println("----------------start----------------");
if (bufferedOutputStream != null ) {
bufferedOutputStream.flush();
bufferedOutputStream.close();
bufferedOutputStream=null;
}
if (fos != null) {
fos.close();
fos = null;
}
System.out.println("----------------buffereoutputStream----------------"+bufferedOutputStream);
response.put("msgType", ConstUtils.TRANSFER_DATATYPE_FILE);
response.put("msg", ConstUtils.TRANSFER_FILE_STATUS);
ctx.writeAndFlush(response.toString());
ctx.close();
}
}
} catch (Exception e) {


System.out.println("文件传输信息:"+message);
System.out.println("错误信息Exception:"+e.getMessage());
e.printStackTrace();
JSONObject response = new JSONObject();
response.put("msgType", ConstUtils.TRANSFER_DATATYPE_RESPONSE_STATUS);
response.put("msg", ConstUtils.RESPONSE_STATUS_ERR);
ctx.writeAndFlush(response.toString());
ctx.close();

}finally{
buf.release();
}
}


@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
/*if (bufferedOutputStream != null ) {
bufferedOutputStream.flush();
bufferedOutputStream.close();
}
if (fos != null) {
fos.close();
}*/
// ctx.channel().c
System.out.println(" fileServerHandler in channelInactive");
super.channelInactive(ctx);
}



@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}



@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
bufferedOutputStream.flush();
bufferedOutputStream.close();
fos.close();
System.out.println("exceptionCaught: "+cause.getStackTrace());
JSONObject response = new JSONObject();
response.put("msgType", ConstUtils.TRANSFER_DATATYPE_FILE);
response.put("msg", ConstUtils.RESPONSE_STATUS_ERR);
ctx.writeAndFlush(response.toString());
ctx.close();

} catch (IOException e) {
System.out.println("exceptionCaught:-----------");
e.printStackTrace();
}
}


}


可以帮我看看么?这问题我都纠结了几天了,谢谢了!



加载中
返回顶部
顶部