CompletionService
将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。 通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。
public class CompletionServiceTest { public static void main(String[] args){ ExecutorService service=Executors.newCachedThreadPool(); CompletionService<String> completion=new ExecutorCompletionService<String>(service); for(int i=0;i<10;i++){ completion.submit(new MyCompletionService(i)); } try{ //会按照异步IO完成的顺序 for(int i=0;i<10;i++){ System.out.PRintln(completion.take().get()); } }catch(Exception e){ e.printStackTrace(); } service.shutdown(); }}CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器, 或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。 CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。 CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。
public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException{ final CountDownLatch begin=new CountDownLatch(1); final CountDownLatch end=new CountDownLatch(10); final ExecutorService service=Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int NO=i+1; Runnable run=new Runnable(){ public void run(){ try{ begin.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("No."+NO+"到达终点!"); }catch(InterruptedException e){ e.printStackTrace(); }finally{ end.countDown(); } } }; service.submit(run); } System.out.println("比赛开始!"); begin.countDown(); end.await(); System.out.println("比赛结束!"); service.shutdown(); }}CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。CyclicBarrier支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。示例用法:下面是一个在并行分解设计中使用 barrier 的例子,很经典的旅行团例子:
public class CyclicBarrierTest { private static int[]timeWalk = { 5,8,12,12,10 }; private static int[]timeSelf = { 1,3,4,4,5 }; private static int[]timeBus = { 2,4,6,6,7 }; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(new Date())+ ":"; } static class Tour implements Runnable{ private int[] times; private CyclicBarrier barrier; private String tourName; public Tour(CyclicBarrier barrier, String tourName, int[]times) { this.times= times; this.tourName= tourName; this.barrier= barrier; } public void run(){ try{ Thread.sleep(times[0]*1000); System.out.println(now()+tourName+"到达深圳!"); barrier.await(); Thread.sleep(times[1]*1000); System.out.println(now()+tourName+"到达广州!"); barrier.await(); Thread.sleep(times[2]*1000); System.out.println(now()+tourName+"到达韶关!"); barrier.await(); Thread.sleep(times[3]*1000); System.out.println(now()+tourName+"到达长沙!"); barrier.await(); Thread.sleep(times[0]*1000); System.out.println(now()+tourName+"到达武汉!"); barrier.await(); }catch(InterruptedException e){ e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } public static void main(String[] args) { //创建障碍器,并设置MainTask为所有定数量的线程都达到障碍点时候所要执行的任务(Runnable) CyclicBarrier barrier = new CyclicBarrier(3);// 三个旅行团 ExecutorService exec = Executors.newFixedThreadPool(3); exec.submit(new Tour(barrier,"徒步旅游的旅行团",timeWalk)); exec.submit(new Tour(barrier,"自驾游的旅行团",timeSelf)); exec.submit(new Tour(barrier,"旅游大巴的旅行团",timeBus)); exec.shutdown(); } }Exchanger适合于两个线程需要进行数据交换的场景。(一个线程完成后,把结果交给另一个线程继续处理)java.util.concurrent.Exchanger类,提供了这种对象交换能力,两个线程共享一个Exchanger类的对象,一个线程完成对数据的处理之后,调用Exchanger类的exchange()方法把处理之后的数据作为参数发送给另外一个线程。而exchange方法的返回结果是另外一个线程锁提供的相同类型的对象。如果另外一个线程未完成对数据的处理,那么exchange()会使当前线程进入等待状态,直到另外一个线程也调用了exchange方法来进行数据交换。
public class ExchangerTest { private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>(); private class Sender implements Runnable{ public void run(){ try{ StringBuilder content = new StringBuilder("Hello"); content = exchanger.exchange(content); System.out.println("Sender:"+content); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } } private class Receiver implements Runnable{ public void run(){ try{ StringBuilder content = new StringBuilder("World"); content = exchanger.exchange(content); System.out.println("Receiver:"+content); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } } public void exchange(){ new Thread(new Sender()).start(); new Thread(new Receiver()).start(); } public static void main(String[] args){ new ExchangerTest().exchange(); }}Semaphore
一个计数信号量。从概念上讲,信号量维护了一个许可集合。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问
public class SemaphoreTest { public static void main(String[] args){ ExecutorService list=Executors.newCachedThreadPool(); //Semaphore bathroom=new Semaphore(1); Semaphore bathroom=new Semaphore(1,true); for(int i=0;i<4;i++){ list.submit(new MySemaphore(bathroom,i+1)); } list.shutdown(); bathroom.acquireUninterruptibly(1); //System.out.println("大家都洗好了!"); bathroom.release(1); }}public class MySemaphore extends Thread{ Semaphore bathroom; private int id; public MySemaphore(Semaphore bathroom,int id){ this.bathroom=bathroom; this.id=id; } @Override public void run(){ try{ if(bathroom.availablePermits()>0){ System.out.println("室友"+this.id+"进入浴室,有空位"); }else{ System.out.println("室友"+this.id+"进入浴室,没空位,排队"); } bathroom.acquire(); System.out.println("室友"+this.id+"进入浴室洗澡"); Thread.sleep((int)Math.random()*1000); System.out.println("室友"+this.id+"洗完了"); bathroom.release(); }catch(Exception e){ e.printStackTrace(); } }}
新闻热点
疑难解答