`
Donald_Draper
  • 浏览: 953288 次
社区版块
存档分类
最新评论

PriorityBlockingQueue解析

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing {@code OutOfMemoryError}). This class does not permit
 * {@code null} elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * {@code ClassCastException}).
 *
 PriorityBlockingQueue是一个与PriorityQueue具有相同排序策略的无界阻塞队列,
 提供阻塞检索操作。队列虽说是无界的,但当内存资源耗尽时,尝试添加元素,则
 将会失败。队列不允许null元素的存在。优先级队列插入的元素依据元素的Comparable,
 不允许插入一个不可比较的元素。
 * <p>This class and its iterator implement all of the
 * [i]optional[/i] methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is [i]not[/i] guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order. If you need
 * ordered traversal, consider using
 * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
 * can be used to [i]remove[/i] some or all elements in priority
 * order and place them in another collection.

 队列实现了集合接口和迭代器的所有方法。迭代器的#iterator方法不能保证,
 不能以特殊的顺序traverse(移动)元素。如果需要以特殊的方式traverse(移动)元素,则
 可以使用Arrays.sort(pq.toArray()方法。drainTo用于将一些元素,或所有元素以优先级的
 顺序移动到另一个集合中。
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
 *
 队列的所有操作不能保证按照元素优先级的顺序。如果需要重新定义一个以原始优先级作为key的
 比较器,保证顺序。比如原始为FIFO队列,你可以用 FIFOEntry代替原始的Entry
 *  <pre> {@code
 * class FIFOEntry<E extends Comparable<? super E>>
 *     implements Comparable<FIFOEntry<E>> {
 *   static final AtomicLong seq = new AtomicLong(0);
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry<E> other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 && other.entry != this.entry)
 *       res = (seqNum < other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }}</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;
     /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation.  This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the espense of
     * transiently doubling overhead.
     用一个二进制的数组堆来实现,用一个lock来保护public方法操作。
     然而,为take操作与扩容操作的并发,我们用一个自旋锁来控制空间分配。
     这可以避免消费者的重复等待和元素的包装。
     The need to back away from lock during allocation
      makes it impossible to simply wrap delegated
     java.util.PriorityQueue operations within a lock, as was done
     in a previous version of this class.
     为了保证互操作性,在序列化的时候用了一个空白的PriorityQueue,
     which maintains compatibility at the espense of
     transiently doubling overhead.
     */

    /**
     * Default array capacity.
     */
    默认容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
     //最大队列容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     优先级队列,表示一个平衡二叉树堆: 
     queue[n]的两个字节点为queue[2*n+1] and queue[2*(n+1)]。优先级队列以元素
     的比较作为排序依据,如果比较器为null,则以元素的自然属性排序。每个堆元素n的
     子孙d满足n <= d。如果队列为非空,最低优先级元素放在queue[0]。
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.队列元素数量
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     比较器,为空,则按自然属性
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations,public操作保护lock
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty,队列非空条件
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS,通过CAS获取分配的自旋锁
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     空有限级队列用于序列化
     */
    private PriorityQueue q;
    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
     /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    
    /**
     * Creates a {@code PriorityBlockingQueue} with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     待比较器和容量参数的构造
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
     public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

来看put操作:
 
 /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        //委托给put操作
        offer(e); // never need to block
    }

//offer操作
 
/**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
	    //如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
	    //比较,确定元素存储的位置
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
	    //容量自增
            size = n + 1;
	    //唤醒等待take的线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

这里有几点要关注:
1.
 while ((n = size) >= (cap = (array = queue).length))
	 //如果队列已满
          tryGrow(array, cap);

2.
 Comparator<? super E> cmp = comparator;
 if (cmp == null)
    siftUpComparable(n, e, array);

3.
 else
    siftUpUsingComparator(n, e, array, cmp);

先看第一点
1.
while ((n = size) >= (cap = (array = queue).length))
	 //如果队列已满
          tryGrow(array, cap);

/**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     尝试调整至少一个元素,释放锁。在需要的时候再持有锁
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock,先释放锁,需要时,在重新获取
        Object[] newArray = null;
	//获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
	        //当容量小于64时,则增长容量为原来的2倍,大于64则每次增长两个。
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
		    //如果添加元素后,容量移除,或大于MAX_ARRAY_SIZE,则抛出OutOfMemoryError
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
	    //如果分配失败,则暂定当前线程
            Thread.yield();
	//自旋后,重新获取锁
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
	    //将原始队列拷贝的新的队列中
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

这一步,首先释放锁,获取扩容锁,成功则,重新扩展队列容量,扩容后则重新获取锁,将原始队列拷贝的新的队列中。
2.
Comparator<? super E> cmp = comparator;
 if (cmp == null)
    siftUpComparable(n, e, array);

 /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons. the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     * These methods are static, with heap state as arguments, to
     * simplify use in light of possible comparator exceptions.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        //获取元素的比较器
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
	    //数组k叶节点的父节点为(k - 1) >>> 1,无符号右移,左补0
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
	    //比较,确定元素存储的位置
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

3.
 else
    siftUpUsingComparator(n, e, array, cmp);

这个以上一步没有什么太大的区别,唯一区别是,使用的自定义的比较器
   
  private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

小节:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。
add操作:
 /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

超时offer操作:
**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }


从上面来看无论是add,put,还是超时offer操作,都是委托给offer操作。
再来看take操作:
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
	//以可中断方式获取锁
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = extract()) == null)
	        //如果队列为空,则等待非空条件notEmpty
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }


这里我们有一点要关注的是
while ( (result = extract()) == null)
	//如果队列为空,则等待非空条件notEmpty
        notEmpty.await();

 /**
     * Mechanics for poll().  Call only while holding lock.
     */
    private E extract() {
        E result;
        int n = size - 1;
        if (n < 0)
	    //队列为空
            result = null;
        else {
            Object[] array = queue;
	    从队列头获取元素
            result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
	    //从队列头去除元素,则重新调整平衡二叉树
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
        }
        return result;
    }
//调整平衡二叉树
 /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
//使用比较器调整平衡二叉树
    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator<? super T> cmp) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }

从上面可以看出,take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。

poll操作
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        E result;
        try {
            result = extract();
        } finally {
            lock.unlock();
        }
        return result;
    }


poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。

再看超时poll,
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = extract()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }

超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。

再看peek操作:
   public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        E result;
        try {
            result = size > 0 ? (E) queue[0] : null;
        } finally {
            lock.unlock();
        }
        return result;
    }

peek操作获取锁,返回队头元素。
再来看remove操作:
/**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        boolean removed = false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //定位数据在队列中的索引
            int i = indexOf(o);
            if (i != -1) {
	        //如果存在对应的元素,则移除
                removeAt(i);
                removed = true;
            }
        } finally {
            lock.unlock();
        }
        return removed;
    }


有两点要看:
1.
 //定位数据在队列中的索引
      int i = indexOf(o);


 private int indexOf(Object o) {
        if (o != null) {
            Object[] array = queue;
            int n = size;
            for (int i = 0; i < n; i++)
                if (o.equals(array[i]))
                    return i;
        }
        return -1;
    }

2.
 if (i != -1) {
    //如果存在对应的元素,则移除
           removeAt(i);
           removed = true;
       }


 /**
     * Removes the ith element from queue.
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
	//移除元素
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
	    //左旋
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            //右旋
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

从上可以看出,remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
再看contains
public boolean contains(Object o) {
        int index;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            index = indexOf(o);
        } finally {
            lock.unlock();
        }
        return index != -1;
    }

这个不需要多讲了吧。
有了上面的分析,下面的drainTo和clear应该很容易理解了吧。
drainTo操作:
/**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = extract()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = extract()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

clear操作:
  
 /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] array = queue;
            int n = size;
            size = 0;
            for (int i = 0; i < n; i++)
                array[i] = null;
        } finally {
            lock.unlock();
        }
    }


序列化:
 /**
     * Saves the state to a stream (that is, serializes it).  For
     * compatibility with previous version of this class,
     * elements are first copied to a java.util.PriorityQueue,
     * which is then serialized.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            int n = size; // avoid zero capacity argument
            q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
            q.addAll(this);
            s.defaultWriteObject();
        } finally {
            q = null;
            lock.unlock();
        }
    }

反序列:
  
 /**
     * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
     * (that is, deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        try {
            s.defaultReadObject();
            this.queue = new Object[q.size()];
            comparator = q.comparator();
            addAll(q);
        } finally {
            q = null;
        }
    }

序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。
总结:
offer操作,获取锁,如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。获取当前队列中的尾元素的父节点,将当前要添加的元素与父节点比较,确定存储的位置。比较,确定元素存储的位置。最后容量自增,唤醒等待take的线程。从offer操作来看,数组队列存放的逻辑结构实际上是一个平衡二叉树(堆排序)。无论是add,put,还是超时offer操作,都是委托给offer操作。
take操首先以可中断方式获取锁,如果获取成功,则从队列头部获取元素,
并重新调整平衡二叉树,如果从队列头取的元素为null,则等待非空条件notEmpty。
poll操作与take的区别时,直接从队列头部获取元素为null,直接返回,而不是等待非空条件notEmpty。超时poll与take的区别为当队列为空时,超时等待非空条件notEmpty。
peek操作获取锁,返回队头元素。
remove操作,首先获取锁,再次定位元素的位置,移除元素,调整平衡二叉树。
序列化与反序列主要是将元素放在一个PriorityQueue中,进行序列化与反序列操作。



0
0
分享到:
评论
5 楼 zhanggang807 2017-04-01  
嗯。我也想明白了,是这个意思
4 楼 Donald_Draper 2017-03-31  
你说的是offer操作吧,这个不是自旋,感谢提醒,从allocationSpinLock(Spinlock for allocation, acquired via CAS.)的定义来看,字面上的意思为是否允许分配的自旋锁,并且为volatile,可以简单理解为锁。当队列满,需要扩容,因为队列可能有多个线程同时访问,没有allocationSpinLock锁控制,那么,将会有两个线程同时扩容队列,这个肯定不是我们希望看到的。allocationSpinLock可以理解为锁,获取锁,则可以扩容队列,否则不可。希望可以帮到您。
3 楼 Donald_Draper 2017-03-31  
您说的是哪个地方?
2 楼 zhanggang807 2017-03-31  
为什么要自旋呢?
1 楼 tairan_0729 2017-03-20  
朋友,你写的这些文章能否做成PDF,方便离线拜读,非常感谢

相关推荐

    priorityblockingqueue的使用

    这个demo主要讲解了 priorityblockingqueue的具体使用,希望可以帮助需要的同学!

    阻塞队列阻塞队列阻塞队列

    java中,常用的阻塞式队列Demo。包含:ArrayBlockingQueue、LinkedQueue、PriorityBlockingQueue

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, ...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步问题)、PriorityBlockingQueue示例、高性能无阻塞无界队列: ConcurrentLinkedQueue、DelayQueue...

    java并发工具包详解

    6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ...

    JAVA高并发_学习笔记

    JAVA学习高并发的学习笔记。...BlockingQueue:ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , LinkedBlockingQueue , LinkedTransferQueue , PriorityBlockingQueue , SynchronousQueue

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ...

    java并发包资源

    6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ...

    javabitset源码-JerrySoundCode:杰瑞声码

    java bitset源码Java源码分析 基础集合列表 ArrayList (done) Vector (done) LinkedList (done) Stack ...PriorityBlockingQueue (done) ConcurrentHashMap (done) ConcurrentLinkedQueue (done) C

    javaforkjoin源码-gitbook-BAT-interview:本文综合自己在一线互联网工作感悟,经验。记录开源框架的源码解读,数据

    java forkjoin 源码 -- -- geomesa -- spring -- 算法 ...[PriorityBlockingQueue] [DelayQueue] 并发安全集合 [HashMap, ConcurrentHashMap源码] [ArrayList, LinkedList, CopyOnWriteArrayList源码]

    pixonic-test-task:Pixonic的测试任务

    主要算法: 客户端提交的任务被放入PriorityBlockingQueue中,并在其中按localDateTime进行排序单独的任务执行线程偷看队列中的第一个项目(必要时等待一个项目) 如果还没有时间执行任务,线程将在监视器上等待...

    ResourceSchedulerExercise:考生练习

    指的是线程安全类 [PriorityBlockingQueue] ( ) 文档页面中有一个示例,其中定义了一个Comparable自定义类。 这就是我要找的。 首先我定义网关和消息接口。 我定义了GatewayImplTest类和testGatewaySend()测试,...

    architect-awesome:初步架构师技术图谱

    双向队列:ArrayBlockingQueue(有界),LinkedBlockingQueue(无界),DelayQueue,PriorityBlockingQueue,采用锁机制;使用ReentrantLock锁。 集合 链表,纹理 字典,关联数组 栈 Stack是线程安全的。 内部...

    javabitset源码-java_master:后端架构师技术图谱

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    亚信java笔试题-Book:架构师之路

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    高级java笔试题-architect-awesome:后端架构师技术图谱

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    java简易投票系统源码下载-study-way:学习路线

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    leetcode中国-architect-go:后端架构师技术图谱-go

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

Global site tag (gtag.js) - Google Analytics