privatestaticvoidtestTestCyclicBarrier(){ Thread[] threads = new Thread[5]; CyclicBarrier barrier = new CyclicBarrier(threads.length, () -> { System.out.println("============ wake up all ============"); });
for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(()->{ String name = Thread.currentThread().getName(); long l = System.currentTimeMillis(); try { System.out.println(name + ": await"); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + ": wake up wait:" + (System.currentTimeMillis() - l)); }); threads[i].start(); } }
/** 实现的Lock */ privatefinal ReentrantLock lock = new ReentrantLock(); /** Condition用来实现wait */ privatefinal Condition trip = lock.newCondition(); /** 等待的屏障数 */ privatefinalint parties; /* 到达屏障要执行的Runnable */ privatefinal Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();
/** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ privateint count;
privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { /**获取CyclicBaerrier的内部锁*/ final ReentrantLock lock = this.lock; /**获取锁*/ lock.lock(); try { /**存储当前的Generation*/ final Generation g = generation; /**判断当前的屏障是否被破坏,如果破坏则抛出BrokenBarrierException异常*/ if (g.broken) thrownew BrokenBarrierException(); /**判断当前线程是否被interrupted,如果被打断,则breakBarrier破坏屏障*/ if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); } /**记录当前屏障等待个数*/ int index = --count; if (index == 0) { // 最后一个预留到达屏障的线程 boolean ranAction = false; try { final Runnable command = barrierCommand; /**执行barrierCommand指令*/ if (command != null) command.run(); ranAction = true; /**执行下一个Generation*/ nextGeneration(); return0; } finally { /**如果barrierCommand执行失败,进行屏障破坏处理*/ if (!ranAction) breakBarrier(); } }
// 如果当前线程不是最后一个到达的线程 for (;;) { try { if (!timed)///调用Condition的await()方法阻塞 trip.await(); elseif (nanos > 0L)///调用Condition的awaitNanos()方法阻塞 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { /**如果当前线程被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程*/ if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
privatestaticvoidtestSemaphore()throws InterruptedException { int solt = 10; Map<Integer, List<String>> washRoom= new HashMap<>(solt);
for (int i = 0; i < solt; i++) { washRoom.put(i, new ArrayList<String>()); } // 是有10个槽位,所以每次是能有10个线程执行 Semaphore semaphore = new Semaphore(solt); Runnable r = () -> {
try { // 如果没有获取到坑位,则阻塞 semaphore.acquire(); int name = Integer.parseInt(Thread.currentThread().getName()); int index = name % 10; List<String> list = washRoom.get(index); list.add(name + ""); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }; Thread[] threads = new Thread[solt * 3]; for (int i = 0; i < threads.length; i++) { Thread thread = new Thread(r); thread.setName(i + ""); threads[i] = thread; thread.start(); }
for (Thread t : threads) { t.join(); } System.out.println(washRoom); }
详解
构造函数
1 2 3 4 5 6 7
publicSemaphore(int permits){ sync = new NonfairSync(permits); }
publicSemaphore(int permits, boolean fair){ sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
for (int i = 0; i < taskers.length; i++) { // 执行任务 Thread thread = new Thread(taskers[i]); thread.start(); } }
privatestaticvoidbuild(Tasker[] taskers, int lo, int hi, Phaser phaser){ if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(taskers, i, j, new Phaser(phaser)); } } else { for (int i = lo; i < hi; ++i) taskers[i] = new Tasker(i, phaser); }
publicstaticvoidexchangerTestExample()throws Exception { final Exchanger<String> exchanger = new Exchanger<String>(); Runnable r = () ->{ try { String myName = Thread.currentThread().getName(); Random random = new Random(); TimeUnit.SECONDS.sleep(random.nextInt(5)); System.out.println(myName + " want to exchange data and i waiting"); String hisName = exchanger.exchange(Thread.currentThread().getName()); System.out.println(myName + ": " + hisName); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread[] threads = new Thread[2]; for (int i = 0; i < 2; i++) { threads[i] = new Thread(r); threads[i].setName("exchangerTestExample-" + i); threads[i].start(); } }
输出
1 2 3 4 5
exchangerTestExample-1 want to exchange data and i waiting [...] exchangerTestExample-0 want to exchange data and i waiting exchangerTestExample-0: exchangerTestExample-1 exchangerTestExample-1: exchangerTestExample-0
publicExchanger(){ participant = new Participant(); }
核心方法
1 2 3 4 5
//除非当前线程被中断,否则一直等待另一个线程到达这个交换点,然后将交换的数据 x传输给它,并收到另一个线程传过来的数据。 public V exchange(V x)throws InterruptedException // 和上一个方法功能基本一样,只不过这个方法增加了等待超时时间 public V exchange(V x, long timeout, TimeUnit unit)throws InterruptedException, TimeoutException
Participant 和 Node
Participant 是将 Node 封装成了 ThreadLocal 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13
staticfinalclassParticipantextendsThreadLocal<Node> { public Node initialValue(){ returnnew Node(); } } // Node 结构如下 @sun.misc.Contended staticfinalclassNode{ int index; // arena的下标,多个槽位的时候利用 int bound; // 上一次记录的Exchanger.bound int collides; // 在当前bound下CAS失败的次数; int hash; // 用于自旋; Object item; // 这个线程的当前项,也就是需要交换的数据; volatile Object match; //做releasing操作的线程传递的项; volatile Thread parked; //挂起时设置线程值,其他情况下为null; }
这里我们讲几个概念
index:arena的下标,多个槽位的时候利用;
bound:上一次记录的Exchanger.bound;
collides:在当前bound下CAS失败的次数;
hash:伪随机数,用于自旋;
item:这个线程的当前项,也就是需要交换的数据;
match:交换的数据;
parked:挂起时设置线程值,其他情况下为null;
我们现阶段先对这个几个有个印象,接下来我们详细分析一下交换的过程。
exchange 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
public V exchange(V x)throws InterruptedException { Object v; // NULL_ITEM 是一个替代的对象,这里可以简单理解为默认值 // 主要是因为 slotExchange 和 arenaExchange 并没有做非空的判断 Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) thrownew InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } // /** * Elimination array; null until enabled (within slotExchange). * Element accesses use emulation of volatile gets and CAS. */ privatevolatile Node[] arena;