NETTY在大并发的时候出现解码问题

烈日下的追逐 发布于 2013/11/05 10:12
阅读 4K+
收藏 2
用netty写的一个网关服务器的验证,在并发数比较大的时候,就会出现解码的问题,各位帮忙看看,好几天了,一直没解决。

(1)每隔50ms生成一个客户端进行压测,客户端每隔1s发送200bytes的数据,服务器收到后,将其加入到队列中。

(2)另开一个线程处理队列,目前是简单的将其发给客户端。

(3)客户端收到服务器的消息后,也是将其加入到队列中,由另外一个线程进行处理,将消息发送时间和接收时间记录到文件中。

协议:
包体长度+包体
包体使用的是一个map,数据格式定义:
* 字段1键名长度    字段1键名 字段1值长度    字段1值
* 字段2键名长度    字段2键名 字段2值长度    字段2值

* 字段3键名长度    字段3键名 字段3值长度    字段3值

public class XLRequest {
   private int length;	// 数据包长
  
   private Map<String,String> datas=new HashMap<String, String>(); //参数

}
编码:
public class MessageEncoder extends OneToOneEncoder  {
	
	public void encodeDataBuffer(Map<String, String> values, ChannelBuffer totalBuffer) throws Exception {
		if (values != null && values.size() > 0) {
			for (Entry<String, String> entry : values.entrySet()) {
				String key = entry.getKey();
				String value = entry.getValue();
				totalBuffer.writeInt(key.getBytes().length);
				totalBuffer.writeBytes(key.getBytes());
				totalBuffer.writeInt(value.getBytes().length);
				totalBuffer.writeBytes(value.getBytes());
			}
		}
	}
	
	public int getDataLength(Map<String, String> values){
		int dataLength = 0;
		for (Entry<String, String> entry : values.entrySet()) {
			String key = entry.getKey();
			String value = entry.getValue();
			dataLength = dataLength + 4 + key.getBytes().length + 4
					+ value.getBytes().length;
		}
		return dataLength;
	}

	@Override
	protected Object encode(ChannelHandlerContext ctx, Channel channel,
			Object msg) throws Exception {
		XLRequest request = (XLRequest) msg;
		ChannelBuffer totalBuffer = ChannelBuffers.dynamicBuffer();
		
		Map<String, String> datas = request.getDatas();
		if(datas==null || datas.isEmpty()){
			throw new Exception("内容为空!");
		}
		totalBuffer.writeInt(getDataLength(datas));
		
		encodeDataBuffer(datas, totalBuffer);
		return totalBuffer;
	}
}

解码

public class MessageDecoder extends FrameDecoder {
	public static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
	
	@Override
	protected Object decode(ChannelHandlerContext ctx, Channel channel,
			ChannelBuffer buffer) throws Exception {
		
		if (buffer.readableBytes() < 4) { 
			return null;
		}
		buffer.markReaderIndex();
		int totalLength = buffer.readInt(); // 数据包长
		
		if (buffer.readableBytes() < totalLength) {
			buffer.resetReaderIndex();
			return null;
		}
		ChannelBuffer dataBuffer = ChannelBuffers.buffer(totalLength);
		buffer.readBytes(dataBuffer, totalLength);
		
		XLRequest request = new XLRequest();
		request.setLength(totalLength);
		request.setDatas(decodeDataBuffer(dataBuffer));
		return request;
	}
	
	public Map<String, String> decodeDataBuffer(ChannelBuffer dataBuffer ) throws IOException, Exception {
		Map<String, String> dataMap = new HashMap<String, String>();
		
		if (dataBuffer != null && dataBuffer.readableBytes() > 0) {
			int processIndex = 0;
			int totalLength = dataBuffer.readableBytes();
			while (processIndex < totalLength) {
				/**
				 * 获取Key
				 */
				int size = dataBuffer.readInt();
				byte[] contents = new byte[size];
				dataBuffer.readBytes(contents);
				String key = new String(contents);
				processIndex = processIndex + size + 4;
				
				/**
				 * 获取Value
				 */
			
				size = dataBuffer.readInt();
				contents = new byte[size];
				dataBuffer.readBytes(contents);
				String value = new String(contents);
				
				dataMap.put(key, value);
				processIndex = processIndex + size + 4;
			}
		}
		return dataMap;
	}
}
ClientFactory:

public class ClientFactory implements ChannelPipelineFactory {

	private MessageDecoder messageDecoder = new MessageDecoder();
	private MessageEncoder messageEncoder = new MessageEncoder();
	private ClientHandler clientHandler = new ClientHandler();	
	
	ExecutionHandler executionHandler = new ExecutionHandler( 
			new OrderedMemoryAwareThreadPoolExecutor(
					SysInfo.getProcessors()*ConfigUtil.GATEWAY_PROCESS, 
					ConfigUtil.MAX_CHANNEL_MEMORY_SIZE, 
					0)
			);
	public ChannelPipeline getPipeline() throws Exception {

		ChannelPipeline cp = Channels.pipeline();
		cp.addLast("messageClientDecoder", messageDecoder);
		cp.addLast("messageClientEncoder", messageEncoder);
		cp.addLast("executionHandler", executionHandler);
		cp.addLast("clientHandler", clientHandler);
		return cp;
	}
}
ClientHandler:

