Java 中 BlockingQueue 和 ThreadPoolExecutor 的使用

tsl0922 发布于 2012/10/29 11:26
阅读 2K+
收藏 6
JDK
在 JDK5 之后,Java 程序员编写多线程应用程序变得更加容易了。JDK5 带来了很多关于多线程的特性,这些在以前都是开发者的噩梦,对于那些将来要调试代码修复BUG的人来说更加严重。

在本文中,我将推荐使用一个这样的新特性,ThreadPoolExecutor BlockingQueue 结合使用。我将向你展示以上类在程序中使用的最佳实践。

本文目录

  • DemoTask 简介
  • 添加 CustomThreadPoolExecutor
  • BlockingQueue 介绍
  • RejectedExecutionHandler 介绍
  • 测试我们的代码

DemoTask 简介

我不会花太多时间在这里,因为 DemoTask 是只是一个支持我们的逻辑和代码编写的测试线程。

package corejava.thread;

public class DemoThread implements Runnable{

    private String name = null;

    public DemoThread(String name)
    {
        this.name = name;
    }

    public String getName()
    {
        return this.name;
    }

    @Override
    public void run()
    {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Executing : "+name);
    }
}

添加 CustomThreadPoolExecutor

这个类是很重要的。CustomThreadPoolExecutor 是 ThreadPoolExecutor 的扩展。尽管不扩展 ThreadPoolExecutor 简单的创建它的实例并使用也可以,但是我们会错过在执行上的一些控制功能。

ThreadPoolExecutor 提供了两个很好的方法,我将强烈建议覆盖,即beforeExecute()afterExecute()方法。它们对我们要执行的 Runnable 的生命周期提供了很好的控制。让我们看看 CustomThreadPoolExecutor 里的这两个方法。

package corejava.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor{

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
    long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println("Perform beforeExecute() logic");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null)
        {
            System.out.println("Perform exception handler logic");
        }
        System.out.println("Perform afterExecute() logic");
    }

}

BlockingQueue 介绍

是否还记得“生产者-消费者问题”?在JDK5之前,消费者必须一直等待,直到生产者添加产品到资源队列里。使用新的 BlockingQueue 可以很容易解决这个问题。

BlockingQueue 就像是另外一个有额外功能的队列实现。任何试图从里面获取东西的尝试,都可以被视为安全的,因为它不会返回空。只要 BlockingQueue 里没有数据填充,消费者线程会自动等待;一旦填充了,线程就可以消耗这个资源。

BlockingQueue 遵循以下规则:

  • 如果有少于 corePoolSize 个线程正在运行,则 Executor 始终首选添加一个新的线程,而不是排队。
  • 如果有 corePoolSize 或多个线程正在运行,则 Executor 始终首选排队请求,而不是加入一个新的线程。
  • 如果一个请求不能被排队,将创建一个新的线程,除非这样做会超过 maximumPoolSize,在这种情况下,任务将被拒绝

RejectedExecutionHandler 介绍

所以,危险的是:一个任务可以被拒绝。我们需要想办法解决这个问题,因为没有任何一个人愿意错过他程序中的任何任务。

Can we do something about it? Yes, we can…[Borrowed from Obama]

BlockingQueue 在被拒绝的情况下抛出 RejectedExectionException,我们可以为它添加一个处理程序。

最佳实践:在使用新的 concurrent API 时添加 RejectedExecutionHandler

测试我们的代码

我说完了,现在到测试的时候了,看看是不是真的像我说的那样?让我们来写一个测试用例。

我们有大约100个任务。我们预计想使用10个,最多20个线程。下面是我写的代码,你可能写的更好或你有这样的解决方案。

package corejava.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor {

    public static void main(String[] args)
    {
        Integer threadCounter = 0;
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);

        CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);

        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
                System.out.println("DemoTask Rejected : " + ((DemoThread)r).getName());
                System.out.println("Waiting for a second !!");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Lets add another time : "+((DemoThread)r).getName());
                executor.execute(r);
            }
        });
        //Let start all core threads initially
        executor.prestartAllCoreThreads();
        while(true)
        {
            threadCounter++;
            //Adding threads one by one
            System.out.println("Adding DemoTask : "+threadCounter);
            executor.execute(new DemoThread(threadCounter.toString()));

            if(threadCounter == 100) break;
        }
    }

}

执行上面的代码,你将会看到预期的结果,并且性能也很好。

OSCHINA.NET 编译,原文链接

加载中
返回顶部
顶部