thrift 使用 TFramedTransport 分层传输协议.如何获取客户端IP

小乞丐 发布于 2016/06/02 11:35
阅读 2K+
收藏 0

服务端:

  

TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port), timeout);
 THsHaServer.Args trArgs = new THsHaServer.Args(serverTransport);
 /**TTransport协议类型参见thrift.txt文件 */
 trArgs.protocolFactory(new TCompactProtocol.Factory());
 trArgs.transportFactory(new TFramedTransport.Factory());
 trArgs.processorFactory(new TProcessorFactory(new LogProcessor(processor)));
 server = new THsHaServer(trArgs);
 Executors.newSingleThreadExecutor().execute(new Runnable() {
 @Override
 public void run() {
 server.serve(); //启动服务
 }
 });



LogProcessor:

public class LogProcessor implements TProcessor {

	private Logger logger = Logger.getLogger("access");
	
	 private TProcessor processor;  
     
    public LogProcessor(TProcessor processor)  
    {  
        this.processor = processor;  
    } 
    
	 /** 
     * 该方法,客户端每调用一次,就会触发一次 
     */ 
	@Override
	public boolean process(TProtocol in, TProtocol out) throws TException {
		  
		 /** 
         * 从TProtocol里面获取TTransport对象 
         * 把TTransport对象转换成TSocket,然后在TSocket里面获取Socket,就可以拿到客户端IP 
         */  
		TSocket socket = (TSocket)in.getTransport();  
	   SocketAddress clientIp = socket.getSocket().getRemoteSocketAddress();
	   logger.info("接收到来至[" + clientIp + "]的请求.");
	   return processor.process(in, out);
	}

}



 

TSocket socket = (TSocket)in.getTransport();   强转会报错。

in.getTransport() 得到的是TFramedTransport


错误信息:

11:35:25,941 [pool-1-thread-1] ERROR (org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.java:524) - Unexpected throwable while invoking!
java.lang.ClassCastException: org.apache.thrift.transport.TFramedTransport cannot be cast to org.apache.thrift.transport.TSocket
at com.retail.wst.thrift.monitor.LogProcessor.process(LogProcessor.java:48)
at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
at org.apache.thrift.server.Invocation.run(Invocation.java:18)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


加载中
0
LarrySu
LarrySu

可以通过TServerEventHandler获取到.

给你一段代码,不清楚的可以追问:

import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

/**
 * Thrift调用监控事件
 * 
 * 仅打印连接信息日志.
 * 
 * @author <a href="mailto:larry7696@gmail.com">Larry</a>
 * @version 0.1
 */
public class MonitorServerEventHandler implements TServerEventHandler {

    private static final Log LOG = LogFactory.getLog(MonitorServerEventHandler.class);

    @Override
    public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
	if (arg0 != null && arg0.getTransport() != null) {
	    Socket socket = ((TSocket) arg0.getTransport()).getSocket();
	    LOG.info("[Monitor] ThriftServer Socket Info : server地址: " + socket.getLocalAddress() + " , server端口: "
		    + socket.getLocalPort() + " , client地址: " + socket.getInetAddress() + " , client端口: "
		    + socket.getPort());
	}
	return null;
    }

    @Override
    public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
    }

    @Override
    public void preServe() {
    }

    @Override
    public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
    }

}



0
小乞丐
小乞丐

引用来自“LarrySu”的评论

可以通过TServerEventHandler获取到.

给你一段代码,不清楚的可以追问:

import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

/**
 * Thrift调用监控事件
 * 
 * 仅打印连接信息日志.
 * 
 * @author <a href="mailto:larry7696@gmail.com">Larry</a>
 * @version 0.1
 */
public class MonitorServerEventHandler implements TServerEventHandler {

    private static final Log LOG = LogFactory.getLog(MonitorServerEventHandler.class);

    @Override
    public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
	if (arg0 != null && arg0.getTransport() != null) {
	    Socket socket = ((TSocket) arg0.getTransport()).getSocket();
	    LOG.info("[Monitor] ThriftServer Socket Info : server地址: " + socket.getLocalAddress() + " , server端口: "
		    + socket.getLocalPort() + " , client地址: " + socket.getInetAddress() + " , client端口: "
		    + socket.getPort());
	}
	return null;
    }

    @Override
    public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
    }

    @Override
    public void preServe() {
    }

    @Override
    public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
    }

}