public XLRequest createSendData(Integer channelId) {
		XLRequest request = new XLRequest();
		if(firstCreated){
			msg2_1024bytes = formResponseMessage(2);
			msg5_1024bytes = formResponseMessage(5);
			msg10_1024bytes = formResponseMessage(10);
			msg50_1024bytes = formResponseMessage(50);
			firstCreated = false;
		}
		String msg = getMessage(ConfigUtil.SEND_BYTES);
		request.setValue("msg", msg);
		request.setValue("sid", lastSid.getAndIncrement()+"");
		request.setValue("time", System.currentTimeMillis()+"");
		return request;
	}
	
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
			throws Exception {
		XLRequest request = (XLRequest) messageEvent.getMessage();
		
		request.setValue("recvTime", System.currentTimeMillis()+"");
		messageManager.addSocketMessage(messageManager.handleMessage(CommandType.CLIENT_REQUEST, 
				ctx.getChannel(), request));
	}
	

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		Channel channel = ctx.getChannel();
		if (channel != null) {
			// send data
			XLRequest clientRequest = createSendData(channel.getId());
			ClientSenderThread senderThread = new ClientSenderThread(channel, clientRequest);
			Thread thread = new Thread(senderThread);
			thread.start();
		}
		super.channelConnected(ctx, e);
	}
Serverhandler:

@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
			throws Exception {
		Channel sendChannel = ctx.getChannel();
		doProcessFromClient(ctx, messageEvent, sendChannel);
	}
	private void doProcessFromClient(ChannelHandlerContext ctx,
			MessageEvent messageEvent, Channel sendChannel) {
		XLRequest request = (XLRequest)messageEvent.getMessage();
		byte command = CommandType.GATE_TO_CLIENT_NORMAL;
		messageManager.addSocketMessage(messageManager.handleMessage(command, ctx.getChannel(), request));
	}
消息队列

public class MessageManager {
	
	public static final Logger logger = LoggerFactory.getLogger(MessageManager.class);

	/** MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的*/
    private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>();
    private ExecutorService pool;
    
    
    /** 登记式单例类 */
    private static AtomicInteger instanceIndex = new AtomicInteger(0);
    private static Map<String, MessageManager> instanceMap = new HashMap<String, MessageManager>();
    private String instanceName = "";
    public static final String INSTANCE_CLIENT_TO_GATE = "client_to_gate";
    public static final String INSTANCE_GATE_TO_CLIENT = "gate_to_client";
    private MessageManager(String name){
    	this.instanceName = name;
    }
    public String getInstanceName() {
		return instanceName;
	}
	public static MessageManager getInstance(String name){
    	if(StringUtils.isBlank(name)){
    		name = MessageManager.class.getName().concat(instanceIndex.get()+"");
    		instanceIndex.getAndIncrement();
    	}
    	
    	synchronized (instanceMap) {
    		MessageManager instance = instanceMap.get(name);
        	if(instance==null){
        		instance = new MessageManager(name);
        		instanceMap.put(name, instance);
        	}
        	return instance;
		}
    }
	

    public MessagePack handleMessage(byte command, Channel sendChannel, XLRequest request) {
        MessagePack messagePack = null;

        switch (command) {
        case CommandType.GATE_TO_CLIENT_NORMAL: 
        	messagePack = new GateMessagePack(command, sendChannel, request);
        	break;
        case CommandType.CLIENT_REQUEST:
        	messagePack = new ClientMessagePack(command, sendChannel, request);
        }
        return messagePack;
    }

    public void start() {
    	pool = Executors.newFixedThreadPool(SysInfo.getProcessors()*2);
        for(int i=0;i<SysInfo.getProcessors();i++){
        	 pool.execute(new PushRecvThread());
        }
    }

	private class PushRecvThread implements Runnable {

		public void run() {
			while (true) {
				// 从队列中取继承MessagePack的实例
				MessagePack message;
				try {
					message = receivedQueen.poll(1, TimeUnit.MILLISECONDS);
					if (message != null) {
						message.onHandler();
					}
				} catch (InterruptedException e) {
					logger.error(e.getMessage(), e);
				}
			}
		}
	}
	public void addSocketMessage(MessagePack message) {
		if (message != null) {
			try {
				boolean success = receivedQueen.offer(message, 1,TimeUnit.MILLISECONDS);
			} catch (InterruptedException e) {
				logger.error(e.getMessage(), e);
			}
		}
	}
}
代码比较多,源代码我上传到百度网盘了,地址: http://pan.baidu.com/s/17Vbyg

7k用户每隔1s发送200byte数据的时候,就会出现如下的错误,而且还有个很奇怪的问题,在这7k用户建立连接的过程中,不会出现解码问题,全部建立连接后不久,就会出现解码问题。不知道是不是我写的代码有问题,各位大牛帮忙看看吧。

