netty4.x 数据发送问题

小乞丐 发布于 2015/05/18 12:02
阅读 11K+
收藏 2

大家好:

 最近研究netty中遇到一个奇怪的问题,

客户端采用tcp工具模拟,服务端是netty编写,在handler中循环向客户端发送数据,一段时间后:

客户端讲收不到任何数据:

服务端任然在正常发送数据,并且当我把客户端断开连接后,服务器并无响应,并且发送数据不报任何错误,服务器任然能正常发送错误,但实际上客户端已经断开连接了。而且发送数据的furture监听方法也不在执行!

服务端代码如下:

while(true){

String srcData = new String(bytes, "UTF-8");
					if(!ctx.channel().isActive() )return ;
					sendData(srcData);

}




	/**
	 * 发送GPS数据至客户端
	 * @throws ClosedChannelException 
	 */
	private void sendData(String srcData) {
		//System.out.println(ctx.channel().isActive());
		ChannelFuture future = ctx.channel().writeAndFlush(srcData);
		boolean is = future.isSuccess();
		//System.out.println(future.isSuccess());
		/**
		 * 监听发送状态..
		 */
		//future.awaitUninterruptibly(5 * 1000);
		future.addListener(new ChannelFutureListener() {
		    public void operationComplete(ChannelFuture future) {
		        if (future.isSuccess()) {
		            logger.debug(future.channel() + ",成功发送GPS数据.");
		        } else {
		            Channel channel = future.channel();
		            Throwable cause = future.cause();
		            logger.error("当前channel[{"+channel+"}]发送GPS数据包失败.",cause);
		            ctx.channel().close();
		        }
		    };
		});
	}


并且发送数据全部累积在netty的发送队列中,导致内存溢出:

2015-05-17 20:57:55.434 INFO  (MessageExecutor.java:80)-5s转发客户端[/192.168.220.1:59294] 137 条,累计发送 594595 条 
2015-05-17 20:58:36.360 WARN  (SingleThreadEventExecutor.java:114)-Unexpected exception from an event executor: 
java.lang.OutOfMemoryError: Java heap space
2015-05-17 20:58:36.360 ERROR (SenderHandler.java:67)-系统异常信息!
java.lang.OutOfMemoryError: Java heap space
at java.nio.ByteBuffer.wrap(ByteBuffer.java:350)
at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:137)
at java.lang.StringCoding.decode(StringCoding.java:173)
at java.lang.String.<init>(String.java:443)
at java.lang.String.<init>(String.java:515)
at com.palmgo.datasource.kafka.MessageExecutor.consumer(MessageExecutor.java:159)
at com.palmgo.datasource.handler.SenderHandler.channelActive(SenderHandler.java:32)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:183)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:169)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:183)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:169)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:183)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:169)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:64)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:183)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:169)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:817)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:454)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$100(AbstractChannel.java:378)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:424)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:662)
2015-05-17 20:58:39.728 WARN  (DefaultChannelPipeline.java:1035)-An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.OutOfMemoryError: Java heap space

请问netty 这种情况该如何处理呢?

加载中
0
Grrrr
Grrrr
代码没有问题,但是这种写法值得怀疑,你在server端的handler里面while(true)


那么这个handler的channelRead方法永远不会返回,那么也就是说在整个pipe line中,都会卡在这个handler里面,传递不到下一个handler。


最后导致的结果就是整个pipeline永远不会执行完成。
0
小乞丐
小乞丐

引用来自“Grrrr”的评论

代码没有问题,但是这种写法值得怀疑,你在server端的handler里面while(true)


那么这个handler的channelRead方法永远不会返回,那么也就是说在整个pipe line中,都会卡在这个handler里面,传递不到下一个handler。


最后导致的结果就是整个pipeline永远不会执行完成。

你好!

我修改了下: 在handler中使用另外一个线程来循环发送数据给客户端:

