在 elastic-job 的 execute 方法中,对缓存返回的 List 集合一边遍历一边修改(或多个线程共享同一个缓存 List),会出现 ConcurrentModificationException: null 异常

clark-lee 发布于 2018/07/31 10:31
阅读 574
收藏 0

异常信息:

java.util.ConcurrentModificationException: null
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[?:1.8.0_66]
	at java.util.ArrayList$Itr.next(ArrayList.java:851) ~[?:1.8.0_66]
	at com.frxs.trade.datasyn.biz.event.schedule.elasticjob.TradeAreaDataSynJob.execute(TradeAreaDataSynJob.java:82) [classes/:?]
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41) [elastic-job-common-core-2.1.6-frxs.jar:?]
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206) [elastic-job-common-core-2.1.6-frxs.jar:?]
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47) [elastic-job-common-core-2.1.6-frxs.jar:?]
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185) [elastic-job-common-core-2.1.6-frxs.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_66]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_66]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

java代码:

/**
     * Cache of area data; queryAllArea() supplies the area id list used by this job.
     */
    @Autowired
    private AreaCacheTool areaCacheTool;

    /**
     * Thread pool used to run the per-area data-sync sub-tasks.
     */
    @Autowired
    private ThreadPoolTaskExecutor tradeDataSynThreadPool;

    /**
     * Scheduled-job entry point: submits one sync task per area owned by this shard,
     * then blocks until every submitted task finishes.
     *
     * <p>Fix for the reported {@code ConcurrentModificationException}: the list returned by
     * {@code areaCacheTool.queryAllArea()} is presumably the live cached list shared across
     * threads/shards (TODO confirm against AreaCacheTool). The original code both mutated
     * ({@code clear()}/{@code add()}) and iterated that shared list directly, so a concurrent
     * mutation tripped the fail-fast iterator at the for-each loop. We now take a defensive
     * copy immediately and only ever touch the copy.
     *
     * @param shardingContext sharding context supplied by elastic-job
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        // Total number of shards configured for this job.
        int totalCount = shardingContext.getShardingTotalCount();

        // Shard item assigned to this instance.
        int item = shardingContext.getShardingItem();

        try {
            // Fetch all non-deleted areas, then copy defensively: never mutate or
            // iterate the (possibly shared) cached list itself.
            List<Integer> cachedAreaIds = areaCacheTool.queryAllArea();
            if (CollectionUtils.isEmpty(cachedAreaIds)) {
                return;
            }
            // Fully-qualified name: this chunk does not show the file's import block.
            List<Integer> areaIds = new java.util.ArrayList<>(cachedAreaIds);

            // Test data, development phase only — now applied to the local copy,
            // so the shared cache is no longer corrupted by test values.
            areaIds.clear();
            areaIds.add(100);
            areaIds.add(101);
            areaIds.add(102);
            areaIds.add(103);

            LogUtil.debug("区域数据同步任务开始 - 分片:{} - 区域:{}", item, areaIds);

            int size = 0;
            // Submit one task per area that hashes to this shard.
            // Raw CompletionService kept: TradeAreaThread's Callable type parameter
            // is declared elsewhere and not visible in this chunk.
            CompletionService completion = new ExecutorCompletionService(tradeDataSynThreadPool);
            for (int areaId : areaIds) {
                if (areaId % totalCount == item) {
                    // One sub-task per area owned by this shard.
                    TradeAreaThread tradeAreaThread = new TradeAreaThread(areaId);
                    completion.submit(tradeAreaThread);

                    // Count submitted tasks so getSubResult knows how many to await.
                    size++;
                }
            }

            // Block for the results; otherwise this job would return immediately and the
            // next trigger could overlap the unfinished run (the per-area file lock inside
            // the sub-task is only a second line of defense).
            String subResult = getSubResult(completion, size);

            // Log the aggregated outcome for this shard.
            LogUtil.info("区域数据同步 分片:{},结果:{}", item, subResult);

        } catch (DataSyncRuntimeException dsre) {
            LogUtil.error("区域数据同步 任务处理失败,分片: " + item, dsre);

        } catch (Exception ex) {
            LogUtil.error("区域数据同步 任务处理失败,分片: " + item, ex);
        }
    }

    /**
     * Blocks until all submitted area sub-tasks finish and aggregates their non-null
     * results into a JSON array string.
     *
     * <p>Fix: the original empty {@code catch} swallowed every sub-task failure silently
     * and ignored thread interruption. Failures are now logged, and on interruption the
     * interrupt flag is restored and waiting stops.
     *
     * @param completion completion service the sub-tasks were submitted to
     * @param size       number of tasks that were submitted
     * @return JSON array string of sub-task results, or a fixed message when size is 0
     */
    private String getSubResult(CompletionService completion, int size) {

        if (size == 0) {
            return "没有满足条件的区域";
        }

        JSONArray subResult = new JSONArray();

        for (int i = 0; i < size; i++) {
            try {
                // Block until the next finished sub-task is available.
                Object result = completion.take().get();
                if (null != result) {
                    subResult.add(result);
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop waiting for the remaining tasks.
                Thread.currentThread().interrupt();
                LogUtil.error("区域数据同步 等待子线程结果被中断", ie);
                break;
            } catch (Exception ex) {
                // Previously swallowed silently; record the failed sub-task instead.
                LogUtil.error("区域数据同步 子线程执行失败", ex);
            }
        }

        return subResult.toJSONString();
    }

 

加载中
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部