java 多线程资源占用高

OS小小小 发布于 2016/06/17 15:39
阅读 900
收藏 4


项目背景:

cmpp2 多线程发送短信,每秒200-400提交速率(可控制),也可控制线程数如:十个线程执行发送,那么每个线程的发送数=200/10(/秒)

技术实现:

1、使用一个主线程 在启动时候  创建 4个 LinkedBlockingQueue、分别来 管理和执行  抓取待发送数据、存入发送后数据、存入回执数据、存入回复数据。

2、在主线程启动时候 会 使用newFixedThreadPool(threadNum) 线程池 ,for (threadNum ) 来submit 发送线程,在发送线程里面 使用 while(true) 执行发送任务,出错重连、时间速率控制。

问题:

1、cpu 使用率太高,一直95% 、这才一个通道 一个协议 如果 接入 上百个都开多线程的话,不敢想。

2、如果 去掉while(true) 死循环 cpu使用率应该可以下降不少、但是无法保证 速率/时间、以及重连的时间及时性。

代码:

主线程:

随系统启动...


cmpp2= Executors.newFixedThreadPool(threadNum);
        for (int i = 1; i <= threadNum; i++)
        {
            threadTime.put("cmpp2Td"+i+"time", "");//String.valueOf(System.currentTimeMillis()));
            threadTime.put("cmpp2Td"+i+"num", String.valueOf(singleRate));
            SendMsgByCmpp2DBService s = new SendMsgByCmpp2DBService(sms,"cmpp2Td"+i,smsSendRecordService);
            s.speed = singleRate;
//            s.start();
            map.put("cmpp2_"+i, s);
           cmpp2.submit(s);
        }



任务线程:

public void run()
    {
        
        smsRecord = new SmsSendRecord();
        interval = interval/Rate;
        speed = Rate;
        while (true)
        {
            if (!this.is_login)
            {
              
                this.login();
               SendMsgCMPP2ThredPoolByDB. is_popData = false;
                continue;
            }
            if(this.smp.getConnState()!= null){
                SendMsgCMPP2ThredPoolByDB. is_popData = false;
                continue;
            }
            if(!SendMsgCMPP2ThredPoolByDB. is_popData)
            SendMsgCMPP2ThredPoolByDB. is_popData = true;
          
            
           
            
          
               
                currentTime = System.currentTimeMillis();
                
                tCurrentTime = System.currentTimeMillis(); 
                int currentTimeMinutes = new Date(tCurrentTime).getMinutes();
                int lastTimeMinutes = new Date(tLastTime).getMinutes();
                
                if ((tCurrentTime-tLastTime)>=1000 || currentTimeMinutes != lastTimeMinutes){
                    tCount = 0;
                    tLastTime=tCurrentTime;
                } else {
                    if (tCount >= speed) {
                        continue;
                    }
                }
                
                if (tCount <= speed) {
                    tCount++;
                    map =(Map)JSONObject.parse(SendMsgCMPP2ThredPoolByDB.ZhangYi.poll());
                    
                    
                    if (map == null){
                        continue;
                    }
                    if(map.get("DestTerminalID")==null||map.get("SrcTerminalID")==null||map.get("BatchNo")==null||map.get("ExpandNo") == null ||map.get("Priority") == null ||map.get("MsgContent") == null)
                        continue;
                    smsRecord.setPhone(map.get("DestTerminalID").toString());
                    smsRecord.setShow_phone(map.get("SrcTerminalID").toString());
                    smsRecord.setSub_no(map.get("BatchNo").toString());
                    smsRecord.setCustomer_name(map.get("ExpandNo").toString());
                    smsRecord.setChannel_name(map.get("Priority").toString());
                    smsRecord.setContent(map.get("MsgContent").toString());
                    smsRecord.setCreate_date(new Date(tLastTime));
                      this.sendMsg(smsRecord);
                } else {
                    tLastTime=currentTime;
                }
                initCount++;
            
            
  
            
            
            
            
            
            
            
            
        }
    
    }



请各大神解惑、指出待优化点/////////万分感谢!!


加载中
0
0
martinyuan
martinyuan

1、如果想去掉while(true),可以考虑通知实现;

2、关于自动重连的问题,可以考虑重发送逻辑中抽离出来,采用心跳检测完成;

3、另外发送速率统计部分也应该抽离出来。

