Quartz动态创建任务无法注入concurrent属性解决并发问题

sprouting 发布于 10/10 23:04
阅读 180
收藏 0

碰到个问题,搞了一天了,还是没解决,只好问下大家了。

我在这么个功能,动态创建定时任务,只要写业务类和数据库插入一条数据即可。基本功能已经完成,但代码发现一个问题,以前会在xml中写一个配置 :使用MethodInvokingJobDetailFactoryBean生成job,只需要配置属性concurrent=flase 即可保证线程中的任务执行完毕,才会创建新的线程 。

但现在这种动态创建的方式,我就无法注入这个参数了,可能是我没找到,或者姿势错误。

个人尝试过的解决方案:

1 加入 @DisallowConcurrentExecution 注解,无效果,应该和我这种写法有关系

2 在运行的时候找到全部的任务,然后遍历任务ID,如果发现该任务已经在执行,则忽略执行。这种方式也不行,返回的都是正常状态

3 将任务id当做线程名字,遍历全部线程,发现线程存在则忽略。也有问题,首先用了线程池之后不能随便设置名字

4.。。。。。。其他方案

 

下面是核心代码,大家帮忙看下,怎么才能让 比如任务A, 我不确定他执行的快慢,可能是3秒,可能30秒,也可能40秒,这个任务是每20秒执行一次,如果任务A还没执行完,不会继续执行,等执行完后,才会继续。

任务类:

创建定时任务:

 

执行定时任务的类:

@Component
public class ScheduleJobService extends QuartzJobBean {


    @Resource
    private ScheduleJobLogDao scheduleJobLogDao;

    private static Logger log = LoggerFactory.getLogger(ScheduleJobService.class);

    /**
     * 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
     */
    private ExecutorService service = Executors.newSingleThreadExecutor();



    /**
     * 执行定时任务
     * @param context
     */
    @Override
    protected void executeInternal(JobExecutionContext context) {
        ScheduleJob scheduleJob = (ScheduleJob)context.getMergedJobDataMap().get(Constant.JOB_PARAM_KEY);

        //数据库保存执行记录
        ScheduleJobLog scheduleJobLog = new ScheduleJobLog();
        scheduleJobLog.setId(CommonTool.getUUId());
        scheduleJobLog.setJobId(scheduleJob.getId());
        scheduleJobLog.setBeanName(scheduleJob.getBeanName());
        scheduleJobLog.setMethodName(scheduleJob.getMethodName());
        scheduleJobLog.setParams(scheduleJob.getParams());
        scheduleJobLog.setCreateTime(CommonTool.getTimestamp());
        //任务开始时间
        long startTime = System.currentTimeMillis();

        try {
            log.info("任务准备执行,任务ID:" + scheduleJob.getId() + scheduleJob.getBeanName());
            //调用线程
            ScheduleRunnable task = new ScheduleRunnable(scheduleJob.getBeanName(), scheduleJob.getMethodName(), scheduleJob.getParams());
            //设置线程名
            task.setName(scheduleJob.getId());

            //Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
            Future<?> future = service.submit(task);
            future.get();

            //任务执行总时长
            long times = System.currentTimeMillis() - startTime;
            scheduleJobLog.setTimes((int) times);
            scheduleJobLog.setStatus(Constant.TASK_SUCCESS);

            log.info("任务执行完毕,任务ID:" + scheduleJob.getId() + "  总共耗时:" + times + "毫秒");

        } catch (Exception e){
            log.error("任务执行失败,任务ID:" + scheduleJob.getId(), e);

            //任务执行总时长
            long times = System.currentTimeMillis() - startTime;
            scheduleJobLog.setTimes((int)times);

            scheduleJobLog.setStatus(Constant.TASK_FAIL);
            scheduleJobLog.setError(e.toString());

        } finally {
            scheduleJobLogDao.insert(scheduleJobLog);
        }
    }
}

上面的

ScheduleRunnable 
public class ScheduleRunnable extends Thread{

    /**
     * bean 名
     */
    private Object target;
    /**
     * 方法名
     */
    private Method method;
    /**
     * 参数
     */
    private String params;


    /**
     * 构造函数
     * @param beanName bean的名字
     * @param methodName 方法名
     * @param params 参数
     */
    public ScheduleRunnable(String beanName, String methodName, String params) throws NoSuchMethodException {
        this.target = SpringContextTool.getBean(beanName);
        this.params = params;

        if (CheckTool.checkNull(params)){
            this.method = target.getClass().getDeclaredMethod(methodName);
        } else {
            this.method = target.getClass().getDeclaredMethod(methodName, String.class);
        }
    }

    @Override
    public void run() {
        try {
            ReflectionUtils.makeAccessible(method);
            if (CheckTool.checkNull(params)){
                method.invoke(target);
            } else {
                method.invoke(target, params);
            }
        } catch (Exception  e) {
            throw new ServiceException("执行定时任务失败!" + e);
        }
    }
}

 

加载中
0
妹子楼顶有鸽子
妹子楼顶有鸽子

看下Spring的MethodInvokingJobDetailFactoryBean代码就知道了: concurrent = true的时候,使用MethodInvokingJob,反之使用带有@DisallowConcurrentExecution标记的StatefulMethodInvokingJob,所以重点就变成了如何让你的Job的类具有@DisallowConcurrentExecution标记标识

 

 

妹子楼顶有鸽子
妹子楼顶有鸽子
回复 @sprouting : 另外对Quartz而言,应该是把你数据库的每一行任务都注册成Quartz的Job,而不是执行的时候扫描这个表进行所有的任务在同一个Job的执行。如果确实需要扫描这个表进行所有的任务在同一个Job的执行,那就给任务加一个任务状态来进行区分
妹子楼顶有鸽子
妹子楼顶有鸽子
回复 @sprouting : MethodInvokingJobDetailFactoryBean这个不也是反射吗?它为何可以?反射只是最终的执行策略,给你的反射的方法加一层Job的代理不就可以了吗?
sprouting
sprouting
不行哦,直接给job添加 DisallowConcurrentExecution 注解并没有用,代码中调用相应方法是走的反射的方式,怀疑Quartz根本无法管理这样的类,并发设置也就失去了作用。 this.method = target.getClass().getDeclaredMethod(methodName); method.invoke(target);
返回顶部
顶部