netty4.x服务端无法接收来自tomcat容器中启动的netty4.x客户端程序发送的消息

sh20161130 发布于 2016/12/01 16:17
阅读 664
收藏 0

收藏!数据建模最全知识体系解读!>>>


  1. 项目背景


本人最近在做一个有关web应用自动部署的程序,用netty来做deploy-proxy接收来自web后端的部署指令,deploy-proxy为服务端,deploy-client为客户端,web应用依赖deploy-client,web应用由deploy-proxy通过调用tomcat脚本启动,web应用启动后会通过client向proxy注册。

    2. 问题

依赖deploy-client的web应用通过tomcat部署启动后能正常连接到deploy-proxy,但client发送给proxy的消息,proxy却无法收到。通过wireshark抓包,显示服务端协议栈已收到包,且已ACK,但proxy的ChannelInboundHandlerAdapter实现的channelRead却未被调用;而通过eclipse环境中main启动客户端则能顺利与proxy通信。

除了服务端看似未收到消息外,无任何异常,无jar冲突,tomcat启动web应用无异常,被这个问题搞蒙了,求高手解答

    3. 代码

Netty4.x服务端代码如下:

/**
 * Netty-based deploy-proxy server.
 *
 * <p>Accepts TCP connections on {@code DeployConstant.PROXY_HOST_PORT}, splits the
 * inbound byte stream into frames on {@code DeployConstant.lineDelimiter()} and hands
 * each frame to an {@link AppUpMessageHandler}.
 */
public class ProxyServer {
	static Logger logger = Logger.getLogger(ProxyServer.class);
	
	// Number of worker event-loop threads: one per available processor.
	private int maxActive = Runtime.getRuntime().availableProcessors();
	
	// Created in startup(); read by the channel initializer when a connection is accepted.
	private ProxyContext context;
	
	private EventLoopGroup bossGroup = new NioEventLoopGroup();
	private EventLoopGroup workerGroup = new NioEventLoopGroup(maxActive);
	
	public ProxyServer(){
		
	}
	
	/**
	 * Builds the {@link ProxyContext}, binds the server socket and then blocks the
	 * calling thread until the server channel is closed.
	 *
	 * <p>Failures are logged, not rethrown. Whether startup fails or the server channel
	 * closes normally, the event-loop groups are shut down before this method returns so
	 * their threads do not outlive the server; as a consequence an instance cannot be
	 * started a second time.
	 */
	public void startup(){
		try {
			context = new ProxyContext();
			
			ApplicationContext springCtx = new ClassPathXmlApplicationContext("classpath:spring/spring-*.xml");
			context.setSpringCtx(springCtx);
			context.setDeployEnv(PropertiesParser.getProperty("deploy.env", "beta"));
			context.setDeployHome(PropertiesParser.getProperty("deploy.home"));
			context.setDeployRepUrl(PropertiesParser.getProperty("deploy.rep.url"));
			context.setExternalIp(PropertiesParser.getProperty("deploy.external.ip"));
			context.setInternalIp(PropertiesParser.getProperty("deploy.internal.ip"));
			DeployConstant.DEPLOY_HOST_IP = context.getInternalIp();
			
			ProxyBean proxy = ProxyBean.getInstance();
			proxy.setContext(context);
			
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(getChannelInitializer())
				.option(ChannelOption.SO_BACKLOG, 128)
				.childOption(ChannelOption.SO_KEEPALIVE, true)
				.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
			
			ChannelFuture future = bootstrap.bind(DeployConstant.PROXY_HOST_PORT).sync();
			logger.info("proxy-server startup successfully");
			// Block here until the listening channel is closed.
			future.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			// Keep the interrupt visible to whoever owns this thread.
			Thread.currentThread().interrupt();
			logger.error("proxy-server startup failure", e);
		} catch (Exception e) {
			logger.error("proxy-server startup failure", e);
		} finally {
			// Without this the event-loop threads stay alive after a failed bind or
			// after the server channel has closed.
			shutdown();
		}
	}
	
	/**
	 * Requests a graceful shutdown of both event-loop groups. Safe to call more than once.
	 */
	public void shutdown(){
		workerGroup.shutdownGracefully();
		bossGroup.shutdownGracefully();
	}
	
	/**
	 * Creates the per-connection pipeline initializer: delimiter-based frame decoder
	 * (max frame 8192 bytes), a 300-second read timeout, then the message handler.
	 */
	private ChannelInitializer<SocketChannel> getChannelInitializer(){
		
		return new ChannelInitializer<SocketChannel>() {
  
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				DelimiterBasedFrameDecoder decoder = new DelimiterBasedFrameDecoder(8192, DeployConstant.lineDelimiter());
				decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
				ch.pipeline().addLast(decoder);
				ch.pipeline().addLast(new ReadTimeoutHandler(300, TimeUnit.SECONDS));
				ch.pipeline().addLast(new AppUpMessageHandler(context));
			}
			
		};
		
	}
	
	public static void main(String[] args) {
		ProxyServer server = new ProxyServer();
		server.startup();
	}
	
}



