spring 异步注解调用 druid连接池 自动关闭

无知的小孩 发布于 2016/10/13 11:35
阅读 234
收藏 0

错误信息 

2016-10-13 11:24:20,919 INFO  com.alibaba.druid.pool.DruidDataSource[Thread-1]-{dataSource-2} closed

2016-10-13 11:24:20,957 INFO  com.alibaba.druid.pool.DruidDataSource[Thread-1]-{dataSource-1} closed

使用jars

代码结构

AppTest.java

@SuppressWarnings("all")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring*.xml"})
public class AppTest extends AbstractJUnit4SpringContextTests{

private static final Logger log = Logger.getLogger(AppTest.class); 

@Autowired
private DataProgress dataProgress; 
    
    @Test
    public void testData(){
 
// dataProgress.Read("arc_poor_family");
// dataProgress.Read("arc_poor_family_member");

    dataProgress.Synchronization("arc_poor_family");
    //dataProgress.Synchronization("arc_poor_family_member");
   
    }
    
}
DataProgressImpl.java

@Component
public class DataProgressImpl implements DataProgress{

private static final Logger log = Logger.getLogger(DataProgressImpl.class);

@Autowired
private DataService dataService;

@Async
public void Synchronization(String tName) {
log.info("表["+tName+"]准备开始同步数据");
TableStatu ts = new TableStatu(); 
ts.setStatus("开始");
ts.settName(tName);

int rn = 0;
int pageSize = 1000;

long t1 = System.currentTimeMillis();
log.info("表["+tName+"]开始");
while ("开始".equals(ts.getStatus())) {
log.info("表["+tName+"]同步数据["+rn+"]行开始,["+pageSize+"]条");
try{
dataService.Synchronization(ts, tName, rn, pageSize);
}catch(Exception e) {
e.printStackTrace();
}
rn += pageSize;
}
long t2 = System.currentTimeMillis();
log.info("表["+tName+"]结束,总耗时["+(t2-t1)+"]毫秒");

DataServiceImpl.java

public void Synchronization(TableStatu tableStatu, String tName, int rn,
int pageSize) throws Exception{

log.info("开始读取表["+tName+"]数据,读取中...");
LinkedList<String> lst_meta = new LinkedList<String>();
List<Map<String, Object>> lst_mt = source_jdbc.queryForList(" desc "+tName);
if(lst_mt == null || lst_mt.size() <= 0)
return;
Table tab = new Table();
tab.settName(tName);
for(Map<String, Object> meta:lst_mt)
{
Object obj_field = meta.get("Field");
lst_meta.add(obj_field.toString());
}
tab.setLst_meta(lst_meta);

    List<Map<String, Object>> lst_data = source_jdbc.queryForList(Sql.getOraPgSQL(tName, lst_meta),new Object[]{rn,pageSize});
    tab.setLst_data(lst_data);
   
    if(lst_data != null && lst_data.size() >0)
    {
    log.info("开始读取表["+tName+"]数据,同步中...");
    Write(tName,lst_meta,lst_data);
    log.info("开始读取表["+tName+"]数据,下一次...");
    tableStatu.settName(tName);
    tableStatu.setStatus("开始");
    }else{
    tableStatu.settName(tName);
    tableStatu.setStatus("结束");
    }

}




public void Write(String tName,LinkedList<String> lst_meta,List<Map<String, Object>> lst_data) {
int colSize = lst_meta.size();
StringBuffer insert_SQL = new StringBuffer();
insert_SQL.append("insert into ").append(tName).append(" ( ");
for(int i = 0;i<colSize;i++)
{
if(i > 0)
insert_SQL.append(",");
insert_SQL.append(lst_meta.get(i));
}
insert_SQL.append(" ) values ( ");
for(int i = 0;i<colSize;i++)
{
if(i > 0)
insert_SQL.append(" , ");
insert_SQL.append("?");
}
insert_SQL.append(")");

final List<Map<String, Object>> lst_data1 = lst_data;
final LinkedList<String> lst_meta1 = lst_meta;
final int colSize1 = colSize;
target_jdbc.batchUpdate(insert_SQL.toString(), new BatchPreparedStatementSetter(){

public int getBatchSize() {
return lst_data1.size();
}


public void setValues(PreparedStatement ps, int a)
throws SQLException {

for(int j = 0;j < colSize1;j++)
{
ps.setObject(j+1, lst_data1.get(a).get(lst_meta1.get(j)));
}
}

});

Table.java

/**
 * Mutable holder for one table: its name, its column names and its rows.
 */
public class Table {

    /** Table name. */
    String tName;

    /** Column (field) names. */
    LinkedList<String> lst_meta;

    /** Row data, one map per row. */
    List<Map<String, Object>> lst_data;

    /** Creates a holder whose fields are all {@code null}. */
    public Table() {
    }

    /**
     * Creates a fully populated holder.
     *
     * @param tName    table name
     * @param lst_meta column names
     * @param lst_data row data
     */
    public Table(String tName, LinkedList<String> lst_meta, List<Map<String, Object>> lst_data) {
        this.tName = tName;
        this.lst_meta = lst_meta;
        this.lst_data = lst_data;
    }

    /** @return the table name */
    public String gettName() {
        return this.tName;
    }

    /** @param tName the table name to store */
    public void settName(String tName) {
        this.tName = tName;
    }

    /** @return the column names */
    public LinkedList<String> getLst_meta() {
        return this.lst_meta;
    }

    /** @param lst_meta the column names to store */
    public void setLst_meta(LinkedList<String> lst_meta) {
        this.lst_meta = lst_meta;
    }

    /** @return the row data */
    public List<Map<String, Object>> getLst_data() {
        return this.lst_data;
    }

    /** @param lst_data the row data to store */
    public void setLst_data(List<Map<String, Object>> lst_data) {
        this.lst_data = lst_data;
    }
}

TableStatu.java

/**
 * Mutable holder pairing a table name with its synchronization status.
 */
public class TableStatu {

    /** Table name. */
    public String tName;

    /** Status (not started / started / finished). */
    public String status;

    /** Creates a holder whose fields are both {@code null}. */
    public TableStatu() {
    }

    /**
     * Creates a populated holder.
     *
     * @param tName  table name
     * @param status initial status
     */
    public TableStatu(String tName, String status) {
        this.tName = tName;
        this.status = status;
    }

    /** @return the table name */
    public String gettName() {
        return this.tName;
    }

    /** @param tName the table name to store */
    public void settName(String tName) {
        this.tName = tName;
    }

    /** @return the current status */
    public String getStatus() {
        return this.status;
    }

    /** @param status the status to store */
    public void setStatus(String status) {
        this.status = status;
    }
}

Sql.java

/**
 * Builds paged SELECT statements for a table and a list of columns.
 *
 * <p>Table and column names are inserted into the SQL text verbatim, so they
 * must come from a trusted source; only the paging bounds are bind parameters.
 */
public class Sql {

    /**
     * Builds a ROWNUM-based paging query.
     *
     * <p>The statement has two bind parameters, in this order: the value
     * compared with {@code rownum <= ?} and the value compared with
     * {@code rn >= ?}.
     *
     * @param tName    table to select from
     * @param lst_meta columns to select, in order
     * @return the SQL text with two {@code ?} placeholders
     */
    public static String getOraPgSQL(String tName,LinkedList<String> lst_meta)
    {
        String sql = "select * from (select a.*, rownum rn from ( select {0} from {1} ) a where rownum <= ? ) where rn >= ? ";
        return MessageFormat.format(sql, joinColumns(lst_meta), tName);
    }

    /**
     * Builds a {@code limit ?,?} paging query.
     *
     * @param tName    table to select from
     * @param lst_meta columns to select, in order
     * @return the SQL text with two {@code ?} placeholders
     */
    public static String getMySQLPgSQL(String tName,LinkedList<String> lst_meta)
    {
        String sql = "select {0} from {1} limit ?,? ";
        return MessageFormat.format(sql, joinColumns(lst_meta), tName);
    }

    /** Joins the column names with commas (no spaces); an empty list yields an empty string. */
    private static String joinColumns(LinkedList<String> columns)
    {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        // Iterate instead of calling get(index): indexed access on a LinkedList is linear.
        for (String column : columns)
        {
            if (!first)
                joined.append(",");
            joined.append(column);
            first = false;
        }
        return joined.toString();
    }

}

spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context: annotation support, two Druid data sources with one JdbcTemplate each, and a thread pool bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" 
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop 
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx 
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd  
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd
           http://www.springframework.org/schema/task          
  http://www.springframework.org/schema/task/spring-task-3.2.xsd">




<context:annotation-config />
<aop:aspectj-autoproxy expose-proxy="true" proxy-target-class="true"/> 
<!-- NOTE(review): no executor attribute is given here, so this file does not show that the
     taskExecutor bean defined at the bottom is the one running @Async methods. Confirm. -->
<task:annotation-driven /> 

<context:component-scan base-package="com.*" />
<context:property-placeholder location="classpath:config.properties" />


<!-- Data source configuration -->
<!-- Source data source (referenced by source_jdbc); destroy-method="close" is invoked when the bean is destroyed. -->
<bean name="zyfp_dataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="url" value="${zyfp_jdbc_url}"></property>
<property name="username" value="${zyfp_jdbc_username}"></property>
<property name="password" value="${zyfp_jdbc_password}"></property>


<property name="maxActive" value="5" />
<property name="initialSize" value="1" />
<property name="maxWait" value="60000" />
<property name="minIdle" value="1" />
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="validationQuery" value="SELECT 'X' from dual " />
<property name="poolPreparedStatements" value="false" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="50" />
<property name="filters" value="stat" />
<property name="timeBetweenLogStatsMillis" value="300000" /> 
</bean> 

<!-- JdbcTemplate bound to the source data source -->
<bean name="source_jdbc" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg name="dataSource" ref="zyfp_dataSource" />
</bean>
 
<!-- Target data source (referenced by target_jdbc); same pool settings as the source data source. -->
<bean name="test_dataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<property name="url" value="${test_jdbc_url}"></property>
<property name="username" value="${test_jdbc_username}"></property>
<property name="password" value="${test_jdbc_password}"></property>
<property name="maxActive" value="5" />
<property name="initialSize" value="1" />
<property name="maxWait" value="60000" />
<property name="minIdle" value="1" />
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="validationQuery" value="SELECT 'X' from dual " />
<property name="poolPreparedStatements" value="false" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="50" />
<property name="filters" value="stat" />
<property name="timeBetweenLogStatsMillis" value="300000" /> 
</bean> 

<!-- JdbcTemplate bound to the target data source -->
<bean name="target_jdbc" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg name="dataSource" ref="test_dataSource" />
</bean> 

<!-- Thread pool configuration -->  
<bean id = "taskExecutor"  class = "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >  
   <!-- Minimum number of threads kept in the pool -->  
<property name ="corePoolSize" value ="5" />  
   <!-- Idle time allowed for pool threads. NOTE(review): the property is keepAliveSeconds, so 30000 is a value in seconds; confirm this is intended. -->  
<property name ="keepAliveSeconds" value ="30000" />  
   <!-- Maximum number of threads kept in the pool -->  
<property name ="maxPoolSize" value ="1000" />  
   <!-- Capacity of the buffer queue used by the pool -->  
<property name ="queueCapacity" value ="200" />  
</bean>


</beans>

加载中
返回顶部
顶部