8
回答
Java线程池设计疑问
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   

设计一个线程池,这个线程池的任务有多种状态(类似短信的发送中、已发送、送达、已读等等),需要保证同一个任务的不同状态在一个线程处理,以保证有序处理。业务上比较适合使用 CachedThreadPool。

问题在于怎么保证同一个任务的不同状态在一个线程处理呢。

举报
Freecs
发帖于3年前 8回/518阅

以下是问题补充:

  • @Freecs :如果直接从线程池取线程来更新就可能出现一个线程先更新为送达了,然后另一个线程又更新为已发送,状态就乱了。所以需要保证后面的状态在一个线程按顺序来处理。请大家支招 (3年前)
共有8个答案 最后回答: 3年前

我想并不是线程池的问题,问题是同一任务不可并行处理。简单的处理方式就是在处理时,同任务加lock,用一个线程池。

如:

任务:class Task{ ReentrantLock lock=new ReentrantLock()   }

处理:try{ task.getLock().lock();  //do something  }finally{ task.getLock().unlock();}

这种简单处理有个小问题,就是如果处理时间长,状态多时,会占用线程池数量。

复杂的可能需要引入队列概念。

--- 共有 1 条评论 ---
Freecs是的,单任务性能有瓶颈。 需要加入队列,后面由ThreadPool来处理队列中的资源。过程中遇到了问题:同一个短信任务,状态变化(比如已发送、送达两个)很接近(1ms左右),这时候Pool没控制的话,CPU调度时,就先处理了送达的,然后再去处理已发送的,导致顺序乱套了。 本来想在每个任务加Lock 来控制,但任务多lock控制不好就死锁了 3年前 回复

用线程池 +任务方式本质上就是任务无状态(指跟执行者无关)以提高多个并发度的,现在需求又是要求任务跟worker关联,当然不行了.

既然必须跟线程有关,那就定义好并发数N,直接生成条线程,线程对象自身携带任务队列,然后在线程中自己循环处理自己的队列就好了.

--- 共有 2 条评论 ---
szf回复 @Freecs : 那就用N个单实例线程池好了: 看看API文档 java.util.concurrent.ThreadPoolExecutor 3年前 回复
Freecs确实,目前的做法就是直接指定N个线程。处理队列中的资源。有个缺点就是线程从最开始就new出来了N个,线程中是阻塞的,有信号才去队列取出数据处理,想在这里看能否用到线程池。 3年前 回复
参考netty4如何保证一个链路始终被同一个线程处理,让封装的任务记录最初处理的线程,待下次处理时判断是否为上次那个线程即可
--- 共有 1 条评论 ---
Freecs多谢! 3年前 回复

有这样两个设计,你可以考虑一下:

第一种,两级队列

1、将状态变换定义为操作(包含了将什么数据从什么状态转换成什么状态),然后定义一个pending操作队列,和一个available操作队列;

2、每次有相关操作过来时,首先丢进一个pending操作队列中;

3、使用几个线程轮流检查pending的操作中是否有可执行的(即该操作对应的数据状态可以被该操作安装状态机跳转的规则进行变换),若有则扔进available操作队列中,否则扔回pending队列末尾;

4、使用另一个线程池处理available队列中的操作;

这样的话,应该可以满足任务必须按状态机跳转处理的要求,但是有一个小问题就是:pending队列中如果堆积任务过多,有可能因为后处理操作比先处理操作更早进入pending队列,导致后处理操作被扔回pending队列末尾而有很大的等待延迟。所以pending队列需要设计好优先级排序的算法。

第二种,任务聚合

这个设计基于这样一个认识:每个数据的处理流程和速度相差不大,而且如果一个数据(比如短信的状态)优先开始被处理(第一次状态改变),那么理想情况下它也会被优先处理完(即进入最后的状态)。这样的话,我们可以将对同一个数据进行的操作进行聚合,聚合为一个以数据为核心的大任务:

1、任务队列以数据为单位,保存要被和正在被处理的数据;

2、数据中有对应的一个等待处理的操作队列,包含要对它进行的操作,每当来了一个操作请求时,都把它放到对应的数据的操作队列中;

3、线程池从任务队列中取任务时,依次遍历任务列表:检查每个数据的操作队列中是否有满足当前数据状态和状态机跳转规则的操作,即可以被处理的操作,有则取出处理并标记,没有则检查下一个数据;

4、如果没有可处理的操作,则所有线程pending。

另外,如果你的数据和业务逻辑合适话,也可以将上述两种方式融合一下。有道云笔记的webclipper的后台处理逻辑就和你这个问题类似,而且还有一些别的考虑,最终的设计方案也是上述两种方案的一种融合,吞吐率和有效执行率都不错。


--- 共有 1 条评论 ---
Freecs没错,如果单纯使用两级队列,感觉吞吐率不够,pending队列会过度的堆积,影响性能。 任务聚合的方式没有太明白,再消化一下。 3年前 回复

引用来自“方武卓”的评论

有这样两个设计,你可以考虑一下:

第一种,两级队列

1、将状态变换定义为操作(包含了将什么数据从什么状态转换成什么状态),然后定义一个pending操作队列,和一个available操作队列;

2、每次有相关操作过来时,首先丢进一个pending操作队列中;

3、使用几个线程轮流检查pending的操作中是否有可执行的(即该操作对应的数据状态可以被该操作安装状态机跳转的规则进行变换),若有则扔进available操作队列中,否则扔回pending队列末尾;

4、使用另一个线程池处理available队列中的操作;

这样的话,应该可以满足任务必须按状态机跳转处理的要求,但是有一个小问题就是:pending队列中如果堆积任务过多,有可能因为后处理操作比先处理操作更早进入pending队列,导致后处理操作被扔回pending队列末尾而有很大的等待延迟。所以pending队列需要设计好优先级排序的算法。

第二种,任务聚合

这个设计基于这样一个认识:每个数据的处理流程和速度相差不大,而且如果一个数据(比如短信的状态)优先开始被处理(第一次状态改变),那么理想情况下它也会被优先处理完(即进入最后的状态)。这样的话,我们可以将对同一个数据进行的操作进行聚合,聚合为一个以数据为核心的大任务:

1、任务队列以数据为单位,保存要被和正在被处理的数据;

2、数据中有对应的一个等待处理的操作队列,包含要对它进行的操作,每当来了一个操作请求时,都把它放到对应的数据的操作队列中;

3、线程池从任务队列中取任务时,依次遍历任务列表:检查每个数据的操作队列中是否有满足当前数据状态和状态机跳转规则的操作,即可以被处理的操作,有则取出处理并标记,没有则检查下一个数据;

4、如果没有可处理的操作,则所有线程pending。

另外,如果你的数据和业务逻辑合适话,也可以将上述两种方式融合一下。有道云笔记的webclipper的后台处理逻辑就和你这个问题类似,而且还有一些别的考虑,最终的设计方案也是上述两种方案的一种融合,吞吐率和有效执行率都不错。


任务聚合就是这样一个设计:
Class Job {
  int id;// 短信id
  State state; // 短信状态
  List<Op> oplist;
}
List<Job> joblist;
Map<Integer, Job> jobmap;
每当有一个短信需要处理时,就生成一个Job放到joblist里,当产生了一个对这个短信进行的操作op时,jobmap.get(id).add(op)。而线程池每次遍历joblist,检查这个job里的state状态和oplist,如果有可执行的op,则取出执行,否则一直遍历整个joblist。为了防止有job的op等待时间过长导致每次遍历joblist都得遍历的非常靠后造成延迟,可以使用新旧队列的设计,将新加入的job和旧job区分,一直没有可执行op的job扔到旧队列中去,这样每次取op时通过权衡遍历新队列和旧队列就很快可以得到可执行的op

引用来自“方武卓”的评论

有这样两个设计,你可以考虑一下:

第一种,两级队列

1、将状态变换定义为操作(包含了将什么数据从什么状态转换成什么状态),然后定义一个pending操作队列,和一个available操作队列;

2、每次有相关操作过来时,首先丢进一个pending操作队列中;

3、使用几个线程轮流检查pending的操作中是否有可执行的(即该操作对应的数据状态可以被该操作安装状态机跳转的规则进行变换),若有则扔进available操作队列中,否则扔回pending队列末尾;

4、使用另一个线程池处理available队列中的操作;

这样的话,应该可以满足任务必须按状态机跳转处理的要求,但是有一个小问题就是:pending队列中如果堆积任务过多,有可能因为后处理操作比先处理操作更早进入pending队列,导致后处理操作被扔回pending队列末尾而有很大的等待延迟。所以pending队列需要设计好优先级排序的算法。

第二种,任务聚合

这个设计基于这样一个认识:每个数据的处理流程和速度相差不大,而且如果一个数据(比如短信的状态)优先开始被处理(第一次状态改变),那么理想情况下它也会被优先处理完(即进入最后的状态)。这样的话,我们可以将对同一个数据进行的操作进行聚合,聚合为一个以数据为核心的大任务:

1、任务队列以数据为单位,保存要被和正在被处理的数据;

2、数据中有对应的一个等待处理的操作队列,包含要对它进行的操作,每当来了一个操作请求时,都把它放到对应的数据的操作队列中;

3、线程池从任务队列中取任务时,依次遍历任务列表:检查每个数据的操作队列中是否有满足当前数据状态和状态机跳转规则的操作,即可以被处理的操作,有则取出处理并标记,没有则检查下一个数据;

4、如果没有可处理的操作,则所有线程pending。

另外,如果你的数据和业务逻辑合适话,也可以将上述两种方式融合一下。有道云笔记的webclipper的后台处理逻辑就和你这个问题类似,而且还有一些别的考虑,最终的设计方案也是上述两种方案的一种融合,吞吐率和有效执行率都不错。


引用来自“方武卓”的评论

任务聚合就是这样一个设计:
Class Job {
  int id;// 短信id
  State state; // 短信状态
  List<Op> oplist;
}
List<Job> joblist;
Map<Integer, Job> jobmap;
每当有一个短信需要处理时,就生成一个Job放到joblist里,当产生了一个对这个短信进行的操作op时,jobmap.get(id).add(op)。而线程池每次遍历joblist,检查这个job里的state状态和oplist,如果有可执行的op,则取出执行,否则一直遍历整个joblist。为了防止有job的op等待时间过长导致每次遍历joblist都得遍历的非常靠后造成延迟,可以使用新旧队列的设计,将新加入的job和旧job区分,一直没有可执行op的job扔到旧队列中去,这样每次取op时通过权衡遍历新队列和旧队列就很快可以得到可执行的op
Thanks a lot!
顶部