netty5 客户端发服务端,但是服务端接收不到的问题,请帮忙看下。

chenlyzen 发布于 2016/05/13 07:31
阅读 2K+
收藏 0

刚开始写nett,遇到点问题,请大神帮忙看看。

用netty5 写了个客户端 向以前一个socket写的tcpserver发消息,启动后链接可以建立,客户也收到了服务端返回的响应消息。但是客户端 发心跳的HeartBeatReqHandler  发送的 消息 服务端 无法收到。

代码如下:

package com.zte.pis.ats.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.zte.pis.ats.netty.NettyConstant;
import com.zte.pis.ats.netty.codec.MessageDecoder;
import com.zte.pis.ats.netty.codec.MessageEncoder;
import com.zte.pis.ats.netty.struct.ATSHeaderRequest;
import com.zte.pis.ats.netty.struct.ATSMessage;
import com.zte.pis.ats.socketutil.NextPos;


/**
 * 
 * @author chenlz
 *
 */
public class NettyClient {

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    EventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture future = null;

    public void connect(int port, String host) throws Exception {

	// 配置客户端NIO线程组
    //利用Netty的ChannelPipeline和ChannelHandler知己,可以非常方便的实现功能解耦和业务产品的定制。
    	//通过Handler Chain的机制可以方便的实现切面拦截和定制,比Aop性能更高
	try {
	    Bootstrap b = new Bootstrap();
	    b.group(group).channel(NioSocketChannel.class)
		    .option(ChannelOption.TCP_NODELAY, true)
		    .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch)
				throws Exception {
				// 增加了NettyMessageDecoder用于Netty消息解码,为了防止由于单挑消息过大导致的内存溢出,并对单条消息最大长度进行了上限限制。
			    ch.pipeline().addLast(new MessageDecoder());
			    //Netty消息 编码器
			    ch.pipeline().addLast("MessageEncoder",new MessageEncoder());
			    //超时Handler
			    ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(5000));
			    //心跳消息Handler
			    ch.pipeline().addLast("HeartBeatHandler",new HeartBeatReqHandler());
			    //响应业务Handler
			    ch.pipeline().addLast("BussinessDealHandler",new BussinessDealReqHandler());
			}
		    });
	    // 发起异步连接操作
	    future = b.connect(
		    new InetSocketAddress(host, port),
		    new InetSocketAddress(NettyConstant.LOCALIP,NettyConstant.LOCAL_PORT)).sync();
	    //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
	    future.channel().closeFuture().sync();
	} finally {
		//下面是断链 重连 代码
	    // 所有资源释放完成之后,清空资源,再次发起重连操作
	    executor.execute(new Runnable() {
		@Override
		public void run() {
		    try {
			TimeUnit.SECONDS.sleep(1);
			try {
			    connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
			} catch (Exception e) {
			    e.printStackTrace();
			}
		    } catch (InterruptedException e) {
			e.printStackTrace();
		    }
		}
	    });
	}
    }
    
    public void sendMsg(ATSMessage message) throws Exception 
    {   
    	if(future!=null)
    	{   
    		//future.channel().writeAndFlush(message).sync();   
    		future.channel().writeAndFlush(message);   
    	}
    	else
    	{   
    		System.out.println("消息发送失败,连接尚未建立!");   
    	}   
    }  

	private ATSMessage buildHeatBeat() {
		ATSMessage message = new ATSMessage();
		ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3,
        		(short)3, (short)15, (short)5, (short)55, (short)999,"H");
	    message.setHeader(request);
	    return message;
	}
	
    private ATSMessage buildBussinessReq() {
    	ATSMessage message = new ATSMessage();
        ATSHeaderRequest request = new ATSHeaderRequest((short)100, (short)2016, (short)3,
        		(short)3, (short)15, (short)5, (short)55, (short)999,"B");
        String a = "PLATFORM SIDP Open PID ATime1 DTime1 RID S D L DES\rPID ATime1 DTime1 RID S D L DES\r";
        message.setHeader(request);
        message.setBody(a);
	    return message;
    }
    
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
    	NettyConstant.LOCAL_PORT = 12098+1;
    	NettyClient nettyClient = new NettyClient();
    	nettyClient.connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    	
    	System.out.println("xx");
    	/*ATSMessage bussinessReq = nettyClient.buildBussinessReq();
    	nettyClient.sendMsg(bussinessReq);*/
    }

}

