7
回答
netty 与msgpack 自定义协议问题
【腾讯云】学生服务器套餐10元/月 >>>   

@绝色 你好,想跟你请教个问题:

1 协议:


public class Headers { //目标类  private byte cls; //效验码  private byte msgCrc = 0x01; //全局回话id  private long msgSessionId; //消息长度,body  private int msgLength; //消息类型  private byte msgType; public byte getCls() { return cls;
    } public void setCls(byte cls) { this.cls = cls;
    } public byte getMsgCrc() { return msgCrc;
    } public void setMsgCrc(byte msgCrc) { this.msgCrc = msgCrc;
    } public long getMsgSessionId() { return msgSessionId;
    } public void setMsgSessionId(long msgSessionId) { this.msgSessionId = msgSessionId;
    } public int getMsgLength() { return msgLength;
    } public void setMsgLength(int msgLength) { this.msgLength = msgLength;
    } public byte getMsgType() { return msgType;
    } public void setMsgType(byte msgType) { this.msgType = msgType;
    }

}

public class Messages { //消息头  private Headers headers; //消息体  private Object body; public Object getBody() { return body;
    } public void setBody(Object body) { this.body = body;
    } public Headers getHeaders() { return headers;
    } public void setHeaders(Headers headers) { this.headers = headers;
    }
}

2 解码

public class Decoder extends ByteToMessageDecoder {

    MessagePack messagePack;
    Messages messages;
    Headers headers;
    Class cls; private Logger log = LoggerFactory.getLogger(this.getClass().getName()); @Override  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { messagePack = new MessagePack(); messages = new Messages(); headers = new Headers(); int lg = in.readInt(); if (in.readableBytes() < lg) {
            in.resetReaderIndex(); return;
        } byte msgCrc = in.readByte(); byte cl = in.readByte(); if (msgCrc != Tools.MSG_CRC) { log.debug("非法协议包");
            in.resetReaderIndex(); return;
        }

        in.markReaderIndex(); headers.setMsgCrc(msgCrc); headers.setCls(cl); headers.setMsgLength(lg); headers.setMsgType(in.readByte()); if (cl == DictClass.loginDO) { cls = DictClass.loginClass;
        } if (cl == DictClass.commandDO) { cls = DictClass.commandClass;
        } if (cl == DictClass.deviceInfoDO) { cls = DictClass.deviceInfoClass;
        } if (cl == DictClass.heartDO) { cls = DictClass.heartClass;
        } final byte[] array; final int length = in.readableBytes(); if (length != 0) {
            array = new byte[length];
            in.getBytes(in.readerIndex(), array, 0, length); log.debug("解码do size:" + array.length); messages.setBody(messagePack.read(array, cls));
        } messages.setHeaders(headers);

        out.add(messages);
    }
}
3 编码

public class Encoder extends MessageToByteEncoder<Messages> {

    MessagePack messagePack; byte[] raw; private Logger log = LoggerFactory.getLogger(this.getClass().getName()); @Override  protected void encode(ChannelHandlerContext ctx, Messages msgs, ByteBuf out) throws Exception { messagePack = new MessagePack(); if (msgs == null || msgs.getHeaders() == null) { throw new Exception("编码内容不能为空!");
        } //序列化body  raw = messagePack.write(msgs.getBody());
        out.writeInt(raw.length);
        out.writeByte(msgs.getHeaders().getMsgCrc());
        out.writeByte(msgs.getHeaders().getCls());
        out.writeByte(msgs.getHeaders().getMsgType()); log.debug("编码do:" + msgs.getBody().toString()); log.debug("编码size:" + raw.length); if (msgs.getBody() != null) {
            out.writeBytes(raw);
        }
    }
}
存在问题:

    多次运行,出现解码出现无法正确解析问题,请教问题出在哪里????


举报
fywname
发帖于3年前 7回/624阅
共有7个答案 最后回答: 2年前

楼主, 你这排版让我看了半天,都有点不相信是Java代码了。使用TCP做网络通信,一定要处理好数据粘包问题,不然你的程序不可能正常运行的,。

处理粘包都差不多, 之前使用MINA做过类似的工作,还是给你贴个地址吧。

http://blog.94fzb.com/post/238。希望对你有帮助

引用来自“wzfz”的评论

