disruptor号称是快如闪电的队列,但是我实际测试,为什么比jdk的LinkedBlockingQueue还慢一些呢

learnli 发布于 2018/06/15 15:32
阅读 252
收藏 1

测试环境是window7系统,disruptor3.4.2,jdk为1.8。
单生产者、10个消费者,处理1000000个数据

测试代码:

1.disruptor:

//DisruptorTest.java

package iot.cmcc.test.disruptor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

public class DisruptorTest {
    private static Long time = System.currentTimeMillis();

    public static void main(String[] args) {
        RingBuffer<TradeTransaction> ringBuffer;
        Producer producer = null;
        // 创建缓冲池
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // 创建工厂
        EventFactory<TradeTransaction> factory = new EventFactory<TradeTransaction>() {
            @Override
            public TradeTransaction newInstance() {
                return new TradeTransaction();
            }
        };
        // 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
        int ringBufferSize = 1024 * 1024; //
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        // 创建ringBuffer
        ringBuffer = RingBuffer.create(ProducerType.MULTI, factory, ringBufferSize, YIELDING_WAIT);
        SequenceBarrier barriers = ringBuffer.newBarrier();
        // 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
        // Consumer[] consumers = new Consumer[10];
        WorkHandler<TradeTransaction>[] workHandlers = new WorkHandler[10];
        for (int i = 0; i < workHandlers.length; i++) {
            workHandlers[i] = new Consumer();
        }
        WorkerPool<TradeTransaction> workerPool = new WorkerPool<TradeTransaction>(ringBuffer, barriers,
                new IgnoreExceptionHandler(), workHandlers);

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(executor);

        producer = new Producer(ringBuffer);

        for (int i = 0; i < 1000000; i++) {
            producer.onData(time);
        }

        // executor.shutdown();
    }
}
//TradeTransaction.java

package iot.cmcc.test.disruptor;

public class TradeTransaction {

    public Long value;
    public String seq;

    public String getSeq() {
        return seq;
    }

    public void setSeq(String seq) {
        this.seq = seq;
    }

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
//Producer.java

package iot.cmcc.test.disruptor;

import com.lmax.disruptor.RingBuffer;

public class Producer {
    private RingBuffer<TradeTransaction> ringBuffer;

    public Producer(RingBuffer<TradeTransaction> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(Long time) {
        // 可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
        long sequence = ringBuffer.next();
        ringBuffer.get(sequence).setValue(time);
        ringBuffer.get(sequence).setSeq(sequence + "");
        ringBuffer.publish(sequence);
    }

}
//Consumer.java

package iot.cmcc.test.disruptor;

import com.lmax.disruptor.WorkHandler;

public class Consumer implements WorkHandler<TradeTransaction> {

    @Override
    public void onEvent(TradeTransaction event) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("消费者C1消费了一条消息:" + event.getSeq());
        System.out.println("花费时间 :" + (System.currentTimeMillis() - event.getValue()));
    }
}

//IntEventExceptionHandler.java

package iot.cmcc.test.disruptor;

import org.apache.log4j.Logger;

public class IntEventExceptionHandler {
    private static final Logger logger = Logger.getLogger(IntEventExceptionHandler.class);

    public void handleEventException(Throwable ex, long sequence, Object event) {
        logger.error("handleEventException", ex);
    }

    public void handleOnStartException(Throwable ex) {
        logger.error("handleOnStartException", ex);
    }

    public void handleOnShutdownException(Throwable ex) {
        logger.error("handleOnShutdownException", ex);
    }
}
2.jdk的LinkedBlockingQueue队列

//jdkTest.java

package iot.cmcc.test.jdkseq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class jdkTest {
    private static Long time = System.currentTimeMillis();

    public static void main(String[] args) {
        // Creating BlockingQueue of size 10
        BlockingQueue<TradeTransaction> queue = new LinkedBlockingQueue<>(10000);
        Producer producer = new Producer(queue);
        producer.setTime(time);
        Consumer consumer = new Consumer(queue);
        // starting producer to produce Strings in queue
        new Thread(producer).start();
        // starting consumer to consume Strings from queue
        for (int i = 0; i < 10; i++) {
            new Thread(consumer).start();
        }
        System.out.println("Producer and Consumer has been started");
    }
}
//TradeTransaction.java

package iot.cmcc.test.jdkseq;

public class TradeTransaction {

    public Long value;
    public String seq;

    public String getSeq() {
        return seq;
    }

    public void setSeq(String seq) {
        this.seq = seq;
    }

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
//Producer.java

package iot.cmcc.test.jdkseq;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue<TradeTransaction> queue;

    public Producer(BlockingQueue<TradeTransaction> q) {
        this.queue = q;
    }

    private static Long time;

    public static Long getTime() {
        return time;
    }

    public static void setTime(Long time) {
        Producer.time = time;
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++) {
            try {
                TradeTransaction msg = new TradeTransaction();
                msg.setValue(getTime());
                msg.setSeq(i + "");
                queue.put(msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
//Consumer.java

package iot.cmcc.test.jdkseq;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue<TradeTransaction> queue;

    public Consumer(BlockingQueue<TradeTransaction> q) {
        this.queue = q;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String msg;
                TradeTransaction tt = queue.take();
                Long l = tt.getValue();
                System.out.println("消费者消费了一条消息 :" + tt.getSeq());
                System.out.println("花费时间 :" + (System.currentTimeMillis() - l));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


////////////////////////////////////////////////////////////

最后的结果是:jdk的LinkedBlockingQueue比disruptor快将近4倍。这个结果跟预期差很远,是我的测试代码有问题吗?

加载中
返回顶部
顶部