使用线程池,一段时间后,发现有些线程假死

布谷鸟 发布于 2013/07/26 09:54
阅读 8K+
收藏 1

上一个问题:http://www.oschina.net/question/224858_119159

可能是判断有误,南辕北辙了,所以没有找到合适的方法,直到现在还没有解决.

很可能是我在线程池上处理的不得当,使用的是ThreadPoolExecutor这个东西,总是在运行一段时间之后 ,速度越来越慢,直到彻底跑不动。

每次彻底死掉之后,通过netstat -a,发现仍然有相当多的数量的线程(和核心线程的工作数量不相上下),一直占着资源,状态为ESTABLISHED,查询后得知表示两台机器建立连接,正在通信,它们持续很久。可能问题就出在这里,在这个环节上,是不是假死掉了,因为我发现这些线程持续好长时间,它们占用了执行任务的队列,导致新的任务没有办法添加进来,于是就死掉了。

尝试过动态调整加大线程池的数量参数,它们又能再跑一阵子,确信它们是在任务队列中堵死掉了。

工作线程是socket,允许在网络连接和通讯的过程中耗费一些时间,但是我如何能及时的干掉这些看似已经无法正常工作的线程呢?

昨天晚上尝试用一个守护线程定期检查,及时移除超时任务,不得其法,失败了。

初识线程池,请各位指教,不胜感激!


hi,@明月照大江 我已经把代码剥出来,在这里,就是一个按照smtp协议的简单会话,socket用的不多,劳驾帮忙分析一下哪里会出问题,可能造成线程死掉,不胜感激。

public class SmtpThread implements Runnable {
	private Socket socket = null;// 套接字
	private InputStream in = null;// 输入流
	private OutputStream out = null;// 输出流
	private static final int SMTP_PORT = 25;// smtp端口号
	private static final int TIME_OUT = 10000;// 10秒

	private Element element;

	/**
	 * 实例化
	 * 
	 * @param e
	 */
	public SmtpThread(Element e) {
		element = e;
	}  
public void run() {
 // TODO Auto-generated method stub
 // 创建连接先
 try {
 createConnection();// 发送邮件
 String rs = posetMail();
 if ("ok".equals(rs)) {
 System.out.println("发送成功");
 } else {
 System.out.println("发送失败,响应消息:" + rs);
 }
 } catch (IOException e) {
 // TODO Auto-generated catch block
 // 创建连接出错
 System.out.println("连接失败! 消息:" + e.toString());
 } finally {
 try {
 close();
 } catch (IOException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }
 }
//于14:25补充
// 创建连接
	private void createConnection() throws IOException {
		socket = new Socket(element.getRemoteAddr(), SMTP_PORT, element.getLocalAddr(), element.getLocalPort());
		socket.setSoTimeout(TIME_OUT);
		// 获取数据流
		in = socket.getInputStream();
		out = socket.getOutputStream();

	}

	// 发送邮件,并返回响应信息,发送成功返回ok,失败返回响应信息
	private String posetMail() throws IOException {
		String res = response();
		if (parseResponseCode(res) != 220) {
			return res;
		}
		// 服务器握手
		this.writerString("helo " + element.getFromAddr());
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 250) {
			return res;
		}
		// 发信人地址
		this.writerString("mail from: <" + element.getFrom() + ">");
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 250) {
			return res;
		}
		// 接收人地址
		this.writerString("rcpt to: <" + element.getTo() + ">");
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 250) {
			return res;
		}
		// 开始发送数据
		this.writerString("data");
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 354) {
			return res;
		}
		// 发信人
		this.writerString("from: <" + element.getFrom() + ">");
		this.writerNewLine();
		// 收信人
		this.writerString("to: <" + element.getTo() + ">");
		this.writerNewLine();
		// 回信人地址
		this.writerString("reply-to: <" + element.getReplyTo() + ">");
		this.writerNewLine();
		// 标题
		this.writerString("subject: " + element.getTitle());
		this.writerNewLine();
		// 加入时间
		String date = new SimpleDateFormat("HH🇲🇲ss dd/MMM/yyyy ", Locale.US).format(new Date());
		this.writerString("date: " + date);
		this.writerNewLine();
		this.writerString("MIME-Version: 1.0");
		this.writerNewLine();
		// 指定为html邮件
		String charset = Charset.defaultCharset().displayName();
		this.writerString("Content-Type: text/html; charset=\"" + charset + "\"");
		this.writerNewLine();
		// 编码
		this.writerString("Content-Transfer-Encoding: base64");
		this.writerNewLine();
		// 设置优先级,为普通邮件
		this.writerString("X-Priority: 3");
		this.writerNewLine();
		this.writerString("X-Mailer: NSA Mail[Copyright(C)]");
		// 写入两个换行区分接下来的内容
		this.writerNewLine();
		this.writerNewLine();
		// 主体
		String content = element.getContext();
		byte[] data = (content != null ? content : "").getBytes();
		for (int k = 0; k < data.length; k += 54) {
			// 邮件内容
			this.writerString(Base64.encode(data, k, Math.min(data.length - k, 54)));
			this.writerNewLine();
		}
		// 准备结束
		this.writerString(".");
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 250) {
			return res;
		}
		// 退出
		this.writerString("quit");
		this.writerNewLine();
		res = response();
		if (parseResponseCode(res) != 221) {
			return res;
		}
		return "ok";
	}

	// 写入数据
	private void writerString(String str) throws IOException {
		out.write(str.getBytes());
		out.flush();
	}

	// 写入换行
	private void writerNewLine() throws IOException {
		writerString("\r\n");
	}

	// 获取响应信息
	private String response() throws IOException {
		byte[] buffer = new byte[1024];
		int k = 0;
		k = in.read(buffer);
		if (k == -1) {
			return null;
		}
		return new String(buffer, 0, k).trim();
	}

	// 解析响应码
	private static int parseResponseCode(String response) {
		if (response.length() > 3) {
			try {
				return Integer.parseInt(response.substring(0, 3));
			} catch (Exception e) {
				return -1;
			}
		} else {
			return -1;
		}
	}

	// 关闭资源
	private void close() throws IOException {
		if (in != null) {
			in.close();
		}
		if (out != null) {
			out.close();
		}
		if (socket != null) {
			socket.close();
		}
	}

}
加载中
0
袁不语
袁不语

