socket服务端TIME_WAIT状态过多

心境111 发布于 2018/05/28 11:04
阅读 664
收藏 0

socket服务端,每分钟的量大概有130左右,接收数据后启一下线程进行处理,处理后用shutdownOutput()关闭,正常情况time_wait保持2分钟就会关闭,但是生产的time_wait状态一直在累加中。所以每隔几天服务端就会挂掉,客户端一直连不上socket服务。

@Bean
  public String ioAcceptor() throws Exception {
    new Thread() {
      @Override
      public void run() {
        //修改线程池大小修改为配置
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(smaxpoolsize / 2, smaxpoolsize, 0,
            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        ServerSocket server = null;
        try {
          logger.debug("启动socket服务开始");
          server = new ServerSocket(serverport);
          logger.debug("启动socket服务成功");
        } catch (Exception e) {
          logger.error("启动socket服务发生异常", e);
        }

        Socket socket = null;
        while (true) {
          String requestId = UniqueId.getId("CGNO");
          try {
            socket = server.accept();
          } catch (IOException e) {
            logger.error("接收socket消息发生异常, requestId: {}", requestId, e);
            shutdownSocket(socket, requestId);
            continue;
          }

          int activeCount = threadPool.getActiveCount();
          int corePoolSize = threadPool.getCorePoolSize();
          int maximumPoolSize = threadPool.getMaximumPoolSize();
          // 已经激活的线程数 <  最大线程数
          if (activeCount < maximumPoolSize) {
            try {
              threadPool.execute(
                  new CGSocketThread(socket, cgMinaService, encoding, isEncrypt, publicKey,
                      privateKey, serverIp, serverportCG, timeout, isMock, requestId));
            } catch (Exception e) {
              logger.error("新开线程异常: requestId: {}", requestId, e);
              shutdownSocket(socket, requestId);
            }
          } else {
            logger.warn("已激活线程数大于等于最大线程数了: 已激活线程数: {}, 核心线程数: {}, 最大线程数: {}, requestId: {}",
                activeCount,
                corePoolSize, maximumPoolSize, requestId);
            shutdownSocket(socket, requestId);
          }
        }
      }
    }.start();

    return "Success";
  }

  /**
   * 断开socket连接
   */
  private void shutdownSocket(Socket socket, String requestId) {
    try {
      socket.shutdownInput();
      OutputStream os = socket.getOutputStream();
      String result = "error";
      if (isEncrypt) {
        result = RSA.encrypt(result, publicKey);
      }
      os.write(result.getBytes());
      os.flush();
      socket.shutdownOutput();
    } catch (Exception e) {
      logger.error("关闭socket异常: requestId: {}", requestId, e);
    }
  }

这个是CGSocketThread类里面的run方法

@Override
  public void run() {
    InputStream is = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    OutputStream os = null;
    PrintWriter pw = null;
    String result = "";
    try {
      is = socket.getInputStream();
      isr = new InputStreamReader(is);
      br = new BufferedReader(isr);
      StringBuffer sb = new StringBuffer();
      String reqMsg = "";
      while ((reqMsg = br.readLine()) != null) {
        sb.append(reqMsg);
      }

      String messageStr = sb.toString();
      if (messageStr.indexOf("TOCG") == 0) {

        // 转换编码
//                messageStr = new String(messageStr.getBytes("UTF-8"),encoding);

        messageStr = messageStr.substring(4);
        CGSocketClient client = new CGSocketClient(serverIp, serverportCG, timeout, encoding,
            isEncrypt, publicKey, privateKey, isMock);
        result = client.sendAndReceive(messageStr);

        // 转换编码
//                result = new String(result.getBytes(encoding),"UTF-8");
      } else {
        // 转换编码
        messageStr = new String(messageStr.getBytes(encoding), "UTF-8");

        if (isEncrypt) {
          // 加密发送
          logger.info("{}===接收===接收业务端数据:{}", requestId, messageStr);
          messageStr = RSA.decrypt(messageStr, privateKey);
          logger.info("{}===解密===接收业务端数据:{}", requestId, messageStr);
        }
        if (StringUtils.isBlank(messageStr)) {
          result = "异常数据请求";
        } else {
          result = cgMinaService.serverExchange(messageStr);
        }
        if (isEncrypt) {
          logger.info("{}===返回===调用高速Socket服务返回结果:{}", requestId, result.toString());
          result = RSA.encrypt(result, publicKey);
          logger.info("{}===加密===高速Socket服务加密结果:{}", requestId, result.toString());
        }

        // 转换编码
        result = new String(result.getBytes("UTF-8"), encoding);
      }

    } catch (Exception e) {
      logger.error("{}处理socket消息发生异常", requestId, e);
    } finally {

      try {
        // 必须先关闭输入流才能获取下面的输出流
        socket.shutdownInput();
        // 获取输出流
        os = socket.getOutputStream();
        os.write(result.getBytes(encoding));
        os.flush();
        socket.shutdownOutput();
      } catch (IOException e1) {
        logger.error("{}关闭socket异常:", requestId, e1);
      }

}

加载中
0
Jian_Ming
Jian_Ming

你们客户端没有正常关闭连接?

心境111
心境111
服务端主动关闭了,客户端会有影响吗?
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部