8
回答
Netty 4.x怎么处理多个protobuf协议?
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   


protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
      //解码用  
        p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());  
        //构造函数传递要解码成的类型  这里如果再有一个LocalDates的话应该怎么处理
        p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));  
 //编码用  
        p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());  
        p.addLast("protobufEncoder", new ProtobufEncoder());  
 //业务逻辑用  这里如果再有一个LocalDateClientHandler的话应该怎么处理
        p.addLast("handler", new LocalTimeClientHandler());  
    }


希望给出的方案不是


if (arg1 instanceof Localtime){}
else if (arg1 instanceof LocalDate){}
.
.
.

因为实体太多!目前的想法时根据接收过来的arg1定义一个ID,根据ID调用相应的ProtobufDecoder;下面的Handler也一样。但是由于刚接触Netty不熟,所以特来求助。

查找方案中有看到一遍相似的文章:http://www.cnblogs.com/Solstice/archive/2011/04/03/2004458.html
想法可能不成熟,希望各位踢教!
麻烦各位最好给出Demo或者有类似实现的开源项目地址的,多谢!


举报
_beaman
发帖于3年前 8回/4K+阅

参考官方的例子:PortUnificationServerHandler

简单的说就是在每个包的第一个字符写一个特殊的char,通过这个字符来动态加载不同的handler,这样就可以在一个端口实现多个协议的解析。不过本人不建议这么做。

public class PortUnificationServerHandler extends ByteToMessageDecoder {



    private final SslContext sslCtx; 
    private final boolean detectSsl; 
    private final boolean detectGzip; 


    public PortUnificationServerHandler(SslContext sslCtx) { 
        this(sslCtx, true, true); 
    } 


    private PortUnificationServerHandler(SslContext sslCtx, boolean detectSsl, boolean detectGzip) { 
        this.sslCtx = sslCtx; 
        this.detectSsl = detectSsl; 
        this.detectGzip = detectGzip; 
    } 


    @Override 
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
        // Will use the first five bytes to detect a protocol. 
        if (in.readableBytes() < 5) { 
            return; 
        } 


        if (isSsl(in)) { 
            enableSsl(ctx); 
        } else { 
            final int magic1 = in.getUnsignedByte(in.readerIndex()); 
            final int magic2 = in.getUnsignedByte(in.readerIndex() + 1); 
            if (isGzip(magic1, magic2)) { 
                enableGzip(ctx); 
            } else if (isHttp(magic1, magic2)) { 
                switchToHttp(ctx); 
            } else if (isFactorial(magic1)) { 
                switchToFactorial(ctx); 
            } else { 
                // Unknown protocol; discard everything and close the connection. 
                in.clear(); 
                ctx.close(); 
            } 
        } 
    } 


    private boolean isSsl(ByteBuf buf) { 
        if (detectSsl) { 
            return SslHandler.isEncrypted(buf); 
        } 
        return false; 
    } 


    private boolean isGzip(int magic1, int magic2) { 
        if (detectGzip) { 
            return magic1 == 31 && magic2 == 139; 
        } 
        return false; 
    } 


    private static boolean isHttp(int magic1, int magic2) { 
        return 
            magic1 == 'G' && magic2 == 'E' || // GET 
            magic1 == 'P' && magic2 == 'O' || // POST 
            magic1 == 'P' && magic2 == 'U' || // PUT 
            magic1 == 'H' && magic2 == 'E' || // HEAD 
            magic1 == 'O' && magic2 == 'P' || // OPTIONS 
            magic1 == 'P' && magic2 == 'A' || // PATCH 
            magic1 == 'D' && magic2 == 'E' || // DELETE 
            magic1 == 'T' && magic2 == 'R' || // TRACE 
            magic1 == 'C' && magic2 == 'O';   // CONNECT 
    } 


    private static boolean isFactorial(int magic1) { 
        return magic1 == 'F'; 
    } 


    private void enableSsl(ChannelHandlerContext ctx) { 
        ChannelPipeline p = ctx.pipeline(); 
        p.addLast("ssl", sslCtx.newHandler(ctx.alloc())); 
        p.addLast("unificationA", new PortUnificationServerHandler(sslCtx, false, detectGzip)); 
        p.remove(this); 
    } 


    private void enableGzip(ChannelHandlerContext ctx) { 
        ChannelPipeline p = ctx.pipeline(); 
        p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); 
        p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); 
        p.addLast("unificationB", new PortUnificationServerHandler(sslCtx, detectSsl, false)); 
        p.remove(this); 
    } 


    private void switchToHttp(ChannelHandlerContext ctx) { 
        ChannelPipeline p = ctx.pipeline(); 
        p.addLast("decoder", new HttpRequestDecoder()); 
        p.addLast("encoder", new HttpResponseEncoder()); 
        p.addLast("deflater", new HttpContentCompressor()); 
        p.addLast("handler", new HttpSnoopServerHandler()); 
        p.remove(this); 
    } 


    private void switchToFactorial(ChannelHandlerContext ctx) { 
        ChannelPipeline p = ctx.pipeline(); 
        p.addLast("decoder", new BigIntegerDecoder()); 
        p.addLast("encoder", new NumberEncoder()); 
        p.addLast("handler", new FactorialServerHandler()); 
        p.remove(this); 
    } 
} 



