`
Donald_Draper
  • 浏览: 953361 次
社区版块
存档分类
最新评论

SynchronousQueue解析下-TransferQueue

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack:http://donald-draper.iteye.com/blog/2364622
由于篇幅问题,上一篇只讲到TransferStack,而没讲到TransferQueue和其余部分,试着看看这篇能不能
讲完,不能讲完的话,再分一篇讲。
先回顾一下上一篇:
SynchronousQueue阻塞队列,每次插入操作必须等待一个协同的移除线程,反之亦然。
SynchronousQueue同步队列没有容量,可以说,没有一个容量。由于队列中只有在消费线程,
尝试消费元素的时候,才会出现元素,所以不能进行peek操作;不能用任何方法,生产元素,除非有消费者在尝试消费元素,同时由于队列中没有元素,所以不能迭代。head是第一个生产线程尝试生产的元素;如果没有这样的生产线程,那么没有元素可利用,remove和poll操作将会返回null。SynchronousQueue实际一个空集合类。同时同步队列不允许为null。同步队列支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果一个同步队列构造为公平性,则可以线程以FIFO访问队列元素。当时非公平策略用的是
TransferStack,公平策略用的是TransferQueue;TransferStack和TransferQueue是存放等待操作线程的描述,从TransferStack中Snode节点可以看出:节点关联一个等待线程waiter,后继next,匹配节点match,节点元素item和模式mode;模式由三种,REQUEST节点表示消费者等待消费资源,DATA表示生产者等待生产资源。FULFILLING节点表示生产者正在给等待资源的消费者补给资源,或生产者在等待消费者消费资源。当有线程take/put操作时,查看栈头,如果是空队列,或栈头节点的模式与要放入的节点模式相同;如果是超时等待,判断时间是否小于0,小于0则取消节点等待;如果非超时,则将创建的新节点入栈成功,即放在栈头,自旋等待匹配节点(timed决定超时,不超时);如果匹配返回的是自己,节点取消等待,从栈中移除,并遍历栈移除取消等待的节点;匹配成功,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回节点元素(REQUEST)。如果与栈头节点的模式不同且不为FULFILLING,匹配节点,成功者,两个节点同时出栈,REQUEST模式返回,匹配到的节点元素(DATA),DATA模式返回节点元素(REQUEST)。如果栈头为FULFILLING,找出栈头的匹配节点,栈头与匹配到的节点同时出栈。从分析非公平模式下的TransferStack,可以看出一个REQUEST操作必须同时伴随着一个DATA操作,一个DATA操作必须同时伴随着一个REQUEST操作,这也是同步队列的命名中含Synchronous原因。SynchronousQueue像一个管道,一个操作必须等待另一个操作的发生。

 /** Dual Queue */
    static final class TransferQueue extends Transferer {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
	 本算法实现拓展了Scherer-Scott双队列算法,不同的是用节点模式,
	 而不是标记指针来区分节点操作类型。这个算法比栈算法的实现简单,
	 因为fulfillers需要明确指定节点,同时匹配节点用CAS操作QNode的
	 元素field即可,put操作从非null到null,反则亦然,take从null到非null。
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue 后继
            volatile Object item;         // CAS'ed to or from null 节点元素
            volatile Thread waiter;       // to control park/unpark 等待线程
            final boolean isData; //是否为DATA模式
            //设置元素和模式
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
            //设置节点的后继
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
	    //设置节点的元素
            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
	     取消节点等待,元素指向自己
             */
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }
            //是否取消等待
            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
	     是否出队列
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** Head of queue  队列头节点*/
        transient volatile QNode head;
        /** Tail of queue 队列尾节点*/
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
	 刚入队列的节点,取消等待,但还没有出队列的节点,
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
	    //构造队列
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
	 尝试设置新的队头节点为nh,并比较旧头节点,成功则,解除旧队列头节点的next链接,及指向自己
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
	 尝试设置队尾
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
	 尝试设置取消等待节点为val。并比较旧的等待节点是否为cmp
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }

        /**
         * Puts or takes an item.
	 生产或消费一个元素
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
	     基本算法是循环尝试,执行下面两个步中的,其中一个:
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
	     1.如果队列为空,或队列中为相同模式的节点,尝试节点入队列等待,
	     直到fulfilled,返回匹配元素,或者由于中断,超时取消等待。
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
	     2.如果队列中包含节点,transfer方法被一个协同模式的节点调用,
	     则尝试补给或填充等待线程节点的元素,并出队列,返回匹配元素。
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
	     在每一种情况,执行的过程中,检查和尝试帮助其他stalled/slow线程移动队列头和尾节点
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
	     循环开始,首先进行null检查,防止为初始队列头和尾节点。当然这种情况,
	     在当前同步队列中,不可能发生,如果调用持有transferer的non-volatile/final引用,
	     可能出现这种情况。一般在循环的开始,都要进行null检查,检查过程非常快,不用过多担心
	     性能问题。
             */

            QNode s = null; // constructed/reused as needed
	    //如果元素e不为null,则为DATA模式,否则为REQUEST模式
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
		//如果队列头或尾节点没有初始化,则跳出本次自旋
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
		    //如果队列为空,或当前节点与队尾模式相同
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
		        //如果t不是队尾,非一致性读取,跳出本次自旋
                        continue;
                    if (tn != null) {               // lagging tail
		        //如果t的next不为null,设置新的队尾,跳出本次自旋
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
		        //如果超时,且超时时间小于0,则返回null
                        return null;
                    if (s == null)
		        //根据元素和模式构造节点
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
		        //新节点入队列
                        continue;
                    //设置队尾为当前节点
                    advanceTail(t, s);              // swing tail and wait
		    //自旋或阻塞直到节点被fulfilled
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
		        //如果s指向自己,s出队列,并清除队列中取消等待的线程节点
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
		        //如果s节点已经不再队列中,移除
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
		    //如果自旋等待匹配的节点元素不为null,则返回x,否则返回e
                    return (x != null) ? x : e;

                } else {                            // complementary-mode
		    //如果队列不为空,且与队头的模式不同,及匹配成功
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
		        //如果h不为当前队头,则返回,即读取不一致
                        continue;                   // inconsistent read
                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
			//如果队头后继,取消等待,则出队列
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
		    //否则匹配成功
                    advanceHead(h, m);              // successfully fulfilled
		    //unpark等待线程
                    LockSupport.unpark(m.waiter);
		    //如果匹配节点元素不为null,则返回x,否则返回e,即take操作,返回等待put线程节点元素,
		    //put操作,返回put元素
                    return (x != null) ? x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
	 自旋或阻塞直到节点被fulfilled
         * @param s the waiting node,等待节点
         * @param e the comparison value for checking match,检查匹配的比较元素
         * @param timed true if timed wait 是否超时等待
         * @param nanos timeout value 超时等待时间
         * @return matched item, or s if cancelled 成功返回匹配元素,取消返回等待元素
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill 这里与栈中的实现思路是一样的*/
	    //获取超时的当前时间,当前线程,自旋数
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
		    //如果中断,则取消等待
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;//如果s的节点的元素不相等,则返回x,即s节点指向自身,等待clean
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
		        //如果超时,则取消等待
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
		    //自旋数减一
                    --spins;
                else if (s.waiter == null)
		     //如果是节点的等待线程为空,则设置为当前线程
                    s.waiter = w;
                else if (!timed)
		    //非超时,则park
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
		    //超时时间大于自旋时间,则超时park
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
	 移除队列中取消等待的线程节点
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
	     在任何时候,最后一个节点入队列时,队列中都有可能存在取消等待,但没有删除的节点。
	     为了将这些节点删除,如果我们不能删除最后入队列的节点,我们可以用cleanMe记录它的前驱,
	     删除cleanMe后继节点。s节点和cleanMe后继节点至少一个删除,则停止。
             */
            while (pred.next == s) { // Return early if already unlinked
	        //如果s为队尾节点,且前驱为旧队尾
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
		    //如果队头不为空,且取消等待,设置后继为新的队头元素
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
		    //空队列,则返回
                    return;
                QNode tn = t.next;
                if (t != tail)
		    //如果队尾有变化,跳出循环
                    continue;
                if (tn != null) {
		    //如果队尾后继不为null,则设置新的队尾
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
		        //s节点指向自己,则返回
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
		    //移除前一个取消等待的节点
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
		    //先前取消等待的节点为null,则将cleanMe设为刚取消等待节点的前驱
                    return;          // Postpone cleaning s
            }
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long cleanMeOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = TransferQueue.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
                tailOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("tail"));
                cleanMeOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("cleanMe"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

到这里TransferQueue我们已经看完,我们简单的总结一下:
TransferQueue在执行take/put操作时,首先根据元素是否判断当前节点的模式,
如果元素为null则为REQUEST(take)模式,否则为DATA模式(put)。
然后自旋匹配节点,如果队列头或尾节点没有初始化,则跳出本次自旋,
如果队列为空,或当前节点与队尾模式相同,自旋或阻塞直到节点被fulfilled;
如果队列不为空,且与队头的模式不同,及匹配成功,出队列,如果是REQUEST操作,
返回匹配到节点的元素,如果为DATA操作,返回当前节点元素。
TransferQueue相对于TransferStack来说,操作匹配过程更简单,TransferStack为非公平策略下的实现LIFO,TransferQueue是公平策略下的实现FIFO。TransferQueue中的QNODE与TransferStack的SNODE节点有所不同处理后继next,等待线程,节点元素外,SNODE还有一个对应的模式REQUEST,DATA或FULFILLING,而QNODE中用一个布尔值isData来表示模式,这个模式的判断主要根据是元素是否为null,如果为null,则为REQUEST(take)模式,否则为DATA模式(put)。

再来看SynchronousQueue的构造和相关操作
构造:
先看内部Transferer的声明:
 
/**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization.  Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
      transferer在构造函数中初始化,没有进一步的复杂序列化的情况下,不需要
      声明为final。由于transferer至多在public方法中用一次,所以volatile取代final不会有
      太多的性能代价。
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
       //默认为非公平,栈
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

如果公平则为TransferQueue,否则为TransferStack。
再看其他操作:
put操作:
 /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
	    //返回为null,则put失败,中断当前线程
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

超时offer:
/**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

再看offer
 /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

take操作:
 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
        Thread.interrupted();
        t
hrow new InterruptedException();
    }
超时poll操作:
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }

poll操作:
/**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

是否为空
    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

总是返回true,说明同步队列总是为空。
size:
 /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }
remainingCapacity:
    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }
clear:
    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }
contains:
    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }
remove:
    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }


从上面这些方法可以出,由于同步队列总是为空,所以size为0.剩余容量为0,peek返回false,contains返回false,remove返回false。
drainTo操作:
**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }


下面再看序列化与反序列化:
/*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

序列化与反序列的作用主要是判断同步队列到底是公平的,还是非公平的。

总结:
TransferQueue在执行take/put操作时,首先根据元素是否判断当前节点的模式,如果元素为null则为REQUEST(take)模式,否则为DATA模式(put)。然后自旋匹配节点,如果队列头或尾节点没有初始化,则跳出本次自旋,如果队列为空,或当前节点与队尾模式相同,自旋或阻塞直到节点被fulfilled;如果队列不为空,且与队头的模式不同,及匹配成功,出队列,如果是REQUEST操作,返回匹配到节点的元素,如果为DATA操作,返回当前节点元素。TransferQueue相对于TransferStack来说,操作匹配过程更简单,TransferStack为非公平策略下的实现LIFO,TransferQueue是公平策略下的实现FIFO。TransferQueue中的QNODE与TransferStack的SNODE节点有所不同处理后继next,等待线程,节点元素外,SNODE还有一个对应的模式REQUEST,DATA或FULFILLING,而QNODE中用一个布尔值isData来表示模式,这个模式的判断主要根据是元素是否为null,如果为null,则为REQUEST(take)模式,否则为DATA模式(put)。SynchronousQueue根据构造公平参数,确定transferer为TransferStack还是TransferQueue,默认为TransferStack,SynchronousQueue的put/offer和take/poll统一委托给transferer,即通过TransferStack和TransferQueue的transfer(Object e, boolean timed, long nanos) 方法。由于同步队列一个take伴随着一个put,反之亦然,所有队列总是为空,所以size为0.剩余容量为0,peek返回false,contains返回false,remove返回false。



1
0
分享到:
评论

相关推荐

    java 同步器SynchronousQueue详解及实例

    主要介绍了java 同步器SynchronousQueue详解及实例的相关资料,需要的朋友可以参考下

    SynchronousQueue实现原理.pdf

    转载的一篇博客资源

    SynchronousQueue详解

    SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。 也就是说...

    SynchronousQueue核心属性和方法源码的分析

    SynchronousQueue核心属性和方法源码的分析的代码

    2004_DISC_dual_DS.pdf

    SynchronousQueue 底层算法相关实现论文

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    java并发包资源

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...

    常见的Java笔试题-JVM-JUC-Core:JUCJVM核心知识点

    SynchronousQueue Callable接口 阻塞队列的应用——生产者消费者 传统模式 阻塞队列模式 阻塞队列的应用——线程池 线程池基本概念 线程池三种常用创建方式 线程池创建的七个参数 线程池底层原理

    第7章-JUC多线程v1.1.pdf

    首先判断任务是否为空, 为空抛出空指针异常, 否则执行下一个判断: 当前线程数量是否小于核心线程池线程数量, 是,则执行addIfUbderCorePollSize(command)方法, 在核心线程池中创建新的线程, 并且执行这个任务 ...

    JAVA高并发_学习笔记

    JAVA学习高并发的学习笔记。...BlockingQueue:ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , LinkedBlockingQueue , LinkedTransferQueue , PriorityBlockingQueue , SynchronousQueue

    java线程池概念.txt

    默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用;即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过...

    java核心知识点整理.pdf

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM .........................

    JAVA核心知识点整理(有效)

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM ........................

Global site tag (gtag.js) - Google Analytics