netty4 服务端并发数据丢包

无语寒星 发布于 2016/02/22 16:03
阅读 3K+
收藏 0

<无详细内容>

加载中
0
无语寒星
无语寒星
package org.dht.vehicle.data.com;


import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;
import org.dht.vehicle.com.message.MessageManager;
import org.dht.vehicle.com.message.MessageSendVehicleRegister;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class BJTCPComService {
 static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
 private static ChannelFuture f = null;;
 private static EventLoopGroup group = new NioEventLoopGroup();
 private static Bootstrap b = new Bootstrap();
 private static Map<String, DeviceConInfo> map = new ConcurrentHashMap<String, DeviceConInfo>();


 public static void start() {
 // TODO Auto-generated method stub


 b.group(group)
 .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_RCVBUF, 1024*1024)
                .option(ChannelOption.SO_SNDBUF, 10*1024*1024)      
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch)
 throws Exception {
 ChannelPipeline p = ch.pipeline();
 p.addLast(new BJVehicleDeviceDataDecoder());
 p.addLast(new DeviceClientHandler());
 }
 });


 }


 public static Map<String, DeviceConInfo> getMap() {
 return map;
 }


 public static Channel getChannel() {
 if (null != f)
 return f.channel();
 return null;
 }


 public static void connect(String ip, String port)
 throws NumberFormatException, InterruptedException {


 ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();
 DeviceConInfo d = new DeviceConInfo();
 d.socketChannel = (SocketChannel) f.channel();
 map.put(String.valueOf(1), d);
 }


 public static void connect(int num, int oneNums, String ip, String port,
 int beginID) {
 for (int i = 0; i < num; i++) {


 try {
 ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();


 System.out.println("====" + MessageManager.getMessageManger()
 + "====" + f.channel());
 DeviceConInfo d = new DeviceConInfo();
 String strID = String.format("%07d", i + 1 + beginID);
 String identiCode = "abcdefghij" + strID;
 d.socketChannel = (SocketChannel) f.channel();
 d.identiCode = identiCode;
 d.onState = BJProtocolConst.CONNECTED;
 map.put(identiCode, d);
 MessageSendVehicleRegister messagePack = new MessageSendVehicleRegister(
 null, 0, d);


 MessageManager.getMessageManger().addSocketMessage(messagePack);


 Thread.sleep(5);
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }
 }


 public static void connectNums() {
 System.out.println("======client nums:" + map.size() + "=====");
 }


 public static int getOnLineDevices() {
 int nums = 0;
 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();


 if (BJProtocolConst.LOGINED == devConInfo.onState) {
 nums++;
 }


 }
 return nums;


 }


 public static void diConnect() {


 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();


 if (null != devConInfo.socketChannel)
 devConInfo.socketChannel.close();
 try {
 devConInfo.socketChannel.closeFuture().sync();
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 map.remove(entry.getKey());
 }


 }


 public static void stop() {
 diConnect();
 group.shutdownGracefully();


 }


 public static DeviceConInfo update(String identiCode,
 DeviceConInfo deviceConInfo) {


 return map.put(identiCode, deviceConInfo);


 }


 public static DeviceConInfo get(String identiCode) {


 return map.get(identiCode);


 }


 public static void remove(SocketChannel socketChannel) {


 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
 if (devConInfo.socketChannel == socketChannel) {
 map.remove(entry.getKey());
 }
 }
 }
}


 

 



0
无语寒星
无语寒星
package org.dht.vehicle.com.socketfactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;




public class BasicSocketServer implements SocketServer {

	protected ChannelHandler serverChannel;	
	protected Channel acceptorChannel;
	protected ServerBootstrap b ;	
	protected EventLoopGroup bossGroup ;
	protected EventLoopGroup workerGroup ;
	protected List<Integer> port;
	protected List<ChannelFuture> channelFuture;	
	