4、上多通道要考虑资源使用可控。

5、实在不行按照业务拆分成多模块,用redis 或mq类的扩展一下架构设计;

martinyuan
martinyuan
回复 @OS小小小 : map =(Map)JSONObject.parse(SendMsgCMPP2ThredPoolByDB.ZhangYi.take()); 换成take,阻塞线程,试试。
martinyuan
martinyuan
回复 @OS小小小 : 1、通知只是告知队列里有新的数据需要处理了; 5、内存队列换成redis队列 实现成本增加,但是可扩展性增加;
OS小小小
OS小小小
1、通知实现的话 ,岂不是 无法保证 最少发送么,又会陷入另一个问题中 是吗? 或者是我的想法不对么? 2、嗯,这一块可以这样做。谢谢你 3、速率统计这里 我目前想不到怎么抽离、既可以控制到位,又可以保证不影响。。。 5、redis 是有的 但是 redis的队列的话 跟我这个 没啥区别吧,可能速度更快一点。
0
_________0
_________0
while(true) 里面 没数据最起码要休眠啊,不停死循环操作,又没有休眠cpu不高才怪
_________0
_________0
回复 @OS小小小 : 休眠是必须的,只是前面有数据进来,可以用wait notify 的思路通知,思路就是这样,CountDownLatch 之类多线程通讯也可以实现有数据来就能立即处理的功能
OS小小小
OS小小小
嗯,目前在测试 排除没有数据的情况,所以这一块没有去让他休眠,后面会加进去。 就针对于目前这种情况,有啥好办法吗
0
空无一长物

我的思路是:一个主线程,多个任务子线程。

主线程有一层while(true),这个循环是不断的扫描LinkedBlockingQueue是否有数据,有则交个任务子线程(也就是你这里定义的线程池)处理,而不是像你这样每个子任务线程都有一个while(true)

ihuotui
ihuotui
这才是对的做法
OS小小小
OS小小小
嗯,这思路可以。谢谢哈
0
shijacky
shijacky

引用来自“K袁”的评论

我的思路是:一个主线程,多个任务子线程。

主线程有一层while(true),这个循环是不断的扫描LinkedBlockingQueue是否有数据,有则交个任务子线程(也就是你这里定义的线程池)处理,而不是像你这样每个子任务线程都有一个while(true)

正确做法. 还有就是LinkedBlockingQueue 本身阻塞的,while(true)没问题,主要在于不需要每个发送线程都去block
0
不日小鸡
while(true)不加休眠就会这样
0
王斌_
王斌_

java 的线程数量大致要和cpu数量一致,并不是越多越快,线程调度是很消耗时间的。要用好多线程,就需要设计出好的多线程业务模型,不恰当的sleep和block是性能的噩梦。利用好LinkedBlockingQueue,队列空闲时读队列的线程会释放cpu。利用消息触发后续线程工作,就没必要使用while(true)来不停的扫描。

不日小鸡
@蓝水晶飞机 看到你要比牛逼,我就没有兴趣跟你说话了
蓝水晶飞机
蓝水晶飞机
回复 @不日小鸡 : 我就是装逼怎么啦,特么的装逼装出样子来的,起码也比你牛逼啊。
不日小鸡
回复 @蓝水晶飞机 : 你说这话不能掩盖你没有回复我的问题又来回复我导致装逼失败的事实。 那你不是楼主你回复我干什么,还不是回答我的问题。 不要装逼了好么,装多就成傻逼了
蓝水晶飞机
蓝水晶飞机
回复 @不日小鸡 : 此贴楼主不是你,装什么逼。
不日小鸡
回复 @王斌_ : 这些我都知道,我的意思是你这样回复可能会误导其他看帖子的人或者新手,让他们以为线程数就等于CPU数
下一页
0
中山野鬼
中山野鬼

引用来自“OS小小小”的评论

怎么 没人来呀 @中山野鬼

抬举我了。c++ 我还敢对不知深浅的人说,“权当我不懂”,java真心只是学过,没有实际工程上的经验。哈。而且我是c的思维,面对c适合的应用开发,是反对使用线程的。基本思维是,执行模块的生命周期不以任务为决定,同类的执行模块,可根据物理硬核数量,形成对应独立多个进程,但绝对不会同类的任务独立对应多个线程。哈。所以java这类面向线程的设计,没办法参与讨论。设计应用目标不同,系统组织策略自然有异。

