测试 netty 多个客户端,轮询发送消息,容易产生异常

过把瘾 发布于 2011/08/30 16:19
阅读 14K+
收藏 3

1、我使用netty 3.2.4 final 版本;

2、使用LengthFieldBasedFrameDecoder解码,LengthFieldPrepender编码;

3、单个客户端不停的发送消息没有任何问题,建立两个客户端不停发送消息时,总是出现解码服务端解码异常。 不知道什么地方使用错误?

4、window xp, jdk1.6.0_26上运行,-Xms1024m -Xmx1024m -XX:PermSize=256M -XX:MaxPermSize=256M

5、以下是我写的测试代码:

 

package org.jboss.netty.example.length;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

/**
 * 服务器代码
 */

public class MessageServer {

	public static void main(String[] args) {

		ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 8);
		ServerBootstrap bootstrap = new ServerBootstrap(factory);

		MessageFactory fac = new MessageFactory();

		bootstrap.setPipelineFactory(fac);
		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);

		bootstrap.bind(new InetSocketAddress(9999));
	}
}

 

package org.jboss.netty.example.length;

import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.logging.LoggingHandler;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Log4JLoggerFactory;

public class MessageFactory implements ChannelPipelineFactory {

	/**
	 * 消息解码器
	 */
	private ChannelHandler messageDecoder = new MessageDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
	
	/**
	 * 消息编码器
	 */
	private ChannelHandler messageEncoder = new MessageEncoder(4);
	 
	/**
	 * 消息分发的字节流处理器
	 */
	private MessageHandler messageHandler = new MessageHandler();		

	public ChannelPipeline getPipeline() throws Exception {

		ChannelPipeline cp = Channels.pipeline();
		return addMessageHandlersTo(cp);
	}
	
	/**
	 * 给ChannelPipeline对象增加消息协议的字节流处理器集合
	 * @param cp
	 * @return
	 */
	public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
		
		InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
		cp.addLast("logging", new LoggingHandler());
		
		cp.addLast("messageDecoder", messageDecoder);
		cp.addLast("messageEncoder", messageEncoder);
		
		cp.addLast("messageHandler", messageHandler);
		
		return cp;
	}
	
}

 

package org.jboss.netty.example.length;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;

/**
 * 消息编码器
 * 
 */
public class MessageEncoder extends LengthFieldPrepender {

	public MessageEncoder(int lengthFieldLength) {
		super(lengthFieldLength);
	}

	@Override
	protected Object encode(ChannelHandlerContext ctx, Channel channel,
			Object obj) throws Exception {

		ChannelBuffer ob = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
		
		String message = (String)obj;
		
		byte[] strBytes = message.getBytes();
		ob.writeInt(strBytes.length);
		ob.writeBytes(strBytes);
		
		return super.encode(ctx, channel, ob);
	}

}

 

package org.jboss.netty.example.length;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;

/**
 * 消息解码器
 * 
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {

	public MessageDecoder(int maxFrameLength, int lengthFieldOffset,
			int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {

		super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
				lengthAdjustment, initialBytesToStrip);
	}

	@Override
	protected Object decode(ChannelHandlerContext ctx, Channel channel,
			ChannelBuffer buffer) throws Exception {

		ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
		if (frame == null) {
			return null;
		}
		
		byte[] bytes = new byte[frame.readInt()];
		frame.readBytes(bytes);
		
		String content = new String(bytes);
		return content;
	}

	@Override
	protected Object decodeLast(ChannelHandlerContext ctx, Channel channel,
			ChannelBuffer buffer) throws Exception {

		return this.decode(ctx, channel, buffer);
	}

}

 

package org.jboss.netty.example.length;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * 消息处理类
 */
public class MessageHandler extends SimpleChannelHandler {
	
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		
		System.err.println("e.getMessage():"+e.getMessage());
		
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {

		 super.exceptionCaught(ctx, e);
	}
	
}

 

package org.jboss.netty.example.length;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 * 
 * 客户端代码
 */

public class MessageClient {

	public static List<Channel> channelList = new ArrayList<Channel>();
	
	public static ChannelFactory factory = null;
	
	static  {
		
		factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 8);
		ClientBootstrap bootstrap = new ClientBootstrap(factory);

		MessageFactory fac = new MessageFactory();

		bootstrap.setPipelineFactory(fac);

		bootstrap.setOption("tcpNoDelay", true);
		bootstrap.setOption("keepAlive", true);
		
		int clientCount = 2;
		
		for (int i=0; i<clientCount; i++) {
			ChannelFuture cf = bootstrap.connect(new InetSocketAddress("localhost", 9999));
			channelList.add(cf.getChannel());
		}
	}
		
	public static void main(String [] args) {
		
		for (int i=0; i<1000; i++) {
			channelList.get(0).write("client1 "+i);
			channelList.get(1).write("client2 "+i);
		}
		
	}
}

以下是问题补充:

@过把瘾:以下是异常内容 2011-8-30 15:43:46 org.jboss.netty.channel.SimpleChannelHandler 警告: EXCEPTION, please implement org.jboss.netty.example.length.MessageHandler.exceptionCaught() for proper handling. java.lang.IndexOutOfBoundsException at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:657) at org.jboss.netty.buffer.AbstractChannelBuffer.readInt(AbstractChannelBuffer.java:272) at org.jboss.netty.example.length.MessageDecoder.decode(MessageDecoder.java:35) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214) at org.jboss.netty.handler.logging.LoggingHandler.handleUpstream(LoggingHandler.java:231) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349) at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:280) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:200) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) (2011/08/30 16:34)
加载中
0
过把瘾

引用来自“JavaGG”的答案

我也用netty的,也用了lengthdecoder了,但没有这个问题,好好的
private ChannelHandler messageDecoder = new MessageDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);

找到出错原因了,这句使用使用上有问题,当messageDecoder 作为factory 的属性传给添加到ChannelPipeline使用时候,就会出现问题,具体什么原因还不清楚,需要看看源码。呵呵! 你知道具体为什么吗?

0
过把瘾
没人遇到这样的问题吗?江湖急救啊!自己顶一下。
0
JavaGG
JavaGG
writeUtf 不是用short表示长度的么??用int的?
0
过把瘾

引用来自“JavaGG”的答案

writeUtf 不是用short表示长度的么??用int的?

您好,谢谢您的回答,我只是做一个简单的测试,使用LengthFieldBasedFrameDecoder解码,LengthFieldPrepender编码;发送一个字符串(包括字符串的长度),如果建立多个客户端连接服务器,然后循环发送消息给服务器端,很容易产生异常,不知道大家是怎么解决这个问题的。

 

0
JavaGG
JavaGG
我也用netty的,也用了lengthdecoder了,但没有这个问题,好好的
0
叶春
叶春
应该是messagedecoder 应该是有状态的,需要单独一个channel设置一个,不能用同一个,要new出来一个新的
0
haiju_123
haiju_123
我能知道你是怎么创建出多个client端的吗
返回顶部
顶部