在channelAcitve中使用一个线程池来循环发送消息给客户端,发送消息的类是MessageExcutor.这个类中的run方法里是一个循环发送消息。。

close方法用来关闭链接后销毁这个线程池任务。。

package com.palmgo.datasource.handler;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;

import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

import com.palmgo.datasource.init.SysConfig;
import com.palmgo.datasource.kafka.MessageExecutor;
import com.palmgo.datasource.utils.ForwardConn;
import com.palmgo.datasource.utils.LogUtils;

public class SenderHandler extends ChannelDuplexHandler{
	private final ExecutorService executor = Executors.newFixedThreadPool(1);
	private MessageExecutor msgExecutor =null;
	private Logger logger = LogUtils.getException();
	private final AttributeKey<String> attrTopic = AttributeKey.valueOf("topic");
	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		try {
			String topic = ctx.channel().attr(attrTopic).get();
			msgExecutor= new MessageExecutor(topic,0,ctx);
			msgExecutor.setIsRunning(new AtomicBoolean(true));
			executor.execute(msgExecutor);
		//	msgExecutor.consumer();
		} catch (Exception e) {
			//数据发送异常,或者链接关闭
			Channel channel = ctx.channel();
			logger.error("发送客户端["+channel.remoteAddress()+"]数据异常,关闭链接!",e);
		}
	}
	@Override
	public void close(ChannelHandlerContext ctx, ChannelPromise future)
			throws Exception {
		logger.info("close :" + ctx.channel().isActive());
		if(ctx.channel().isActive()){
			Channel channel = ctx.channel();
			SocketAddress address = channel.remoteAddress();
			String ip = address.toString().split(":").trim();
			ip = ip.substring(1); //获取具体IP地址
			Map<String,ForwardConn> map = SysConfig.getSysConfig();
			ForwardConn conn = map.get(ip);
			conn.setNum(conn.getNum()-1);
			msgExecutor.setIsRunning(new AtomicBoolean(false));
			executor.shutdownNow();
			logger.info("关闭客户端["+address+"]链接.");
			super.close(ctx, future);
		}
	}
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		if(ctx.channel() !=null)
			closeOnFlush(ctx.channel());
	}
	@Override
	public void write(ChannelHandlerContext ctx, Object msg,
			ChannelPromise promise) throws Exception {
		super.write(ctx, msg, promise);
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		logger.error("系统异常信息!",cause);
		closeOnFlush(ctx.channel());
	}


	
	/**
	 * close the specified channel after all queue write request are flushed
	 */
	static void closeOnFlush(Channel ch){
		if(ch.isActive())
			ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
	}
}




在循环中调用的发送方法:

这是一个很奇怪的现象,我在上面close方法中先结束循环,在关闭线程池,第一次客户端链接后断开链接是没有问题的,响应close方法是从发送数据监听中发起的。然后在链接上服务器,在断开,发送消息中会一直发送失败,,future.channel().close()也会执行,但是并没有进入我handler的close方法。我在父类channelIoHandler中断点,也并未进入。。。

不知道为何。。。

0
loyal
loyal
居然都是猫的头像哈 ~~~全都是~
0
小乞丐
小乞丐

引用来自“loyal”的评论

居然都是猫的头像哈 ~~~全都是~

呵呵。不说我还没发现。果然是,还是同样一直猫。。。

可是我还是想搞定我这个问题。。。


0
龙城肥将
楼主,请问我用netty写的服务端但是,用tcp工具发送消息服务端无法收取,但是用netty写的客户端发送数据就可以收到,这是为什么?
0
坤艮

楼主,请问你用的tcp工具叫什么?看起来很好用的样子!

0
坤艮

楼主,请问你用的tcp工具叫什么?

0
小乞丐
小乞丐

引用来自“坤艮”的评论

楼主,请问你用的tcp工具叫什么?

TCP&UDP测试工具 ,百度下很多。具体用的哪一个我给忘记了。~

返回顶部
顶部