唯一的建议是:永远不要依赖工具,特别是所谓的垃圾资源处理回收机制,无论它做的再好,一旦你依赖,必然你的代码,在不久的将来会因为系统设计规模的变大,而变的垃圾。哈。

听不懂的随便喷,希望听懂的,能记得这个观点,这不是我一个人的观点。


乌龟壳
乌龟壳
给100万像素做插值运算进行染色特效,请问单线程怎么做比多线程快?
0
中山野鬼
中山野鬼

@乌龟壳 : 几种方法都可以,第一是按照计算步骤,每个进程处理一个步骤,然后切换共享空间(这没有数据传递逻辑上的额外开销),就是流水思维。第二个是block的思维,同样的几个进程负责相同计算,但负责不同片区。同时存在另一类的进程是对前期并发处理完的工作进行边界处理。 你这个例子体现不出进程和线程的差异的。

如果非要考虑进程和线程在片内cache的差异,如果没记错(错了大家纠正哈),进程之间的共享是在二级缓存之间吧。即便线程能做到一级缓存之间的共享,但对于这种大批量像素的计算,用进程仍然是使用 dma,将数据成块载入一级缓存区域进行处理,而这个载入工作和计算工作是同步的。不会有额外太多的延迟。

你举的这个例子,还真好是我以前的老本行。再说了。像素计算,如今都用专用计算处理器了吧。还用x86或arm来处理,不累死啊。哈。

而且这种东西java不适合,同样的处理器,用c写,基本可以比java快1到2倍。因为c可以直接根据硬件特性和计算逻辑特点有效调度底层硬件驱动方式。而java即便你用了底层优化的官方库,仍然不能保证硬件与计算目标特性的高度整合。

乌龟壳
乌龟壳
回复 @中山野鬼 : 简单来说,你的多个进程处理结果进行汇总的时候,是不是要做内存复制操作?如果是多线程天然就不用,多进程用系统的共享内存机制也不用,问题是既然用了共享内存,和多线程就没区别了。
中山野鬼
中山野鬼
回复 @乌龟壳 : 两回事哦。共享空间是独立的,而线程如果我没记错,全局变量,包括文件内的(静态变量)是共享的。不同线程共享同一个进程内的变量嘛。这些和业务逻辑相关的东西,每个线程又是独立一套业务逻辑,针对c语言,这样去设计,不是没事找事嘛。面向对象语言,这块都帮你处理好了,自然没有关系。
乌龟壳
乌龟壳
既然有共享空间了,那你所说的进程和线程实际就是一回事了。
0
中山野鬼
中山野鬼

@乌龟壳   ,数据分两种,一种和算法或处理相关的。一种是待处理的数据。 前者,不应该共享,后者属于数据加工流程,必然存在数据传递或流动,最低成本的传递/流动方式就是共享内存,交替使用权限的思路。 但这仅仅针对待加工的数据和辅助信息,而不针对程序本身。 进程不会搞混乱这些东西特别是(待加工数据的辅助信息),而线程,就各种乱吧。哈。

进程之间,虽然用共享空间,但它本质是数据传递/流动,当你采用多机(物理机器)并发处理时,进程移动到另外一个物理主机,则共享空间就是不能选择的传递/流动方式了。但线程就没有这些概念。

乌龟壳
乌龟壳
回复 @中山野鬼 : 是啊,java天然就不是像C一样对汇编的包装。
中山野鬼
中山野鬼
@乌龟壳 面向企业级的各种业务,java这些没问题的。而且更有优势,面向计算设备特性的设计开发,就不行了。哈。
乌龟壳
乌龟壳
回复 @中山野鬼 : 也算各有场景吧,java同样可以多进程可以分布式来降低多线程的风险。java也可以静态编译成目标机器码。总之事在人为。
中山野鬼
中山野鬼
回复 @乌龟壳 : 高手,啥都可以,低手,依赖这些,就是各种想当然。哈哈。
乌龟壳
乌龟壳
回复 @中山野鬼 : 那针对java的垃圾回收,这个东西是可以调节它算法的,不算依赖工具吧,哈。不然依赖C语言语法也算依赖工具咯。哈。;-p
下一页
返回顶部
顶部