咨询一个netty channel的问题

syxfengyi 发布于 03/21 09:49
阅读 75
收藏 1

在学习netty的时候,参考视频敲了一个简单的模拟聊天室的代码,发现ChannelGroup的writeAndFlush()方法并不能通知到客户端,代码如下:
服务端:TestChatServer
package com.chat.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class TestChatServer {

    public static void main(String args[]) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).
                childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                        //添加自定义处理
                        pipeline.addLast(new SimpleChannelInboundHandler<String>(){

                            //读取消息
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                Channel channel = ctx.channel();
                                System.out.println("接收到客户端消息:" + msg);
                                TestChannelGroup.channels.forEach(ch->{
                                    if(channel != ch){  //如果不是自己
                                        ch.writeAndFlush(ch.remoteAddress() + "收到消息:" + msg);
                                    }else{
                                        ch.writeAndFlush("[自己]收到消息:" + msg);
                                    }
                                });
                                }

                            //通道添加
                            @Override
                            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                                Channel channel = ctx.channel();
                                //通知客户端有新的接入
                                TestChannelGroup.channels.writeAndFlush("有新的用户接入,用户地址:" + channel.remoteAddress());
                                //通知后将channel添加到channelGroup中
                                TestChannelGroup.channels.add(channel);
                                System.out.println("用户:" + channel.remoteAddress() + "加入系统");
                            }

                            //通道活动状态
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                Channel channel = ctx.channel();
                                System.out.println("用户:" + channel.remoteAddress() + "上线");
                            }

                            //通道不活动状态
                            @Override
                            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                Channel channel = ctx.channel();
                                System.out.println("用户:" + channel.remoteAddress() + "下线");
                            }

                            //通道移除
                            @Override
                            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                                Channel channel = ctx.channel();
                                System.out.println("用户:" + channel.remoteAddress() + "退出系统");
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                            }


                        });


                    }
                });
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}


客户端代码:
package com.chat.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class TestChatClient {

    public static void main(String args[]) throws  Exception{
        EventLoopGroup loopGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            //添加自定义处理
                            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                    //接收服务端发送的消息
                                    System.out.println(msg);
                                }
                            });
                        }
                    });
            Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

            for(;;){
                String input = br.readLine();
                channel.writeAndFlush(input + "\n");
                System.out.println(input);
               }

        }finally{
            loopGroup.shutdownGracefully();
        }
    }
}

TestChannelGroup:
public class TestChannelGroup {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

我在启动两个客户端的时候,服务端可以监听到上线

然后在其中一个客户端输入123发送消息, 服务端也能收到并打印,但理论上来说他执行了TestChannelGroup.channels.forEach(ch->{
                                    if(channel != ch){  //如果不是自己
                                        ch.writeAndFlush(ch.remoteAddress() + "收到消息:" + msg);
                                    }else{
                                        ch.writeAndFlush("[自己]收到消息:" + msg);
                                    }
                                });
这个应该能发送消息到客户端的呢,但客户端并没有收到相关的消息
而且我在启动第二个客户端的时候应该执行了 
//通知客户端有新的接入
TestChannelGroup.channels.writeAndFlush("有新的用户接入,用户地址:" + channel.remoteAddress());
这个方法,那我第一个客户端应该有通知的,不知道为什么啥都没有
代码都是按视频敲的,不知道为啥不生效,无非就是他用的是mac的ide 我的是windows而已
请大神赐教

加载中
0
JackChang
JackChang

writeAndFlush返回也是个 ChannelFuture  ,加个监听器就可以

final ChannelFuture writeFuture =xxxChannel.writeAndFlush("Test");
writeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (writeFuture.isSuccess()) {
            LOGGER.debug("Write successful");
        } else {
            LOGGER.error("Error writing message xxxx");
        }
    }
});

0
s
syxfengyi

自己找到原因了 因为用了DelimiterBasedFrameDecoder  结束符应该加上换行符 所以应该在存在writeAndFlush的地方都需要加上\n或者\r\n 这样就能正常发送了  是自己一开始没看仔细

返回顶部
顶部