如果 发送邮件 发生异常直接退出,就不会close连接了。当connection非常多的时候,还是很有可能发生的。这样虽然线程结束了,但是connection是没有释放的。

有两个建议

1. 将close的逻辑放在finally里面

2. 评估一下你机器的性能,建立合适大小的连接池,不要每次都创建。

1
大东哥
大东哥
public void run(){	
	try{
       //your code

	
	}catch(Throwable e){
			e.printStackTrace();
		}finally{
			close();
		}
}
首先,可以这样改改看,socket我也不太会,但我觉得可以先这样试试。
0
明月照大江
明月照大江
你要保证你的线程是正常退出的,我怀疑是你在 线程的 run方法中,遇到了几个没处理好的问题,导致线程挂起,当挂起的线程数量等于你的池总额的时候,就跑不动了,所有任务都只能阻塞,你将你的代码简化一下,我们来看看吧,
布谷鸟
布谷鸟
嗯,稍等。简化后我马上帖出来
0
0x0001
0x0001

看你这么说,应该是socket之类的程序,如果netstat 看到的数量和你线程数量差不多,这应该用的是oio,这样每个客户端需要一个线程来维护,如果你没关闭socket或者对方没有关闭socket,在线程内做业务逻辑操作的话,基本上是一个用户一个线程,没关闭就不会退出

0
布谷鸟
布谷鸟

引用来自“0x0001”的答案

看你这么说,应该是socket之类的程序,如果netstat 看到的数量和你线程数量差不多,这应该用的是oio,这样每个客户端需要一个线程来维护,如果你没关闭socket或者对方没有关闭socket,在线程内做业务逻辑操作的话,基本上是一个用户一个线程,没关闭就不会退出

我这里是保证它在通讯完成后close的,问题大概是在有些连接通信的时间没有尽头,如此线程永远没有办法结束,池也满了,程序就死了。起初只是希望做一个定期检查的线程来及时移除并终止这些超时的线程,又没有找到合适的方法、
0
huan
huan
java7 的concurrence 包支持超时关闭的线程
布谷鸟
布谷鸟
现在不敢换环境。是我代码写的有问题
0
布谷鸟
布谷鸟

引用来自“大东哥”的答案

public void run(){	
	try{
       //your code

	
	}catch(Throwable e){
			e.printStackTrace();
		}finally{
			close();
		}
}
首先,可以这样改改看,socket我也不太会,但我觉得可以先这样试试。

这个异常超类感觉不行哎,这个状态就是线程不动了,也不抛出异常,占着队列不放。

一直在想搞一个线程定时检查正在运行的线程,如果它到达了某个阶段,并且超过了指定的时间,就中断并移除它,但是线程池里面我不知道如何取出正在运行的线程,getQueue()得出的队列,遍历任务线程检查状态,昨天晚上测试过达不到期望。

大东哥
大东哥
回复 @布谷鸟 : 你的run方法里面只要抛出IOException以外的异常,都会导致close不执行,socket未关闭,流未关闭,我觉得你先处理这个问题先。
布谷鸟
布谷鸟
回复 @大东哥 : 没有试,线程不动的原因随机性很大,有时候跑几分钟就越来越慢然后停掉,有时候跑几个小时才会完全停止。。过程太长,唉
大东哥
大东哥
感觉,还是试过了不行?
0
布谷鸟
布谷鸟
@袁不语 @大东哥 是的,这是剥离出来的代码,在实际应用中添加有 finally块,也希望问题出在这儿,那就简单了。实际应用的类比这个要胖得多,都不知道怎么贴
布谷鸟
布谷鸟
回复 @袁不语 : 多谢回答!前几天时间紧定写得自己都看不下去了,下午部分功能重写了一次,晚上我重新在跑,希望明天早上不会死,那就不用管它了
袁不语
袁不语
你贴出来的部分,我觉得没有问题。
布谷鸟
布谷鸟
回复 @袁不语 : 好的,我来看一下好不好帖这些东西,赶得急,代码写得跟屎一样,找问题都难。
袁不语
袁不语
我觉得像posetMail这样业务代码可以省略,但是run的代码和Executor的相关代码一定得是原始的。
0
1_second_killer
1_second_killer

试着在连接之前,与关闭之后,打印测试信息随机数+当前时间+开始/结束,伪代码如下

a=random()

a+time+“begin”

a+time+”end“

死了之后数数看看是不是有开了的关不上

0
布谷鸟
布谷鸟

引用来自“1_second_killer”的答案

试着在连接之前,与关闭之后,打印测试信息随机数+当前时间+开始/结束,伪代码如下

a=random()

a+time+“begin”

a+time+”end“

死了之后数数看看是不是有开了的关不上

试过打印关闭异常,它是线程老是不结束,感觉被远程服务器脱着一样,netstat -a看到的结果,线程状态也为 ESTABLISHED,就是建立连接之后,一直等,等到死。。
1_second_killer
1_second_killer
额,可能我没太说清楚,要在联网发送邮件之前,与发送完成之后测试
返回顶部
顶部