--- 共有 2 条评论 ---
wonderland911 这个办法 在多线程操作同一个channelpipeline时会出问题吧。。 3年前 回复
_beaman多谢Grrr哥!多嘴问一句,Grrr哥推荐的方式是? 3年前 回复
官方有例子的。写一个protocol identify handler,当消息来的时候加载不同的handler.
--- 共有 1 条评论 ---
_beaman能不能麻烦贴下相关的地址!谢谢! 3年前 回复

二楼的项目Maven构建了半天失败了!

大概是问题太简单了,大家都不屑回答!求指点!

--- 共有 5 条评论 ---
_beaman回复 @石头哥哥 : 看了好久,没弄清楚你的ID是怎么取到的? 3年前 回复
石头哥哥回复 @_beaman : 弄清楚了没有? 3年前 回复
_beaman回复 @石头哥哥 : 看了您的框架,多谢! 3年前 回复
_beaman回复 @石头哥哥 : 先谢谢石头哥哥的回答。我想是我的问题没有说清楚,我的问题,麻烦石头哥哥看楼下的问题,谢谢! 3年前 回复
石头哥哥每一个pb就是一个请求相应的对。把pb消息用数组messgelite封装起来,消息id作为数据下标。在程序初始化的时候实例化pb实体,netty解析出ID,直接索引对应的PB消息实体,对应的方法强转对应的pb消息知道MVC中的controller不 ?大概就是这样的思路 3年前 回复

可能我把问题想复杂了


原问题中initChannel方法的第6行和第11行

//构造函数传递要解码成的类型  这里如果再有一个LocalDates的话应该怎么处理
6        p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance())); 
 //业务逻辑用  这里如果再有一个LocalDateClientHandler的话应该怎么处理
11        p.addLast("handler", new LocalTimeClientHandler());

这里第六行的处理我是想如果要在解码消息时选择相应的编码,就要继承重写一个

public class ProtobufCodecFactory extends ProtobufDecoder{

    public ProtobufCodecFactory(MessageLite prototype) {
        super(prototype);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext arg0, Object arg1) throws Exception {
        super.channelRead(arg0, arg1);
        if(arg1 instanceof GeneratedMessage){

            //这里我不太清楚ID应该怎么取出来,每一个协议的第一个都是 int32 id=1
            //取出来后返回的取出上述MessageLite中的是相应protobuf的实例吗?
        }
    }

}




第11行中的Handler我也是同样的想法?

让大家见笑了,麻烦指教下。纠结了一周多了。谢谢!


遇到同样的问题, 请问楼主用什么方法解决的呀?
--- 共有 3 条评论 ---
_beaman目前发现当ID小于127时通过在Decord方法中ByteBuf msg.getByte(3)可以取到相应的ID,如果>127时返回的是相应的补码(根据观察)。然后第四位也会有值。具体细节还不清楚! 3年前 回复
useraxian回复 @_beaman : 怎么获取ID呀? 3年前 回复
_beaman不好意思!楼主也只能给出大致的方案: 1、官方:https://developers.google.com/protocol-buffers/docs/techniques#union 2、重写解码器、然后根据相应的ID取出相应的实体。(楼主也还在学习中……) 3年前 回复
应该把编码解码和handler实现分开做protocol identify, 而在handler中,可以根据这个protocol identify作转发处理,我们以前在mina中是这样做的,在netty中也可以的
--- 共有 1 条评论 ---
_beaman嗯,谢谢! 3年前 回复
顶部