HBase并发批量入库测试的时候,入库的速度越来越慢

big_peng 发布于 03/13 12:17
阅读 275
收藏 0

在使用HBase作为数据库的时候,由于需要每秒将1w条数据同时存到数据库中
在进行测试的时候,使用了多线程进行并发数据的模拟产生,并使用了HBase的批量插入
把程序jar包放到服务器集群上运行的时候,刚开始还可以正常的入库,运行一两分钟之后就会报错

后来发现是数据入库消耗的时间大于提交数据的时间,就会造成数据阻塞
不知道为什么HBase在进行高频数据并发插入的时候效率会越来越慢
下面是测试时使用的代码

package com.thread;

import java.io.IOException;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

import com.hbasepool.HBaseConnectionPool2;

public class ListThread {
	public static void main(String[] args) throws Exception {
		Connection conn = HBaseConnectionPool2.getConnection();
		System.out.println("------------------开始高并发批量插入------------------");
		//final List<Put> putList = Collections.synchronizedList(new ArrayList<Put>());
		
		final List<Put> putList = new CopyOnWriteArrayList<Put>();
		//final List<Long> startTime = new CopyOnWriteArrayList<Long>();
		final Table table = conn.getTable(TableName.valueOf("Dust_ID".getBytes()));
		for (int i = 1; i <= 4000; i++) {
			new Thread() {
				public void run() {
					boolean temp = true;
					ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
					while(true){
						// TODO Auto-generated method stub
						//long st = System.currentTimeMillis();
						Put put = new Put(((int)(Math.random()*6)+""+(int)(Math.random()*10)+"-"+System.currentTimeMillis()+"-"+Thread.currentThread().getName()).getBytes());
						put.addColumn("info".getBytes(), "UnitTime".getBytes(), (Math.random()*100+"5").getBytes());
						put.addColumn("info".getBytes(), "UnitTime1".getBytes(), (Math.random()*100+"5").getBytes());
						put.addColumn("info".getBytes(), "UnitTime2".getBytes(), (Math.random()*100+"5").getBytes());
						put.addColumn("info".getBytes(), "UnitTime3".getBytes(), (Math.random()*100+"5").getBytes());
						put.addColumn("info".getBytes(), "UnitTime4".getBytes(), (Math.random()*100+"5").getBytes());
						putList.add(put);
						//System.out.println((System.currentTimeMillis()-st)*1.0/1000);
						/*if(putList.size()==1){
							startTime.add(System.currentTimeMillis());
						}*/
						if(putList.size()==4000){
							cachedThreadPool.execute(new Thread(){
								public synchronized void run(){
									System.out.println("------------------开始入库------------------");
									long start = System.currentTimeMillis();
									try {
										table.put(putList);
										//query();
									} catch (Exception e) {
										// TODO Auto-generated catch block
										e.printStackTrace();
									}
									putList.clear();
									System.out.println("入库消耗时间"+((System.currentTimeMillis()-start)*1.0/1000));
									System.out.println("满了,倒出来");
									/*System.out.println("总消耗时间"+((System.currentTimeMillis()-startTime.get(0))*1.0/1000));
									startTime.clear();
									System.out.println("------------------结束高并发批量插入------------------");*/
								}
							});
						}
						try{
							if(temp){
								temp = false;
								Thread.sleep(8000);
							}else{
								Thread.sleep(1000);
							}
						}catch (Exception e){
							e.printStackTrace();
						}
					}
				}
			}.start();
		}
	}
}

数据表的预分区也已经做过了

加载中
返回顶部
顶部