java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3
	at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661)
	at org.jboss.netty.buffer.AbstractChannelBuffer.readInt(AbstractChannelBuffer.java:273)
	at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:51)
	at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


加载中
0
huzorro
huzorro

错误提示的很明显 需要读取4字节 最大只有3字节可用 这种问题不是encoder有问题就是decoder有问题 至于为啥建立连接的时候不报错 

1是建立连接的时候还没开始发包

 2.如果你定义连接用的认证包 那就说明认证包的编解码没问题 

另外建议你协议采用包头加包体的方式定义 包体内对于字段长度的描述只有变长字段才加长度 定长的约定好就可以了

0
烈日下的追逐
烈日下的追逐

引用来自“huzorro”的答案

错误提示的很明显 需要读取4字节 最大只有3字节可用 这种问题不是encoder有问题就是decoder有问题 至于为啥建立连接的时候不报错 

1是建立连接的时候还没开始发包

 2.如果你定义连接用的认证包 那就说明认证包的编解码没问题 

另外建议你协议采用包头加包体的方式定义 包体内对于字段长度的描述只有变长字段才加长度 定长的约定好就可以了

每个客户端连接成功后就开始发包的
烈日下的追逐
烈日下的追逐
@huzorro 随机出现的,也不是说就是第二个,就是跑着跑着就出现解码出错的问题了
huzorro
huzorro
那就是所有客户端的第一个包不报错呗 第二个就开报错?
0
烈日下的追逐
烈日下的追逐

引用来自“huzorro”的答案

错误提示的很明显 需要读取4字节 最大只有3字节可用 这种问题不是encoder有问题就是decoder有问题 至于为啥建立连接的时候不报错 

1是建立连接的时候还没开始发包

 2.如果你定义连接用的认证包 那就说明认证包的编解码没问题 

另外建议你协议采用包头加包体的方式定义 包体内对于字段长度的描述只有变长字段才加长度 定长的约定好就可以了

我也知道错误提示是encoder或者decoder的时候出现问题了,但是我仔细看了好几遍,也没发现有什么问题,大神你帮忙看看解码和编码部分的代码,能看出什么道道来不?
烈日下的追逐
烈日下的追逐
@huzorro 加了的呀, totalBuffer.writeInt(getDataLength(datas));
huzorro
huzorro
貌似你包体长度没有加上包体长度本身的4个字节长度
0
星爷
星爷
编解码搞得太复杂了。自己都把自己绕晕了。直接传一个字符串,也用不着搞得这么累吧!
0
烈日下的追逐
烈日下的追逐

引用来自“huaye2007”的答案

编解码搞得太复杂了。自己都把自己绕晕了。直接传一个字符串,也用不着搞得这么累吧!
我要记录每个客户端的延迟时间的,所以用了一个map
0
cwledit
cwledit

netty的版本是什么?


烈日下的追逐
烈日下的追逐
3.7.0.Final
0
温柔一刀
温柔一刀
使用netty的典型错误,不可共享的decoder在多个pipeline中共享了。凡是不可共享(没有sharable标签的,包括父类,FrameDecoder就是典型 )的decoder,必须每次创建一个新的,不能在多个pipeline中共享
我是热门
我是热门
能请教下为啥decoder不能共享么?
0
烈日下的追逐
烈日下的追逐

引用来自“温柔一刀”的答案

使用netty的典型错误,不可共享的decoder在多个pipeline中共享了。凡是不可共享(没有sharable标签的,包括父类,FrameDecoder就是典型 )的decoder,必须每次创建一个新的,不能在多个pipeline中共享

大神说的是这里吗?


15         ChannelPipeline cp = Channels.pipeline();
16         cp.addLast("messageClientDecoder", messageDecoder);
17         cp.addLast("messageClientEncoder", messageEncoder);
18         cp.addLast("executionHandler", executionHandler);
19         cp.addLast("clientHandler", clientHandler);

0
烈日下的追逐
烈日下的追逐

引用来自“温柔一刀”的答案

使用netty的典型错误,不可共享的decoder在多个pipeline中共享了。凡是不可共享(没有sharable标签的,包括父类,FrameDecoder就是典型 )的decoder,必须每次创建一个新的,不能在多个pipeline中共享

应该是刀哥说的这个原因,messageDecoder每次new一个出来,现在还没出现问题,等在跑24h看看,后面我在结贴。

在此,谢谢各位大牛们的帮助,真心太感谢你们了,郁闷了差不多一个星期了。

public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline cp = Channels.pipeline();
        cp.addLast("messageClientDecoder", new MessageDecoder());
        cp.addLast("messageClientEncoder", messageEncoder);
        cp.addLast("executionHandler", executionHandler);
        cp.addLast("clientHandler", clientHandler);
        return cp;
    }

0
温柔一刀
温柔一刀
别忘了下面那句encoder,如果不是sharable,也要new
烈日下的追逐
烈日下的追逐
恩 结果还真是这个问题,现在程序运行正常,没有出现解包的问题了
返回顶部
顶部