集合框架 Queue---LinkedBlockingQueue

长平狐 发布于 2012/11/28 15:41
阅读 281
收藏 0

java.util.concurrent
类 LinkedBlockingQueue<E>

java.lang.Object
  java.util.AbstractCollection<E>
      java.util.AbstractQueue<E>
          java.util.concurrent.LinkedBlockingQueue<E>
类型参数:
E - 此 collection 中所保存元素的类型。
所有已实现的接口:
Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>

public class LinkedBlockingQueue<E>
   
   
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable
 

一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。

此类及其迭代器实现 Collection Iterator 接口的所有可选 方法。

此类是 Java Collections Framework 的成员。

 注意1:容量范围可以在构造方法参数中指定作为防止队列过度扩展。如果未指定容量,则它等于 Integer.MAX_VALUE 
 注意2:它是线程安全的,是线程阻塞的。 
 注意3:不接受 null 元素 
 注意4:它实现了BlockingQueue接口。关于BlockingQueue,请参考《BlockingQueue》 
 注意5:它没有线程ArrayBlockingQueue那样的公平性设置。为什么这样设计呢?puzzle. 
 注意6:此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法

LinkedBlockingQueue

采用对于的next构成链表的方式来存储对象。由于读只操作队头,而写只操作队尾,这里巧妙地采用了两把锁,对put和offer采用putLock,对take和poll采用takeLock,避免了读写时相互竞争锁的现象,因此LinkedBlockingQueue在高并发读写操作都多的情况下,性能会比ArrayBlockingQueue好很多,在遍历以及删除元素时则要把两把锁都锁住。

下面是源码,里面读写的的锁都是全局的,而且是final的

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    /**
     * 链表节点node类结构
     */
    static class Node<E> {
        volatile E item;//volatile使得所有的write happen-befor read,保证了数据的可见性 
        Node<E> next;
        Node(E x) { item = x; }
    }
    /** 队列容量,默认为Integer.MAX_VALUE*/
    private final int capacity;
    /** 用原子变量 表示当前元素的个数 */
    private final AtomicInteger count = new AtomicInteger(0);
    /** 表头节点 */
    private transient Node<E> head;
    /** 表尾节点 */
    private transient Node<E> last;
    /** 获取元素或删除元素时 要加的takeLock锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 获取元素 notEmpty条件 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 插入元素时 要加putLock锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 插入时,要判满 */
    private final Condition notFull = putLock.newCondition();
    /**
     * 唤醒等待的take操作,在put/offer中调用(因为这些操作中不会用到takeLock锁)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    /**
     * 唤醒等待插入操作,在take/poll中调用.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    /**
     * 插入到尾部
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }
    /**
     * 获取并移除头元素
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    /**
     * 锁住两把锁,在remove,clear等方法中调用
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    /**
     * 和fullyLock成对使用
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    /**
     * 默认构造,容量为 Integer.MAX_VALUE 
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    /**
     *指定容量的构造
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    /**
     * 指定初始化集合的构造
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }
    /**
     * 通过原子变量,直接获得大小
     */
    public int size() {
        return count.get();
    }
    /**
     *返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }
    /**
     * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    /**
     * 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(e);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    /**
     *将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),
     *在成功时返回 true,如果此队列已满,则返回 false。
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    //获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //获取并移除此队列的头,如果此队列为空,则返回 null。
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    //获取但不移除此队列的头;如果此队列为空,则返回 null。
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    /**
     * 从此队列移除指定元素的单个实例(如果存在)。
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }
    ……
}



原文链接:http://blog.csdn.net/longeremmy/article/details/8230086
加载中
返回顶部
顶部