	public BasicSocketServer(){
		this.channelFuture = new ArrayList<ChannelFuture>();
	}
	public void setServerChannel(ChannelHandler serverChannel){
		this.serverChannel = serverChannel;
	}
	public ChannelHandler getServerChannel(){
		return this.serverChannel ;
	}
	public void setPort(List<Integer> port){
		this.port = port;
	}
	
	public void Start() throws Exception {
		// TODO Auto-generated method stub
		try{
			
			createServerBootstrap();
			
		}finally{
			Stop();
		}
	}

	public void Stop() throws Exception {
		// TODO Auto-generated method stub
		closeFuture();
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
	}

	public void Restart() throws Exception {
		// TODO Auto-generated method stub
		Stop();
	    Start();
	}

	public void createServerBootstrap() throws Exception{
			
		// TODO Auto-generated method stub
	try{
		 b           = new ServerBootstrap();
	    	
	     bossGroup   = new NioEventLoopGroup(1);
		 workerGroup = new NioEventLoopGroup();	
		 		 		 
	    b.group(bossGroup, workerGroup)
	     .channel(NioServerSocketChannel.class)
	     .option(ChannelOption.SO_BACKLOG, 1024)
	     .option(ChannelOption.SO_RCVBUF, 10*1024*1024)
         .option(ChannelOption.SO_SNDBUF, 1024*1024)
	     .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)   
         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
         .childOption(ChannelOption.SO_KEEPALIVE, true)
	     .handler(new LoggingHandler(LogLevel.INFO))
	     .childHandler(serverChannel);
	    bindPort();		 
		// Wait until the server socket is closed.
		closeFuture();
		 
	 } finally {
         // Shut down all event loops to terminate all threads.
         bossGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
     }
	}


	public void bindPort() throws InterruptedException {
		// TODO Auto-generated method stub		
		 
		
		Iterator<Integer> iter = port.iterator();           
		int nPort;            
	        while(iter.hasNext())  
	        {  
	        	nPort = (Integer)iter.next().intValue();  
	        	if(nPort>0){
	        	ChannelFuture f = b.bind(nPort).sync();		        	
	        	channelFuture.add(f);
	        	}
	        	//port.remove(iter.next());
	        } 		 		
	}


	
	public void closeFuture() throws InterruptedException {
		// TODO Auto-generated method stub
		
		  Iterator<ChannelFuture> iter = channelFuture.iterator();        
          ChannelFuture f = null;
          
	        while(iter.hasNext())  
	        {  
	        
	            f=(ChannelFuture)iter.next();
	            if(null != f){
		        	f.channel().closeFuture().sync(); 	
	        	 }
	        	//port.remove(iter.next());
	        } 		 			
	}

}



0
k
kaiyuanniao

Netty物联网高并发系统
课程观看地址:http://www.xuetuwuyou.com/course/178
课程出自学途无忧网:http://www.xuetuwuyou.com/

学会构建netty高并发TCP长连接服务器架构

软件版本:
Netty4.1.11
spring4.3.3Release
maven3.2
课程目录:
第1节、netty物联网介绍
第2节、Netty服务器编写
第3节、Netty客户端与服务器通信
第4节、编码解码
第5节、Netty服务器架构上
第6节、Netty服务器架构下
第7节、Netty客户端架构
第8节、Netty客户端长连接架构

相关课程推荐:
深入浅出Netty源码剖析
课程观看地址:http://www.xuetuwuyou.com/course/157

NIO+Netty5各种RPC架构实战演练视频教程
课程观看地址:http://www.xuetuwuyou.com/course/52

Netty三部曲,夜行侠老师带你玩转netty
课程观看地址:http://www.xuetuwuyou.com/course/175/tasks

Netty实战高性能分布式RPC
课程观看地址:http://www.xuetuwuyou.com/course/171
 

0
t
tomrichdow

Netty源码剖析视频教程
网盘地址:https://pan.baidu.com/s/1MFV_uq4PvdPHdrOJpuLFIA 密码: h3if
备用地址(腾讯微云):http://url.cn/5R2xYI4 密码:OmbopD

返回顶部
顶部