荏苒不行...

org.apache.thrift.transport.TFramedTransport cannot be cast to org.apache.thrift.transport.TSocket

0
LarrySu
LarrySu

引用来自“LarrySu”的评论

可以通过TServerEventHandler获取到.

给你一段代码,不清楚的可以追问:

import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

/**
 * Thrift调用监控事件
 * 
 * 仅打印连接信息日志.
 * 
 * @author <a href="mailto:larry7696@gmail.com">Larry</a>
 * @version 0.1
 */
public class MonitorServerEventHandler implements TServerEventHandler {

    private static final Log LOG = LogFactory.getLog(MonitorServerEventHandler.class);

    @Override
    public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
	if (arg0 != null && arg0.getTransport() != null) {
	    Socket socket = ((TSocket) arg0.getTransport()).getSocket();
	    LOG.info("[Monitor] ThriftServer Socket Info : server地址: " + socket.getLocalAddress() + " , server端口: "
		    + socket.getLocalPort() + " , client地址: " + socket.getInetAddress() + " , client端口: "
		    + socket.getPort());
	}
	return null;
    }

    @Override
    public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
    }

    @Override
    public void preServe() {
    }

    @Override
    public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
    }

}



引用来自“小乞丐”的评论

荏苒不行...

org.apache.thrift.transport.TFramedTransport cannot be cast to org.apache.thrift.transport.TSocket

sorry.

TFramedTransport本身不是TSocket,而是在此基础上包了一层,所以直接转换TSocket会出错.
而真正的TSocket其实是TFramedTransport中的transport_,但可惜的是此属性为private.
所以,结论是常规的方式是无法获取到真正的TSocket的.


如果硬要这么干,只能用非常规的方式.比如继承TFramedTransport,自己保存transport_,并重写Factory的getTransport方法.

0
LarrySu
LarrySu

还有一个办法,但比较恶心:


复制TFramedTransport.java的源代码新建一个类TFramedTransport2.java.
将代码"private TTransport transport_ = null;"改为"public TTransport transport_ = null;".
其它调用的地方都用TFramedTransport2.java.
然后通过
TSocket socket = (TSocket)((TFramedTransport2)in.getTransport()).transport_; 
获取TSocket对象.

0
小乞丐
小乞丐

引用来自“LarrySu”的评论

还有一个办法,但比较恶心:


复制TFramedTransport.java的源代码新建一个类TFramedTransport2.java.
将代码"private TTransport transport_ = null;"改为"public TTransport transport_ = null;".
其它调用的地方都用TFramedTransport2.java.
然后通过
TSocket socket = (TSocket)((TFramedTransport2)in.getTransport()).transport_; 
获取TSocket对象.

java.lang.ClassCastException: org.apache.thrift.transport.TMemoryInputTransport cannot be cast to org.apache.thrift.transport.TSocket

按照你说的方式 任然不行哦。

LarrySu
LarrySu
你用本地内存方式,而的不是socket方式调用吗?
0
LarrySu
LarrySu

引用来自“LarrySu”的评论

还有一个办法,但比较恶心:


复制TFramedTransport.java的源代码新建一个类TFramedTransport2.java.
将代码"private TTransport transport_ = null;"改为"public TTransport transport_ = null;".
其它调用的地方都用TFramedTransport2.java.
然后通过
TSocket socket = (TSocket)((TFramedTransport2)in.getTransport()).transport_; 
获取TSocket对象.

引用来自“小乞丐”的评论

java.lang.ClassCastException: org.apache.thrift.transport.TMemoryInputTransport cannot be cast to org.apache.thrift.transport.TSocket

按照你说的方式 任然不行哦。

客户端调用的代码是什么?

Transport的类型必须为TSocket才能这样强制转换,明显你的不是TSocket.

我随手写了一个thrift客户端代码给你参考一下:


TTransport transport;  
try {  
    transport = new TFramedTransport.Factory().getTransport(new TSocket("localhost", 9999));
    TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport);
    transport.open();
    XXX.Client client = new XXX.Client(protocol);
    System.out.println(client.xxxxx());
} catch (TTransportException e) {  
    e.printStackTrace();  
} catch (TException e) {  
    e.printStackTrace();  
} finally {
    if (transport != null) {
        transport.close();  
    }
}

因为是随手写的,不保证没有错误.仅供参考..


0
小乞丐
小乞丐

引用来自“LarrySu”的评论

还有一个办法,但比较恶心:


复制TFramedTransport.java的源代码新建一个类TFramedTransport2.java.
将代码"private TTransport transport_ = null;"改为"public TTransport transport_ = null;".
其它调用的地方都用TFramedTransport2.java.
然后通过
TSocket socket = (TSocket)((TFramedTransport2)in.getTransport()).transport_; 
获取TSocket对象.

引用来自“小乞丐”的评论

java.lang.ClassCastException: org.apache.thrift.transport.TMemoryInputTransport cannot be cast to org.apache.thrift.transport.TSocket

按照你说的方式 任然不行哦。

引用来自“LarrySu”的评论

客户端调用的代码是什么?

Transport的类型必须为TSocket才能这样强制转换,明显你的不是TSocket.

我随手写了一个thrift客户端代码给你参考一下:


TTransport transport;  
try {  
    transport = new TFramedTransport.Factory().getTransport(new TSocket("localhost", 9999));
    TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport);
    transport.open();
    XXX.Client client = new XXX.Client(protocol);
    System.out.println(client.xxxxx());
} catch (TTransportException e) {  
    e.printStackTrace();  
} catch (TException e) {  
    e.printStackTrace();  
} finally {
    if (transport != null) {
        transport.close();  
    }
}

因为是随手写的,不保证没有错误.仅供参考..


客户端代码:

logger.info("开始连接任务调用监控服务器." + host + ":" + port);
		transport = new WstTFramedTransport(new TSocket(host, port, timeout));
		TProtocol protocol = new TCompactProtocol(transport);
		client = new MonitorTaskThriftServer.Client(protocol);
		transport.open();
		logger.info("任务调用监控服务器成功. " + host + ":" + port);

WstTFramedTransport 是复制TFramedTransport类,加了个getTransport方法 来获取transport_ 。

服务端代码:

try{
			TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port), timeout);
			THsHaServer.Args trArgs = new THsHaServer.Args(serverTransport);
			/**TTransport协议类型参见thrift.txt文件 */
			trArgs.protocolFactory(new TCompactProtocol.Factory());
			trArgs.transportFactory(new WstTFramedTransport.Factory());
			//trArgs.processorFactory(new TProcessorFactory(processor));
			trArgs.processorFactory(new TProcessorFactory(new LogProcessor(processor)));
			server = new THsHaServer(trArgs);
			Executors.newSingleThreadExecutor().execute(new Runnable() {
				@Override
				public void run() {
					server.serve(); //启动服务
				}
			});
		}catch(Exception ex){
			logger.error("thrift 非阻塞服务启动异常." + ex.getMessage());
			throw new ThriftException(ex);
		}



LogProcessor 类:


 /** 
     * 该方法,客户端每调用一次,就会触发一次 
     */ 
	@Override
	public boolean process(TProtocol in, TProtocol out) throws TException {
		  
		 /** 
         * 从TProtocol里面获取TTransport对象 
         * 把TTransport对象转换成TSocket,然后在TSocket里面获取Socket,就可以拿到客户端IP 
         */  
		WstTFramedTransport transport = (WstTFramedTransport)in.getTransport();  
		TSocket socket = (TSocket)transport.getTransport();
	   SocketAddress clientIp = socket.getSocket().getRemoteSocketAddress();
	   logger.info("接收到来至[" + clientIp + "]的请求.");
	   return processor.process(in, out);
	}








0
LarrySu
LarrySu

引用来自“小乞丐”的评论

引用来自“LarrySu”的评论

还有一个办法,但比较恶心:


复制TFramedTransport.java的源代码新建一个类TFramedTransport2.java.
将代码"private TTransport transport_ = null;"改为"public TTransport transport_ = null;".
其它调用的地方都用TFramedTransport2.java.
然后通过
TSocket socket = (TSocket)((TFramedTransport2)in.getTransport()).transport_; 
获取TSocket对象.