楼主, 你这排版让我看了半天,都有点不相信是Java代码了。使用TCP做网络通信,一定要处理好数据粘包问题,不然你的程序不可能正常运行的,。

处理粘包都差不多, 之前使用MINA做过类似的工作,还是给你贴个地址吧。

http://blog.94fzb.com/post/238。希望对你有帮助

不好意思,编辑器有点问题,现在也无法再次编辑排版了。

排版太乱,decoder内逻辑注意几点:

1.你定义的消息头长度应该是固定的,所以解码时先判断readableBytes长度应大于等于消息头长度,然后先解码消息头;

2.接着根据消息头里的msgLength(假设指整个消息,包含消息头的长度)判断,接下来的readableBytes是否大于(msgLength-消息头长度),然后再解码。

你好,下面是排版过的,编码,解码那段代码问题?
编码
public class Encoder extends MessageToByteEncoder<Messages> {

    MessagePack messagePack; byte[] raw
private Logger log = LoggerFactory.getLogger(this.getClass().getName()); 
@Override  
protected void encode(ChannelHandlerContext ctx, Messages msgs, ByteBuf out) throws Exception { 
messagePack = new MessagePack(); 
if (msgs == null || msgs.getHeaders() == null) {
throw new Exception("编码内容不能为空!");
        } 
//序列化body  raw = messagePack.write(msgs.getBody());
        out.writeInt(raw.length);
        out.writeByte(msgs.getHeaders().getMsgCrc());
        out.writeByte(msgs.getHeaders().getCls());
        out.writeByte(msgs.getHeaders().getMsgType()); 
log.debug("编码do:" + msgs.getBody().toString()); 
log.debug("编码size:" + raw.length); 
if (msgs.getBody() != null) {
            out.writeBytes(raw);
        }
    }
}

解码

public class Decoder extends ByteToMessageDecoder {

    MessagePack messagePack;
    Messages messages;
    Headers headers;
    Class cls
private Logger log = LoggerFactory.getLogger(this.getClass().getName()); 
@Override  
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
messagePack = new MessagePack(); 
messages = new Messages(); 
headers = new Headers(); 
int lg = in.readInt(); 
byte msgCrc = in.readByte(); 
byte cl = in.readByte(); 
byte msgType = in.readByte(); 
if (msgCrc != Tools.MSG_CRC) { 
log.debug("非法协议包");
in.resetReaderIndex(); 
return;
        } 
if (in.readableBytes() < lg) {
   in.resetReaderIndex(); return;
        }
  in.markReaderIndex(); 
headers.setMsgCrc(msgCrc); 
headers.setCls(cl); 
headers.setMsgLength(lg); 
headers.setMsgType(msgType); 
if (cl == DictClass.loginDO) { 
cls = DictClass.loginClass;
  
if (cl == DictClass.commandDO) { 
cls = DictClass.commandClass;
if (cl == DictClass.deviceInfoDO
{ cls = DictClass.deviceInfoClass;
 } 
if (cl == DictClass.heartDO
{ cls = DictClass.heartClass;
 
final byte[] array; 
final int length = in.readableBytes(); 
if (length != 0) {
array = new byte[length];
 in.getBytes(in.readerIndex(), array, 0, length); 
log.debug("解码do size:" + array.length); 
messages.setBody(messagePack.read(array, cls));
 
messages.setHeaders(headers);
out.add(messages);
 }
}

你不知道 ByteBuf 有个read吗,非要 ByteBuf getBytes(in.readerIndex(), array, 0, length); 另外前面这个方法这里本身调用的意图就是错的

调整如下:

final ByteBuf frame; 
final int length = in.readableBytes(); 
if (length != 0) {
    frame = in.readBytes(length); 
log.debug("解码do size:" + frame.array().length); 
messages.setBody(messagePack.read(frame.array(), cls));
}
已经解决,谢谢!

--- 共有 1 条评论 ---
hibegin楼主排版咋个这样的难的认呀。 final 这个关键字,感觉用的也太泛滥了撒。。。。netty的处理看上去时要mina的处理看上去时要简单一些额。 setBody() 这个方法内部会完成不停的 decode ? 3年前 回复
* 1:枚举类型支持很弱 直接传.name()吧
 *
 * 2:不能有final
 *
 * 3:多参构造器必须有默认构造器
 *
 * 4:消息类及子类加@Message
顶部