MINA 接收消息:起始符和结束符问题(TextLineCodecFactory 默认以 \r\n 作为分隔符)

levis999 发布于 07/30 21:19
阅读 94
收藏 0

MINA 接收消息:起始符和结束符问题(TextLineCodecFactory 默认以 \r\n 作为分隔符)

客户端发来的消息以 STX 开头、以 ETX 结尾,分别对应十六进制 0x02 和 0x03。这种情况下 TextLineCodecFactory 应该怎么设置呢?

 

 

核心代码:

        TextLineCodecFactory tlcf = new TextLineCodecFactory(Charset.forName("gb2312"),new LineDelimiter("STXETX"),new LineDelimiter("STXETX"));

目前收到消息如下:没有正常调用接收消息的messageReceived方法

 

完整代码:

package com.yice.cloud.websocket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MinaTimeTest {
    private static final int PORT = 9123;
     
    public static void main(String[] args) throws IOException {
        //首先,我们为服务端创建IoAcceptor,NioSocketAcceptor是基于NIO的服务端监听器
        IoAcceptor acceptor = new NioSocketAcceptor();
        //接着,如结构图示,在Acceptor和IoHandler之间将设置一系列的Fliter"\r\n", "\r\n"
        //包括记录过滤器和编解码过滤器。其中TextLineCodecFactory是mina自带的文本解编码器
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        TextLineCodecFactory tlcf = new TextLineCodecFactory(Charset.forName("gb2312"),new LineDelimiter("STXETX"),new LineDelimiter("STXETX"));
        //CustomProtocolCodecFactory tlcf = new CustomProtocolCodecFactory(Charset.forName("gb2312"));
        
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(tlcf));
        /*acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));*/
        
        
        //配置事务处理Handler,将请求转由TimeServerHandler处理。
        acceptor.setHandler(new TimeServerHandler());
        //配置Buffer的缓冲区大小
        acceptor.getSessionConfig().setReadBufferSize(2048);
        //设置等待时间,每隔IdleTime将调用一次handler.sessionIdle()方法
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        //绑定端口
        acceptor.bind(new InetSocketAddress(PORT));
    }
    

    
    static class TimeServerHandler extends IoHandlerAdapter {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            cause.printStackTrace();
        }
 
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            
            logger.info("接受消息成功..." + message.toString());
            super.messageReceived(session, message);
        }
        
        public void sessionIdle(IoSession session, IdleStatus status)
                throws Exception {
            System.out.println("IDLE ==============" + session.getIdleCount(status));
        }
    }

}
 

 

 

 

加载中
0
yong230
yong230

mina现在都没人用了,为何不用netty

0
JunOnes
JunOnes

我随便写了个,貌似可以用,参考看看

package com.test.mina;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class MinaTimeTest {
    private static final int PORT = 9123;

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        StxEtxLineProtocolFactory codec = new StxEtxLineProtocolFactory(Charset.forName("GB2312"), 1024, "STX", "ETX");

        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(codec));

        //配置事务处理Handler,将请求转由TimeServerHandler处理。
        acceptor.setHandler(new TimeServerHandler());
        //配置Buffer的缓冲区大小
        acceptor.getSessionConfig().setReadBufferSize(2048);
        //设置等待时间,每隔IdleTime将调用一次handler.sessionIdle()方法
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        //绑定端口
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("server started at port " + PORT);
    }



    static class TimeServerHandler extends IoHandlerAdapter {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            cause.printStackTrace();
        }

        public void messageReceived(IoSession session, Object message)
                throws Exception {

            logger.info("接受消息成功..." + message.toString());
            session.write("接收成功!");
        }

        public void sessionIdle(IoSession session, IdleStatus status)
                throws Exception {
            System.out.println("IDLE ==============" + session.getIdleCount(status));
        }
    }

}

package com.test.mina;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

public class StxEtxLineProtocolFactory implements ProtocolCodecFactory {
    private final Charset charset;
    private final int bufferLength;
    private final String stx;
    private final String etx;
    public StxEtxLineProtocolFactory() {
        this(Charset.forName("UTF-8"), 2048, "STX", "ETX");
    }
    public StxEtxLineProtocolFactory(Charset charset, int bufferLength, String stx, String etx) {
        if (charset == null) {
            throw new IllegalArgumentException("charset must been set");
        }
        this.charset = charset;
        this.bufferLength = bufferLength;
        this.stx = stx;
        this.etx = etx;
    }
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return new StxEtxLineEncoder(charset, stx, etx);
    }

    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return new StxEtxLineDecoder(charset, stx, etx, bufferLength);
    }
}

package com.test.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Arrays;

public class StxEtxLineDecoder implements ProtocolDecoder {
    private static AttributeKey CONTEXT = new AttributeKey(StxEtxLineDecoder.class, "context");
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private Charset charset;
    private byte[] stxBytes;
    private byte[] etxBytes;
    private int bufferLength;

