elastic-job-lite频繁报主键重复错误

开源中国总书记 发布于 2017/12/27 15:30
阅读 655
收藏 0

     今天公司其他项目组使用elastic-job-lite时,发现一直报主键重复错误。根据错误信息,发现是执行JobEventRdbStorage类的insertJobExecutionEventWhenSuccess方法时报错。

具体错误信息如下:

com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '8a39750c-3384-49b9-ad24-a7d18115cec3' for key 'PRIMARY'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_101]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_101]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_101]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_101]
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.Util.getInstance(Util.java:408) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:935) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2494) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1197) ~[mysql-connector-java-5.1.43.jar:5.1.43]
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2931) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2929) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.wall.WallFilter.preparedStatement_execute(WallFilter.java:601) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2929) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2929) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.execute(PreparedStatementProxyImpl.java:131) [druid-1.0.31.jar:1.0.31]
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:493) [druid-1.0.31.jar:1.0.31]
    at com.dangdang.ddframe.job.event.rdb.JobEventRdbStorage.insertJobExecutionEventWhenSuccess(JobEventRdbStorage.java:233) [elastic-job-common-core-2.1.4.jar:na]
    at com.dangdang.ddframe.job.event.rdb.JobEventRdbStorage.updateJobExecutionEventWhenSuccess(JobEventRdbStorage.java:206) [elastic-job-common-core-2.1.4.jar:na]
    at com.dangdang.ddframe.job.event.rdb.JobEventRdbStorage.addJobExecutionEvent(JobEventRdbStorage.java:156) [elastic-job-common-core-2.1.4.jar:na]
    at com.dangdang.ddframe.job.event.rdb.JobEventRdbListener.listen(JobEventRdbListener.java:42) [elastic-job-common-core-2.1.4.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
    at com.google.common.eventbus.Subscriber.invokeSubscriberMethod(Subscriber.java:95) [guava-19.0.jar:na]
    at com.google.common.eventbus.Subscriber$1.run(Subscriber.java:80) [guava-19.0.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]

项目配置如下:

    1、数据源配置:

/**
 * Spring configuration that wires the "multiTenant" data source together with the
 * MyBatis session factory, transaction manager and session template built on top of it.
 *
 * <p>Mapper interfaces under {@code com.qf.repository} are scanned and bound to the
 * {@code multiTenantSqlSessionTemplate} bean declared below.
 */
@Configuration
@MapperScan(basePackages = "com.qf.repository", sqlSessionTemplateRef = "multiTenantSqlSessionTemplate")
public class MultiTenantDataSourceConfig {

    /** Classpath location of the global MyBatis configuration file. */
    private static final String MYBATIS_CONFIG = "mybatis-config.xml";

    /** Resource pattern of the mapper XML handed to the session factory. */
    private static final String MAPPER_LOCATION = "classpath:/mapper/multitenant.xml";

    /**
     * Creates the primary Druid data source; its settings are bound from the
     * {@code multiTenant.datasource} configuration prefix.
     *
     * @return a new, not yet configured {@link DruidDataSource}
     */
    @Primary
    @Bean(name = "multiTenantDataSource")
    @ConfigurationProperties(prefix = "multiTenant.datasource")
    public DataSource multiTenantDataSource() {
        return new DruidDataSource();
    }

    /**
     * Builds the MyBatis session factory backed by the multi-tenant data source.
     *
     * @param dataSource the {@code multiTenantDataSource} bean
     * @return the session factory produced by the factory bean
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "multiTenantSqlSessionFactory")
    public SqlSessionFactory multiTenantSqlSessionFactory(
            @Qualifier("multiTenantDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setConfigLocation(new ClassPathResource(MYBATIS_CONFIG));

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));

        return factoryBean.getObject();
    }

    /**
     * Exposes a JDBC transaction manager bound to the multi-tenant data source.
     *
     * @param dataSource the {@code multiTenantDataSource} bean
     * @return a transaction manager for that data source
     */
    @Bean(name = "multiTenantTransactionManager")
    public DataSourceTransactionManager multiTenantTransactionManager(
            @Qualifier("multiTenantDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Exposes the session template referenced by {@code @MapperScan} on this class.
     *
     * @param sqlSessionFactory the {@code multiTenantSqlSessionFactory} bean
     * @return a session template wrapping the given factory
     * @throws Exception declared for signature compatibility with existing callers
     */
    @Bean(name = "multiTenantSqlSessionTemplate")
    public SqlSessionTemplate multiTenantSqlSessionTemplate(
            @Qualifier("multiTenantSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

2、数据库连接信息配置:

multiTenant:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/elastic-job?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: a123
    driver-class-name: com.mysql.jdbc.Driver
    maxActive: 150 #连接池处于活动状态的数据库连接的最大数目,0表示不限制,表示最大并发
    initialSize: 10 #初始化连接数
    #maxIdle: 50 #连接池处于空闲状态的数据库连接的最大数目,取非正整数表示不受限制,超过此数值时多余的空闲连接将会被释放
    minIdle: 10 #连接池处于空闲状态的数据库连接的最小数目,低于此数值将会创建所欠缺的连接,设0无限制
    #testOnBorrow: true #指定连接被调用时是否经过校验,如果校验未通过,则该连接被连接池断掉,并由连接池尝试调用另一个连接,值为true,则validationQuery参数必须为一个非空字串
    validationInterval: 30000 #检查连接死活的时间间隔,单位:毫秒
    #validationQuery: SELECT 1 #在连接返回给调用者前用于校验连接是否有效的SQL语句,如果指定了SQL语句,则必须为一个SELECT语句,且至少有一行结果
    type: com.alibaba.druid.pool.DruidDataSource
    maxWait: 60000 # 配置获取连接等待超时的时间
    timeBetweenEvictionRunsMillis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
    minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最小生存的时间,单位是毫秒
    validationQuery: SELECT 1 FROM DUAL
    testOnBorrow: false
    testWhileIdle: true
    testOnReturn: false
    poolPreparedStatements: true # 打开PSCache,并且指定每个连接上PSCache的大小
    maxPoolPreparedStatementPerConnectionSize: 20
    filters: stat,wall,log4j # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
    connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
    useGlobalDataSourceStat: true # 合并多个DruidDataSource的监控数据

3、Job配置:

regCenter:
  serverList: 172.16.5.179:2181,172.16.5.180:2181,172.16.5.181:2181
  namespace: elastic-demo
  

testJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 1
  shardingItemParameters: 0=10
  jobParameter: 100
  description: 银企直连定时查询Job

spring:
  profiles:
    active: dev

 

 

加载中
0
见习程序猿
见习程序猿

因为之前的executeUpdate返回0可能跟MySQL版本有关,也可能跟持久层框架有关,所以为了通用性,我就把程序改成了“先delete再insert”。

0
小99
小99
你的表达式时间间隔太短了
开源中国总书记
开源中国总书记
官网给的demo时间也是5秒
0
见习程序猿
见习程序猿

改源码,解决了该问题。

0
开源中国总书记
开源中国总书记

引用来自“见习程序猿”的评论

改源码,解决了该问题。

修改后的代码有吗?

0
见习程序猿
见习程序猿
/**
 * Stores a completed job execution event by first deleting any row with the same id
 * and then inserting a fresh row.
 *
 * <p>The connection and both prepared statements are opened with try-with-resources so
 * they are always released, including when a statement fails. The previous version never
 * closed them, which leaks one pooled connection per call.
 *
 * @param jobExecutionEvent the event to persist; its id, job name, task id, hostname, ip,
 *                          sharding item, source, success flag, start time and complete
 *                          time are written to the row
 * @return {@code true} if the delete and insert both executed; the result of
 *         {@code updateJobExecutionEventWhenSuccess} if the failure is reported as a
 *         duplicate record by {@code isDuplicateRecord}; {@code false} for any other
 *         {@link SQLException}
 */
private boolean insertJobExecutionEventWhenSuccess(final JobExecutionEvent jobExecutionEvent) {
    String sqlDelete = "DELETE FROM " + TABLE_JOB_EXECUTION_LOG + " where id = ?";
    String sqlInsert = "INSERT INTO `" + TABLE_JOB_EXECUTION_LOG + "` (`id`, `job_name`, `task_id`, `hostname`, `ip`, `sharding_item`, `execution_source`, `is_success`, `start_time`, `complete_time`) "
            + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";

    try (Connection conn = dataSource.getConnection()) {
        // Remove any existing row with this id so the insert below does not collide with it.
        try (PreparedStatement deleteStatement = conn.prepareStatement(sqlDelete)) {
            deleteStatement.setString(1, jobExecutionEvent.getId());
            deleteStatement.execute();
        }
        try (PreparedStatement insertStatement = conn.prepareStatement(sqlInsert)) {
            insertStatement.setString(1, jobExecutionEvent.getId());
            insertStatement.setString(2, jobExecutionEvent.getJobName());
            insertStatement.setString(3, jobExecutionEvent.getTaskId());
            insertStatement.setString(4, jobExecutionEvent.getHostname());
            insertStatement.setString(5, jobExecutionEvent.getIp());
            insertStatement.setInt(6, jobExecutionEvent.getShardingItem());
            insertStatement.setString(7, jobExecutionEvent.getSource().toString());
            insertStatement.setBoolean(8, jobExecutionEvent.isSuccess());
            insertStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
            insertStatement.setTimestamp(10, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
            insertStatement.execute();
        }
        return true;
    } catch (final SQLException ex) {
        // The connection is already closed here, so the fallback does not run while
        // this method is still holding a connection.
        if (isDuplicateRecord(ex)) {
            return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
        }
        // TODO: a failed write is only logged for now; consider making this configurable.
        log.error(ex.getMessage());
        return false;
    }
}

 

返回顶部
顶部