package com.zte.pis.ats.netty.client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.zte.pis.ats.netty.struct.ATSHeaderRequest;
import com.zte.pis.ats.netty.struct.ATSMessage;

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private volatile ScheduledFuture<?> heartBeat;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	ctx.writeAndFlush(buildHeatBeat());
    }
    
	private ATSMessage buildHeatBeat() {
		ATSMessage message = new ATSMessage();
		ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3,
        		(short)3, (short)15, (short)5, (short)55, (short)999,"H");
	    message.setHeader(request);
	    return message;
	}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception 
	{
		ATSMessage message = (ATSMessage) msg;
		// 握手成功,主动发送心跳消息
		if (message.getHeader() != null && "Z".equals(message.getHeader().getMessageType())) 
		{
			System.out.println("Client receive server hand rsp message : ---> " + message);
			heartBeat = ctx.executor().scheduleAtFixedRate(
				    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
				    TimeUnit.MILLISECONDS);
		}
		else if (message.getHeader() != null
			&& "F".equals(message.getHeader().getMessageType())) 
		{
		    System.out.println("Client receive server heart beat message : ---> " + message);
		} 
		else
		    ctx.fireChannelRead(msg);
    }

    private class HeartBeatTask implements Runnable 
    {
		private final ChannelHandlerContext ctx;
	
		public HeartBeatTask(final ChannelHandlerContext ctx) {
		    this.ctx = ctx;
		}
	
		@Override
		public void run() {
			ATSMessage heatBeat = buildHeatBeat();
		    System.out
			    .println("Client send heart beat messsage to server : ---> "
				    + heatBeat);
		    ctx.writeAndFlush(heatBeat);
		}
	
		/*private NettyMessage buildHeatBeat() {
		    NettyMessage message = new NettyMessage();
		    Header header = new Header();
		    header.setType(MessageType.HEARTBEAT_REQ.value());
		    message.setHeader(header);
		    return message;
		}*/
		private ATSMessage buildHeatBeat() {
			ATSMessage message = new ATSMessage();
			ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3,
	        		(short)3, (short)15, (short)5, (short)55, (short)999,"H");
		    message.setHeader(request);
		    return message;
		}
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
	    throws Exception {
	cause.printStackTrace();
	if (heartBeat != null) {
	    heartBeat.cancel(true);
	    heartBeat = null;
	}
	ctx.fireExceptionCaught(cause);
    }
}

可运行代码: 

NettyClient 是客户端类
ATSSever 是服务端类

http://pan.baidu.com/s/1hrGK4c8

加载中
0
c
cep

1。我觉得你启动发送心跳任务的时机不太好,你放在了接收到服务器的心跳后。如果服务器不发送,你可能一直不会发送。所以你需要(1)确定一下你是否收到了服务器的心跳,建议用wireshark类似的网络抓包工具看一下,(2)修改启动发送心跳的任务,把它放到登录成功后启动

chenlyzen
chenlyzen
我只是不明白在 HeartBeatReqHandler中 channelRegistered 和channelActive这两个方法 明明 调用了并且 encode了,为什么消息没有发送到服务端。 而在NettyClient4 中的Main 方法中的 却可以发送到服务端。这个是为什么啊?知道原因的话,请赐教。
chenlyzen
chenlyzen
谢谢你的回答,在连接建立后,服务端发送了响应并且客户端收到了,我在断点时已经跟到。 1.是需要抓下,照理没有连接客户是发送不了的,我断点时候客户端的encode 都走了 2.我在直接用nettyclientdesendmsg的方法发送。服务端是可以收到消息的。 不过在handler里用ctx。writeandflush方法发的,服务端却收不到…… 手机打的,可能说的不太清楚。见谅
0
chenlyzen
chenlyzen

我在这段代码里有 就已经 发送 消息去 服务端,不过服务收不到啊。。。

public  class  HeartBeatReqHandlerextendsChannelHandlerAdapter {
 
    privatevolatileScheduledFuture<?> heartBeat;
     
    @Override
    publicvoidchannelActive(ChannelHandlerContext ctx)throwsException {
        ctx.writeAndFlush(buildHeatBeat());
    }

0
chenlyzen
chenlyzen
问题查明,是encode 时候,有事body为空。导致异常。不过 异常 并没有被抛出,不知道为啥。
返回顶部
顶部