    public StxEtxLineDecoder(Charset charset, String stx, String etx, int bufferLength) {
        if (bufferLength <= 0) {
            throw new IllegalArgumentException("bufferLength must be a positive value");
        }
        this.charset = charset;
        this.stxBytes = stx.getBytes(charset);
        this.etxBytes = etx.getBytes(charset);
        this.bufferLength = bufferLength;
    }

    public void decode(IoSession session, IoBuffer ioBuffer,
            ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        Context context = getContext(session);
        byte[] startBuffer = new byte[stxBytes.length];
        byte[] endBuffer = new byte[etxBytes.length];
        while (ioBuffer.hasRemaining()) {
            byte read = ioBuffer.get();
            if (!context.start) {
                context.stxBuffer.put(read);
                if (context.stxBuffer.position() == startBuffer.length) {
                    context.stxBuffer.flip();
                    Arrays.fill(startBuffer, (byte)0);
                    context.stxBuffer.get(startBuffer);
                    if (Arrays.equals(startBuffer, stxBytes)) {
                        context.start = true;
                        context.stxBuffer.clear();
                    } else {
                        context.stxBuffer.clear();
                        context.stxBuffer.put(startBuffer,1, startBuffer.length - 1);
                    }
                }
            } else {
                context.etxBuffer.put(read);
                if (context.etxBuffer.position() == endBuffer.length) {
                    context.etxBuffer.flip();
                    Arrays.fill(endBuffer, (byte) 0);
                    context.etxBuffer.get(endBuffer);
                    if (Arrays.equals(endBuffer, etxBytes)) {
                        context.start = false;
                        int length = context.buffer.position() - etxBytes.length + 1;
                        context.buffer.flip();
                        byte[] content = new byte[length];
                        context.buffer.get(content);
                        protocolDecoderOutput.write(new String(content, charset));
                        context.buffer.clear();
                        context.etxBuffer.clear();
                        continue;
                    } else {
                        context.etxBuffer.clear();
                        context.etxBuffer.put(endBuffer,1, startBuffer.length - 1);
                    }
                }
                if (context.buffer.position() > bufferLength - stxBytes.length - 1) {
                    context.start = false;
                    context.buffer.clear();
                    context.etxBuffer.clear();
                    logger.error("message too long, drop it");
                    continue;
                }
                context.buffer.put(read);
            }
        }
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput protocolDecoderOutput)
            throws Exception {
        // nothing to do.
    }

    private Context getContext(IoSession session) {
        Context ctx;
        ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context(bufferLength);
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    public void dispose(IoSession session) throws Exception {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            ctx.buffer.free();
            ctx.stxBuffer.free();
            ctx.etxBuffer.free();
            session.removeAttribute(CONTEXT);
        }
    }

    private class Context {
        IoBuffer buffer;
        IoBuffer stxBuffer;
        IoBuffer etxBuffer;
        boolean start;

        Context(int bufferLength) {
            buffer = IoBuffer.allocate(bufferLength);
            stxBuffer = IoBuffer.allocate(stxBytes.length);
            etxBuffer = IoBuffer.allocate(etxBytes.length);
        }
    }
}

package com.test.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

/**
 * Encoder that wraps an outgoing {@link String} between a start marker and an end
 * marker and writes the result as a single buffer.
 */
public class StxEtxLineEncoder implements ProtocolEncoder {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Charset charset;
    private final byte[] stxBytes;
    private final byte[] etxBytes;

    /**
     * @param charset charset used to encode both markers and every payload
     * @param stx     start marker placed before the payload
     * @param etx     end marker placed after the payload
     */
    public StxEtxLineEncoder(Charset charset, String stx, String etx) {
        this.charset = charset;
        this.stxBytes = stx.getBytes(charset);
        this.etxBytes = etx.getBytes(charset);
    }

    /**
     * Writes {@code stx + payload + etx} to the output when {@code o} is a
     * {@link String}; any other message type is logged as an error and not written.
     */
    public void encode(IoSession session, Object o, ProtocolEncoderOutput protocolEncoderOutput)
            throws Exception {
        if (!(o instanceof String)) {
            logger.error("unknown message format, must be string");
            return;
        }
        String text = (String) o;
        byte[] payload = text.getBytes(charset);
        int frameSize = stxBytes.length + payload.length + etxBytes.length;
        IoBuffer frame = IoBuffer.allocate(frameSize);
        frame.put(stxBytes);
        frame.put(payload);
        frame.put(etxBytes);
        // Switch the buffer from writing to reading before handing it on.
        frame.flip();
        protocolEncoderOutput.write(frame);
    }

    /** This encoder does nothing on disposal. */
    public void dispose(IoSession session) throws Exception {
        // nothing to do.
    }
}

 

返回顶部
顶部