
项目背景:
cmpp2 多线程发送短信,每秒200-400提交速率(可控制),也可控制线程数如:十个线程执行发送,那么每个线程的发送数=200/10(/秒)
技术实现:
1、使用一个主线程 在启动时候 创建 4个 LinkedBlockingQueue、分别来 管理和执行 抓取待发送数据、存入发送后数据、存入回执数据、存入回复数据。
2、在主线程启动时候 会 使用newFixedThreadPool(threadNum) 线程池 ,for (threadNum ) 来submit 发送线程,在发送线程里面 使用 while(true) 执行发送任务,出错重连、时间速率控制。
问题:
1、cpu 使用率太高,一直95% 、这才一个通道 一个协议 如果 接入 上百个都开多线程的话,不敢想。
2、如果 去掉while(true) 死循环 cpu使用率应该可以下降不少、但是无法保证 速率/时间、以及重连的时间及时性。
代码:
主线程:
随系统启动...
cmpp2= Executors.newFixedThreadPool(threadNum); for (int i = 1; i <= threadNum; i++) { threadTime.put("cmpp2Td"+i+"time", "");//String.valueOf(System.currentTimeMillis())); threadTime.put("cmpp2Td"+i+"num", String.valueOf(singleRate)); SendMsgByCmpp2DBService s = new SendMsgByCmpp2DBService(sms,"cmpp2Td"+i,smsSendRecordService); s.speed = singleRate; // s.start(); map.put("cmpp2_"+i, s); cmpp2.submit(s); }
任务线程:
public void run() { smsRecord = new SmsSendRecord(); interval = interval/Rate; speed = Rate; while (true) { if (!this.is_login) { this.login(); SendMsgCMPP2ThredPoolByDB. is_popData = false; continue; } if(this.smp.getConnState()!= null){ SendMsgCMPP2ThredPoolByDB. is_popData = false; continue; } if(!SendMsgCMPP2ThredPoolByDB. is_popData) SendMsgCMPP2ThredPoolByDB. is_popData = true; currentTime = System.currentTimeMillis(); tCurrentTime = System.currentTimeMillis(); int currentTimeMinutes = new Date(tCurrentTime).getMinutes(); int lastTimeMinutes = new Date(tLastTime).getMinutes(); if ((tCurrentTime-tLastTime)>=1000 || currentTimeMinutes != lastTimeMinutes){ tCount = 0; tLastTime=tCurrentTime; } else { if (tCount >= speed) { continue; } } if (tCount <= speed) { tCount++; map =(Map)JSONObject.parse(SendMsgCMPP2ThredPoolByDB.ZhangYi.poll()); if (map == null){ continue; } if(map.get("DestTerminalID")==null||map.get("SrcTerminalID")==null||map.get("BatchNo")==null||map.get("ExpandNo") == null ||map.get("Priority") == null ||map.get("MsgContent") == null) continue; smsRecord.setPhone(map.get("DestTerminalID").toString()); smsRecord.setShow_phone(map.get("SrcTerminalID").toString()); smsRecord.setSub_no(map.get("BatchNo").toString()); smsRecord.setCustomer_name(map.get("ExpandNo").toString()); smsRecord.setChannel_name(map.get("Priority").toString()); smsRecord.setContent(map.get("MsgContent").toString()); smsRecord.setCreate_date(new Date(tLastTime)); this.sendMsg(smsRecord); } else { tLastTime=currentTime; } initCount++; } }
请各大神解惑、指出待优化点/////////万分感谢!!
1、如果想去掉while(true),可以考虑通知实现;
2、关于自动重连的问题,可以考虑重发送逻辑中抽离出来,采用心跳检测完成;
3、另外发送速率统计部分也应该抽离出来。
4、上多通道要考虑资源使用可控。
5、实在不行按照业务拆分成多模块,用redis 或mq类的扩展一下架构设计;
我的思路是:一个主线程,多个任务子线程。
主线程有一层while(true),这个循环是不断的扫描LinkedBlockingQueue是否有数据,有则交个任务子线程(也就是你这里定义的线程池)处理,而不是像你这样每个子任务线程都有一个while(true)
引用来自“K袁”的评论
我的思路是:一个主线程,多个任务子线程。
主线程有一层while(true),这个循环是不断的扫描LinkedBlockingQueue是否有数据,有则交个任务子线程(也就是你这里定义的线程池)处理,而不是像你这样每个子任务线程都有一个while(true)
java 的线程数量大致要和cpu数量一致,并不是越多越快,线程调度是很消耗时间的。要用好多线程,就需要设计出好的多线程业务模型,不恰当的sleep和block是性能的噩梦。利用好LinkedBlockingQueue,队列空闲时读队列的线程会释放cpu。利用消息触发后续线程工作,就没必要使用while(true)来不停的扫描。
引用来自“OS小小小”的评论
怎么 没人来呀 @中山野鬼抬举我了。c++ 我还敢对不知深浅的人说,“权当我不懂”,java真心只是学过,没有实际工程上的经验。哈。而且我是c的思维,面对c适合的应用开发,是反对使用线程的。基本思维是,执行模块的生命周期不以任务为决定,同类的执行模块,可根据物理硬核数量,形成对应独立多个进程,但绝对不会同类的任务独立对应多个线程。哈。所以java这类面向线程的设计,没办法参与讨论。设计应用目标不同,系统组织策略自然有异。
唯一的建议是:永远不要依赖工具,特别是所谓的垃圾资源处理回收机制,无论它做的再好,一旦你依赖,必然你的代码,在不久的将来会因为系统设计规模的变大,而变的垃圾。哈。
听不懂的随便喷,希望听懂的,能记得这个观点,这不是我一个人的观点。
@乌龟壳 : 几种方法都可以,第一是按照计算步骤,每个进程处理一个步骤,然后切换共享空间(这没有数据传递逻辑上的额外开销),就是流水思维。第二个是block的思维,同样的几个进程负责相同计算,但负责不同片区。同时存在另一类的进程是对前期并发处理完的工作进行边界处理。 你这个例子体现不出进程和线程的差异的。
如果非要考虑进程和线程在片内cache的差异,如果没记错(错了大家纠正哈),进程之间的共享是在二级缓存之间吧。即便线程能做到一级缓存之间的共享,但对于这种大批量像素的计算,用进程仍然是使用 dma,将数据成块载入一级缓存区域进行处理,而这个载入工作和计算工作是同步的。不会有额外太多的延迟。
你举的这个例子,还真好是我以前的老本行。再说了。像素计算,如今都用专用计算处理器了吧。还用x86或arm来处理,不累死啊。哈。
而且这种东西java不适合,同样的处理器,用c写,基本可以比java快1到2倍。因为c可以直接根据硬件特性和计算逻辑特点有效调度底层硬件驱动方式。而java即便你用了底层优化的官方库,仍然不能保证硬件与计算目标特性的高度整合。
@乌龟壳 ,数据分两种,一种和算法或处理相关的。一种是待处理的数据。 前者,不应该共享,后者属于数据加工流程,必然存在数据传递或流动,最低成本的传递/流动方式就是共享内存,交替使用权限的思路。 但这仅仅针对待加工的数据和辅助信息,而不针对程序本身。 进程不会搞混乱这些东西特别是(待加工数据的辅助信息),而线程,就各种乱吧。哈。
进程之间,虽然用共享空间,但它本质是数据传递/流动,当你采用多机(物理机器)并发处理时,进程移动到另外一个物理主机,则共享空间就是不能选择的传递/流动方式了。但线程就没有这些概念。