引用来自“小乞丐”的评论

java.lang.ClassCastException: org.apache.thrift.transport.TMemoryInputTransport cannot be cast to org.apache.thrift.transport.TSocket

按照你说的方式 任然不行哦。

引用来自“LarrySu”的评论

客户端调用的代码是什么?

Transport的类型必须为TSocket才能这样强制转换,明显你的不是TSocket.

我随手写了一个thrift客户端代码给你参考一下:


TTransport transport;  
try {  
    transport = new TFramedTransport.Factory().getTransport(new TSocket("localhost", 9999));
    TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport);
    transport.open();
    XXX.Client client = new XXX.Client(protocol);
    System.out.println(client.xxxxx());
} catch (TTransportException e) {  
    e.printStackTrace();  
} catch (TException e) {  
    e.printStackTrace();  
} finally {
    if (transport != null) {
        transport.close();  
    }
}

因为是随手写的,不保证没有错误.仅供参考..


客户端代码:

logger.info("开始连接任务调用监控服务器." + host + ":" + port);
		transport = new WstTFramedTransport(new TSocket(host, port, timeout));
		TProtocol protocol = new TCompactProtocol(transport);
		client = new MonitorTaskThriftServer.Client(protocol);
		transport.open();
		logger.info("任务调用监控服务器成功. " + host + ":" + port);

WstTFramedTransport 是复制TFramedTransport类,加了个getTransport方法 来获取transport_ 。

服务端代码:

try{
			TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port), timeout);
			THsHaServer.Args trArgs = new THsHaServer.Args(serverTransport);
			/**TTransport协议类型参见thrift.txt文件 */
			trArgs.protocolFactory(new TCompactProtocol.Factory());
			trArgs.transportFactory(new WstTFramedTransport.Factory());
			//trArgs.processorFactory(new TProcessorFactory(processor));
			trArgs.processorFactory(new TProcessorFactory(new LogProcessor(processor)));
			server = new THsHaServer(trArgs);
			Executors.newSingleThreadExecutor().execute(new Runnable() {
				@Override
				public void run() {
					server.serve(); //启动服务
				}
			});
		}catch(Exception ex){
			logger.error("thrift 非阻塞服务启动异常." + ex.getMessage());
			throw new ThriftException(ex);
		}



LogProcessor 类:


 /** 
     * 该方法,客户端每调用一次,就会触发一次 
     */ 
	@Override
	public boolean process(TProtocol in, TProtocol out) throws TException {
		  
		 /** 
         * 从TProtocol里面获取TTransport对象 
         * 把TTransport对象转换成TSocket,然后在TSocket里面获取Socket,就可以拿到客户端IP 
         */  
		WstTFramedTransport transport = (WstTFramedTransport)in.getTransport();  
		TSocket socket = (TSocket)transport.getTransport();
	   SocketAddress clientIp = socket.getSocket().getRemoteSocketAddress();
	   logger.info("接收到来至[" + clientIp + "]的请求.");
	   return processor.process(in, out);
	}








兄弟既然这样方法都使用了,就恶心到底吧:)

复制一份TNonblockingServer.java,
在SelectAcceptThread的handleAccept方法中添加你想要的代码,如下:



/**
     * Accept a new connection.
     */
    private void handleAccept() throws IOException {
      SelectionKey clientKey = null;
      TNonblockingTransport client = null;
      try {
        // accept the connection
        client = (TNonblockingTransport)serverTransport.accept();
        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
        
        TNonblockingSocket socket = (TNonblockingSocket) client;
		System.out.println(socket.getSocketChannel().getRemoteAddress()); // 你要的东西在这里

        // add this key to the map
          FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);

          clientKey.attach(frameBuffer);
      } catch (TTransportException tte) {
        // something went wrong accepting.
        LOGGER.warn("Exception trying to accept!", tte);
        tte.printStackTrace();
        if (clientKey != null) cleanupSelectionKey(clientKey);
        if (client != null) client.close();
      }
    }




LarrySu
LarrySu
如果你坚持要使用THsHaServer服务,那么这个也要复制一份. eg: public class WstTHsHaServer extends WstTNonblockingServer
返回顶部
顶部