`
Donald_Draper
  • 浏览: 951929 次
社区版块
存档分类
最新评论

CyclicBarrier详解

    博客分类:
  • JUC
阅读更多
AtomicInteger解析:http://donald-draper.iteye.com/blog/2359555
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS线程挂起辅助类LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS详解-CLH队列,线程等待状态:http://donald-draper.iteye.com/blog/2360256
AQS-Condition详解:http://donald-draper.iteye.com/blog/2360381
可重入锁ReentrantLock详解:http://donald-draper.iteye.com/blog/2360411
CountDownLatch使用场景:http://donald-draper.iteye.com/blog/2348106
CountDownLatch详解:http://donald-draper.iteye.com/blog/2360597

package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * [i]cyclic[/i] because it can be re-used after the waiting threads
 * are released.
 *
 同步工具CyclicBarrier,一个集合线程,等待每一个线程达到共同的屏障点。
CyclicBarriers对一个复杂的线程集合必须互相等待完成任务,场景非常有用。
同步工具的屏障可以循环利用,因为在所有等待线程释放锁时,他可以被重新使用。


 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This [i]barrier action[/i] is useful
 * for updating shared-state before any of the parties continue.
 *
CyclicBarrier的构造函数中,有一个带Runnable,在所有线程到达屏障点,并且共享锁没有完全释放,
这个功能,对于在其他线程继续执行任务前,更新共享状态非常有用。

 * <p><b>Sample usage:</b> Here is an example of
 *  using a barrier in a parallel decomposition design:
 * <pre>
 简单的一个实例用,在并行的分解任务中,使用barrier
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 上述实例,描述的每个线程处理矩阵的每一行数据,当线程处理完一行数据时,等待其他线程处理完各自
 的一行数据。当所有的线程处理完各自行数据时,屏障点线程Runnable,执行合并矩阵的行数据。
 当屏障点线程Runnable,决定执行合并是,每个线程的done函数返回true,结束每个线程工作。

 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
屏障点action动作线程的执行,不能依赖于组线程中将要暂定的线程,分组中的每一个线程,都可以
执行action,在共享锁被释放之前。为了优化action的执行,我们可以利用,在每个线程调用await方法时,
返回线程到达屏障点的index,来决定,那个线程执行屏障动作。
 * <pre>  if (barrier.await() == 0) {
         //最后一个到达屏障点的线程,执行屏障action
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
CyclicBarrier对于失败同步的尝试,用all-or-none breakage model:
如果一个线程,因为中断,失败,超时,永久的离开屏障点,那么其他在屏障点等待的线程,
通过BrokenBarrierException,abnormally离开。

 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * [url=package-summary.html#MemoryVisibility]<i>happen-before</i>[/url]
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 内存一致性:actions优先call await函数,这个基于内存可见机制-happen-before法则。
 屏障点的分组线程,返回happen-before,协调分组线程工作的线程,await的成功返回。
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    每次屏障点,表示一代实例。当屏障点被打开或者重置时,generation将会改变。
    由于锁以不确定的方式,分配给等待线程,线程可以多代屏障点的方式,使用barrier。
    如果线程组存在break,并且没有reset,则不需要激活一代。
    Generation可以这么理解,当有线程有多个分组,一个分组执行完,执行下一组;每一组
    我们可以理解为Generation,当线程组出现break,且没有reset,则Generation不会被激活。
    private static class Generation {
        boolean broken = false;
	
    }

    /** The lock for guarding barrier entry */
    屏障点保护锁
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    条件等待,直到所有的线程打开锁,
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    共享锁数量
    private final int parties;
    /* The command to run when tripped */
    障碍点执行的命令
    private final Runnable barrierCommand;
    /** The current generation */
    当前代
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    表示分组中,还有多少个在等待。在每一代,count从parties to 0。
    在每一次创建新生代中或broken时,count重置为parties
    private int count;
 }


先看构造:
 
/**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *常见一个屏障点,当所有parties线程在等待时,将会打开,同时最后一个进入
     屏障点的线程,将会执行barrierAction。
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

线程代broken处理
     
 /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    当线程持有锁,设置当前线程代broken,唤醒当前代线程
    private void breakBarrier() {
        //
        generation.broken = true;
	//重置共享锁状态
        count = parties;
	//唤醒所有在屏障点,等待的线程
        trip.signalAll();
    }

创建下一代
   
  /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
     线程持有锁,更新屏障点状态,唤醒所有等待,线程
    private void nextGeneration() {
        // signal completion of last generation
	//唤醒上一代,完成的线程
        trip.signalAll();
        // set up next generation
	//重置共享锁状态
        count = parties;
	//创建下一代
        generation = new Generation();
    }
}

我们来看屏障等待

/**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier.
     * 等待所享有的线程到达屏障点
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     当线程不是最后一个到达屏障点,线程将会不会被调度,直到以下情况发生
     * [list]
     * <li>The last thread arrives; or最后一个线程到达屏障点
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or其他线程中断当前线程
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or其他等待线程,被中断
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * [/list]
     *一些线程等待屏障点超时,或其他以下线程调用reset
     * <p>If the current thread:
     * [list]
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * [/list]
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     当前线程带着中断状态,在等待屏障点,当中断异常抛出时,当前线程中断消除。
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *当其他线程在等待,如果屏障点被重置,或broke,则抛出BrokenBarrierException
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *在等待的过程中,如果其他线程中断,则抛出BrokenBarrierException,屏障点
     设置为broken状态。
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     如果当前线程,是最后一个到达屏障点的,如果屏障点动作线程不为null,
     则执行action,在下一代线程组执行任务前。
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *如果在执行action的过程中,出现异常,则当前线程将会抛出异常,屏障点处于破位状态
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if [i]another[/i] thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due an exception.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
	    //委托给dowait
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    
  /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //获取线程代
            final Generation g = generation;
            //如果屏障点破位,则抛出BrokenBarrierException
            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程中断,则设置屏障点破位,重置count为parties,
	    //唤醒所有在屏障点,等待的线程,抛出中断异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
          //共享锁数量,自减
           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
		       //如果所有线程达到屏障点,则执行action
                       command.run();
                   ranAction = true;
		   //创建一下代
                   nextGeneration();
		   //返回0,屏障点解除
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
            
            // loop until tripped, broken, interrupted, or timed out
	    //自旋,直到所有线程到达屏障点,当前代broken,中断,或超时
            for (;;) {
                try {
		    //非超时等待await,否则awaitNanos
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
		   //如果超时,解除屏障点
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

小节:
线程到达屏障点时,首先检查线程代,有没有broken,如果broken,
则抛出BrokenBarrierException,如果线程中断,则当前代broken,
重置共享锁状态,唤醒所有等待线程。如果上述条件不满足,则释放
count,判断是否当前代线程,是否都到达屏障点,如果是,判断action
是否为null,不为null,则执行action;当释放count,当前代线程,仍有在执行的,
自旋等待屏障点条件trip,如果是超时等待,则判断时间是否超时,超时则breakBarrier。

再看
 public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

与await基本相同,都是委托给dowait
/**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
  public int getParties() {
        return parties;
    }
   /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
     /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets [i]after[/i]
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
     /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *返回在屏障点等待线程数
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

总结:
屏障点思想,当每个线程完成任务时,自旋等待条件Condition trip,释放共享锁,count减1;当线程代的最后一个线程到达屏障点时,唤醒线程代中所有等待的线程,
如果有action,执行action,然后创建下一代线程。如果在线程代未结束之前,有等待线程中断或超时,则结束当前代,唤醒所有等待线程,重置count为parties。
0
0
分享到:
评论

相关推荐

    28 人齐了,一起行动—CyclicBarrier详解.pdf

    Java并发编程学习宝典(漫画版),Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习...

    JAVA CyclicBarrier类详解.docx

    在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为 循环 的barrier。 CyclicBarrier 支持一个可选的 Runnable命令,...

    Java并发编程原理与实战

    并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现...

    龙果 java并发编程原理实战

    第38节并发工具类CyclicBarrier 详解00:11:52分钟 | 第39节并发工具类Semaphore详解00:17:27分钟 | 第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57...

    Java 并发编程原理与实战视频

    第38节并发工具类CyclicBarrier 详解00:11:52分钟 | 第39节并发工具类Semaphore详解00:17:27分钟 | 第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57...

    龙果java并发编程完整视频

    第38节并发工具类CyclicBarrier 详解00:11:52分钟 | 第39节并发工具类Semaphore详解00:17:27分钟 | 第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57...

    java并发编程

    第38节并发工具类CyclicBarrier 详解00:11:52分钟 | 第39节并发工具类Semaphore详解00:17:27分钟 | 第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57...

    CyclicBarrier的用法

    NULL 博文链接:https://hubowei1.iteye.com/blog/2312471

    Java并发编程(CyclicBarrier)实例详解

    主要介绍了Java并发编程(CyclicBarrier)实例详解的相关资料,JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作。

    CountDownLatch 和 CyclicBarrier 的运用(含AQS详解)

    CountDownLatch 和 CyclicBarrier 为线程同步的辅助工具,通过它可以做到使一条线程一直阻塞等待,直到其他线程完成其所处理的任务。

    详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    主要介绍了详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    主要介绍了Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解的相关资料,需要的朋友可以参考下

    java多线程之CyclicBarrier的使用方法

    主要介绍了java多线程之CyclicBarrier的使用方法的相关资料,希望通过本文能帮助到大家,让大家理解掌握这部分内容,需要的朋友可以参考下

    Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解

    主要介绍了Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    Java并发编程一CountDownLatch、CyclicBarrier、Semaphore初使用

    Java并发之AQS详解 CountDownLatch CountDownLatch可以实现一个线程等待多个线程、多个线程等待一个线程、多个线程等待多个线程(这里不涉及)。 我们首先来看看怎么实现一个线程等待多个线程吧。 工厂中,对产品...

    java并发工具包详解

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    13. 栅栏 CyclicBarrier 14. 交换机 Exchanger 15. 信号量 Semaphore 16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool ...

    JAVA 多线程之信号量(Semaphore)实例详解

    主要介绍了JAVA 多线程之信号量(Semaphore)实例详解的相关资料,需要的朋友可以参考下

Global site tag (gtag.js) - Google Analytics