我使用Netty,和MessagePack进行编解码,由客户端成功发送的user对象,服务端没有感知到。
但如果客户端发送字符串,服务端正常接收。
问题:服务端无法感知客户端发送的复杂对象
使用的jar信息如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.6</version>
</dependency>
详细代码及日志如下:
服务端代码如下:
package com.liding.netty.xuliehua.msgpack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class EchoServer {
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 102400)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
arg0.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
arg0.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
arg0.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
arg0.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = b.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
try {
new EchoServer().bind(20001);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package com.liding.netty.xuliehua.msgpack;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class EchoServerHandler extends ChannelHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(".....");
System.out.println("接到客户端信息 :" + msg+",counter :"+ ++count);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
ctx.close();
System.out.println("异常。。。关闭连接 ");
}
}
客户端代码如下:
package com.liding.netty.xuliehua.msgpack;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class EchoClient {
public void connect(String post,int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
arg0.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
arg0.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
arg0.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
arg0.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = b.connect(post, port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
try {
new EchoClient().connect("127.0.0.1",20001);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package com.liding.netty.xuliehua.msgpack;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class EchoClientHandler extends ChannelHandlerAdapter{
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UserInfo[] infos = UserInfo();
for(UserInfo info : infos) {
ctx.writeAndFlush(info);
System.out.println("client write :" + info);
}
ctx.write("abc");
ctx.flush();
}
private UserInfo[] UserInfo() {
int n = 10;
UserInfo[] infos = new UserInfo[n];
UserInfo info = null;
for(int i=0;i<n;i++) {
info = new UserInfo();
info.buildUserId(i).buildUserName("name>abc> "+i);
infos[i] = info;
}
System.out.println(infos[5]);
return infos;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(" Client receive the msgpack message :" + msg+",couter:"+ ++count);
// ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
编码解码代码如下:
package com.liding.netty.xuliehua.msgpack;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MsgpackEncoder extends MessageToByteEncoder<Object>{
@Override
protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2)
throws Exception {
MessagePack msgpack = new MessagePack();
byte[] raw = msgpack.write(arg1);
arg2.writeBytes(raw);
}
}
package com.liding.netty.xuliehua.msgpack;
import java.util.List;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf>{
@Override
protected void decode(ChannelHandlerContext arg0, ByteBuf arg1,
List<Object> arg2) throws Exception {
final byte[] array;
final int length = arg1.readableBytes();
array = new byte[length];
arg1.getBytes(arg1.readerIndex(), array, 0, length);
MessagePack msgpack = new MessagePack();
Object o = msgpack.read(array);
arg2.add(o);
System.out.println("解码:"+o);
}
}
user类:
package com.liding.netty.xuliehua.msgpack;
public class UserInfo{
private String userName;
private int userId;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public UserInfo buildUserName(String userName) {
this.userName = userName;
return this;
}
public UserInfo buildUserId(int userId) {
this.userId = userId;
return this;
}
@Override
public String toString() {
return "UserInfo [userName=" + userName + ", userId=" + userId + "]";
}
}
客户端发送了 10个user对象一个字符串:
class UserInfo implements Serializable
应该是要实现序列化
这个我自己搞定啦,谢谢大家。
原因:
1)客户端messagepack解码器需要传入对象泛形;
2)需要在MessagePack实例中注册自定义的对象;
so ga so ga~~~~ 困扰我几天的问题解决了~~~