proxy服务端消息处理适配器代码:

public class AppUpMessageHandler extends ChannelInboundHandlerAdapter {
	static Logger logger = Logger.getLogger(AppUpMessageHandler.class);
	
	private ProxyContext context;
	private AttributeKey<AppChannel> APP_CHANNEL = AttributeKey.valueOf("appChannel");
	
	public AppUpMessageHandler(ProxyContext context){
		this.context = context;
	}
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ctx.fireChannelActive();
		logger.info("channelActive.....");
	}
	
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channel 通道 Read 读取 Complete 完成");
		ctx.flush();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf byteBuf = (ByteBuf)msg;
		try {
			byte msgType = byteBuf.getByte(0);
			byte ackType = byteBuf.getByte(1);
			int msgLen = byteBuf.getShort(2);
			byte[] dst = new byte[msgLen];
			byteBuf.getBytes(4, dst);
			String msgBody = new String(dst);
			
			StringBuffer sb = new StringBuffer();
			sb.append("[")
			  .append("MT").append(":").append(msgType).append(", ")
			  .append("AT").append(":").append(ackType).append(", ")
			  .append("ML").append(":").append(msgLen).append(", ")
			  .append("MB").append(":").append(msgBody)
			  .append("]");
			
			//注册并需要确认
			if(msgType == EventType.REGISTER.getEventType() && ackType == AckType.ACK.getAckType()){
				logger.info("::: APP.UP.REGISTER ::: " + sb.toString());
				handleRegister(msgBody, ctx);
			}
			
			//应用上报应用信息,无需确认
			else if(msgType == EventType.APPINFO.getEventType() && ackType == AckType.UNACK.getAckType()){
				logger.info("::: APP.UP.APPINFO ::: " + sb.toString());
				AppChannel appChannel = ctx.channel().attr(APP_CHANNEL).get();
				JSONObject ackBody = JSON.parseObject(msgBody);
				appChannel.setPid(ackBody.getInteger("pid"));
				appChannel.setUpTime(ackBody.getDate("upTime"));
				
				try {
					Event event = new Event();
					event.setAppId(appChannel.getAppId());
					event.setEventType(EventType.APPINFO);
					ProxyContext.eventQueue.offer(event);
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
			
			//心跳并需要确认
			else if(msgType == EventType.KEEPALIVE.getEventType() && ackType == AckType.ACK.getAckType()){
				logger.debug("::: APP.UP.KEEPALIVE ::: " + sb.toString());
				handleKeepalive(msgBody, ctx);
			}
			
			else{
				logger.error("::: APP.UP.UNKNOWN ::: " + sb.toString());
			}
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.info("exceptionCaught.....");
		logger.error(cause.getMessage(), cause);
		AppChannel appChannel = ctx.channel().attr(APP_CHANNEL).get();
		if(appChannel != null){
			if(context.getAppChannelMap().containsKey(appChannel.getAppId()))
				context.getAppChannelMap().remove(appChannel.getAppId());
		}
	}
	
	private void handleRegister(String msgBody, ChannelHandlerContext ctx) throws UnsupportedEncodingException{
		JSONObject ackBody = JSON.parseObject(msgBody);
		String appId = ackBody.getString("appId");
		
		ackBody.clear();
		ackBody.put("ok", true);
		byte[] buf = ackBody.toString().getBytes(DeployConstant.CHARSET);
		ByteBuf ackMsg = UnpooledByteBufAllocator.DEFAULT.heapBuffer(DeployConstant.HEADER_LEN + buf.length);
		
		//MT(messageType)
		ackMsg.writeByte(EventType.REGISTER.getEventType());
		//AT(ackType)
		ackMsg.writeByte(AckType.UNACK.getAckType());
		//ML(messageLen)
		ackMsg.writeShort((short)buf.length);
		//MB(messageBody)
		ackMsg.writeBytes(buf);
		ackMsg.writeBytes(DeployConstant.lineDelimiter());
		
		//ack
		ctx.channel().writeAndFlush(ackMsg);
		ctx.flush();
		
		AppChannel appChannel = new AppChannel(appId, ctx);
		ctx.channel().attr(APP_CHANNEL).set(appChannel);
		
		context.getAppChannelMap().put(appId, appChannel);
	}
	
	private void handleKeepalive(String msgBody, ChannelHandlerContext ctx) throws UnsupportedEncodingException{
		AppChannel userChannel = ctx.channel().attr(APP_CHANNEL).get();
		
		ByteBuf ackMsg = UnpooledByteBufAllocator.DEFAULT.heapBuffer(DeployConstant.HEADER_LEN);
		//MT(messageType)
		ackMsg.writeByte(EventType.KEEPALIVE.getEventType());
		//AT(ackType)
		ackMsg.writeByte(AckType.UNACK.getAckType());
		//ML(messageLen)
		ackMsg.writeShort((short)0);
		ackMsg.writeBytes(DeployConstant.lineDelimiter());
		//ack
		ctx.channel().writeAndFlush(ackMsg);
		ctx.flush();
		
		userChannel.setKeepaliveTime(new Date());
	}
	
}



客户端发送消息的代码:

/**
 * Client-side representation of one deployed application.
 *
 * <p>Runs two background activities against the proxy server:
 * <ul>
 *   <li>a registration thread that (re)connects and sends REGISTER every 3 seconds until
 *       the {@code register} flag is set;</li>
 *   <li>a timer task that sends KEEPALIVE and, once, the APPINFO report.</li>
 * </ul>
 * The channel and the keepalive timestamp are touched by both background threads as well
 * as by callers, so they are {@code volatile} and read through a local snapshot.
 */
public class AppInstance {
	static Logger logger = Logger.getLogger(AppInstance.class);
	
	private ProxyClient proxyClient;
	
	// TCP channel to the proxy server
	private volatile Channel channel;
	
	private Timer timer = new Timer(true);
	// Whether registration succeeded; updated once the registration ack has been received
	private AtomicBoolean register = new AtomicBoolean(false);
	// Whether the application info has been reported
	private AtomicBoolean upAppinfo = new AtomicBoolean(false);
	
	private Date upTime = new Date();
	private volatile Date keepaliveTime = new Date();
	
	private AppConf conf;
	private AppInfoEvent appInfo;
	
	// Set to true only after a shutdown command has been received
	private AtomicBoolean shutdown = new AtomicBoolean(false);
	
	private AtomicBoolean isStartRegister = new AtomicBoolean(false);
	private AtomicBoolean isStartUp = new AtomicBoolean(false);
	
	// Registration thread, kept so that stop() can wake and end it.
	private volatile Thread registerThread;
	// Set by stop(); tells the registration loop to exit.
	private volatile boolean stopped = false;
	
	public AppInstance(AppConf conf){
		this.conf = conf;
		proxyClient = new ProxyClient(this);
	}
	
	/**
	 * Marks the instance as unregistered and drops the current channel, closing it first
	 * if it is still active.
	 */
	public void reset(){
		register.set(false);
		try {
			Channel current = channel;
			if(current != null && current.isActive())
				current.close();
		} catch (Exception e) {
			logger.info(e.getMessage(), e);
		}
		channel = null;
	}
	
	/**
	 * Starts the registration thread (at most once per instance).
	 */
	public synchronized void startRegister(){
		if(!isStartRegister.get()){
			registerThread = new Thread(new Runnable() {
				
				@Override
				public void run() {
					while(!stopped){
						try {
							// Work on a snapshot: reset()/stop() may change the field concurrently.
							Channel current = channel;
							if(current != null && !current.isActive()){
								channel = null;
								current = null;
							}
							if(current == null){
								current = proxyClient.getChannel(DeployConstant.PROXY_HOST_IP, DeployConstant.PROXY_HOST_PORT);
								channel = current;
							}
							
							// (Re-)register
							if(!register.get() && current != null && current.isActive()){
								JSONObject msgBody = new JSONObject();
								msgBody.put("appId", conf.getAppId());
								byte[] buf = msgBody.toString().getBytes(DeployConstant.CHARSET);
								logger.info("send app register ...");
								writeByteBuf(current, EventType.REGISTER, AckType.ACK, buf);
							}
						} catch (Exception e) {
							logger.info(e.getMessage(), e);
							// A broad catch would otherwise clear a pending interrupt.
							if(e instanceof InterruptedException)
								Thread.currentThread().interrupt();
						}
						
						try {
							Thread.sleep(3000);
						} catch (InterruptedException e) {
							logger.info(e.getMessage(), e);
							Thread.currentThread().interrupt();
							break;
						}
					}
					
					// A connection may have been established while stop() was running.
					Channel last = channel;
					if(stopped && last != null && last.isActive())
						last.close();
				}
				
			}, "AppRegisterThread");
			registerThread.start();
			isStartRegister.set(true);
		}
	}
	
	/**
	 * Stops reporting: cancels the timer, ends the registration thread and closes the
	 * channel if it is active. Without ending the registration thread it would simply
	 * reconnect a few seconds after the channel was closed.
	 */
	public void stop(){
		stopped = true;
		timer.cancel();
		Thread worker = registerThread;
		if(worker != null)
			worker.interrupt();
		Channel current = channel;
		if(current != null && current.isActive())
			current.close();
	}
	
	/**
	 * Starts the periodic report task (at most once per instance); it runs every 3 seconds.
	 */
	public synchronized void startUp(){
		if(!isStartUp.get()){
			timer.schedule(new TimerTask() {
				
				@Override
				public void run() {
					try {
						Channel current = channel;
						
						// Heartbeat: only when registered and the last keepalive is older than 5 seconds
						if(register.get() && current != null && current.isActive() && (System.currentTimeMillis() - keepaliveTime.getTime() > 5 * 1000)){
							try {
								writeByteBuf(current, EventType.KEEPALIVE, AckType.ACK, new byte[0]);
							} catch (Exception e) {
								logger.info(e.getMessage(), e);
								register.set(false);
							}
						}
						
						// Report application info once after registration
						if(register.get() && current != null && current.isActive() && !upAppinfo.get()){
							if(appInfo == null){
								appInfo = new AppInfoEvent();
								appInfo.setPid(JvmResourceStat.getInstance().getPid());
								appInfo.setAppId(conf.getAppId());
								appInfo.setAppName(conf.getAppName());
								appInfo.setDatev(conf.getDatev());
								appInfo.setUpTime(upTime);
								
								appInfo.setJvmArgs(conf.getJvmArgs());
								appInfo.setPort(conf.getPort());
								appInfo.setConnectionTimeout(conf.getConnectionTimeout());
								appInfo.setMaxThreads(conf.getMaxThreads());
								appInfo.setAcceptCount(conf.getAcceptCount());
							}
							String body = JSON.toJSONString(appInfo);
							writeByteBuf(current, EventType.APPINFO, AckType.UNACK, body.getBytes(DeployConstant.CHARSET));
							upAppinfo.set(true);
						}
					} catch (Exception e) {
						logger.info(e.getMessage(), e);
					}
				}
				
			}, 0, 3*1000);
			isStartUp.set(true);
		}
		
	}
	
	/**
	 * Encodes one frame (type, ack type, 16-bit body length, body, delimiter) and writes
	 * it to {@code channel}.
	 *
	 * @param channel     channel to write to
	 * @param messageType value for the MT header byte
	 * @param ackType     value for the AT header byte
	 * @param buf         message body; may be empty
	 * @throws IllegalArgumentException if the body does not fit the 16-bit length field
	 */
	public void writeByteBuf(Channel channel, EventType messageType, AckType ackType, byte[] buf){
		if(buf.length > 0xFFFF)
			throw new IllegalArgumentException("message body too long for 16-bit length field: " + buf.length);
		
		// Initial capacity only; the buffer grows when the delimiter is appended.
		ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(DeployConstant.HEADER_LEN + buf.length);
		byteBuf.writeByte(messageType.getEventType());
		byteBuf.writeByte(ackType.getAckType());
		byteBuf.writeShort((short)buf.length);
		byteBuf.writeBytes(buf);
		// Copy the delimiter without moving its readerIndex: writeBytes(ByteBuf) consumes
		// the source, so a shared delimiter buffer would only be written into the first frame.
		// NOTE(review): lineDelimiter() is not visible here — confirm whether it is shared.
		ByteBuf delimiter = DeployConstant.lineDelimiter();
		byteBuf.writeBytes(delimiter, delimiter.readerIndex(), delimiter.readableBytes());
		channel.writeAndFlush(byteBuf);
	}

	public Date getKeepaliveTime() {
		return keepaliveTime;
	}
	public void setKeepaliveTime(Date keepaliveTime) {
		this.keepaliveTime = keepaliveTime;
	}

	public Date getUpTime() {
		return upTime;
	}

	public AtomicBoolean getRegister() {
		return register;
	}

	public AtomicBoolean getShutdown() {
		return shutdown;
	}
	
	public static void main(String[] args) throws InterruptedException {
		AppConf conf = new AppConf();
		conf.setAppId("123");
		conf.setAppName("abc");
		AppInstance app = new AppInstance(conf);
		app.startRegister();
		app.startUp();
		// Keep the JVM alive; the main thread waits on itself indefinitely.
		Thread.currentThread().join();
	}
	
}











加载中
0
s
sh20161130

已解决问题,还是粗心大意导致的

ackMsg.writeByte(EventType.REGISTER.getEventType());
ackMsg.writeByte(AckType.UNACK.getAckType());
ackMsg.writeShort((short)buf.length);
ackMsg.writeBytes(buf);
ackMsg.writeBytes(DeployConstant.lineDelimiter());

ctx.channel().writeAndFlush(ackMsg);

加粗斜线的这两行有问题,应该修改为:
ctx.channel().writeAndFlush(ackMsg);
ctx.channel().writeAndFlush(DeployConstant.lineDelimiter());
因为粘包符(分隔符)的长度未被计入数据包的长度,而且粘包符虽然在代码中被写入数据包,实际上却并未写入,故服务端不知道如何decode数据流



返回顶部
顶部