首页 > 编程 > Java > 正文

【java总结】多线程进阶篇之辅助类

2019-11-08 01:44:04
字体:
来源:转载
供稿:网友

CompletionService

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。 通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

public class CompletionServiceTest {	public static void main(String[] args){		ExecutorService service=Executors.newCachedThreadPool();		CompletionService<String> completion=new ExecutorCompletionService<String>(service);		for(int i=0;i<10;i++){			completion.submit(new MyCompletionService(i));		}		try{			//会按照异步IO完成的顺序			for(int i=0;i<10;i++){				System.out.PRintln(completion.take().get());			}		}catch(Exception e){			e.printStackTrace();		}		service.shutdown();	}}

CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器, 或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。 CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。 CountDownLatch最重要的方法是countDown()await()前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

public class CountDownLatchTest {	public static void main(String[] args) throws InterruptedException{		final CountDownLatch begin=new CountDownLatch(1);		final CountDownLatch end=new CountDownLatch(10);		final ExecutorService service=Executors.newCachedThreadPool();		for(int i=0;i<10;i++){			final int NO=i+1;			Runnable run=new Runnable(){				public void run(){					try{						begin.await();						Thread.sleep((long)(Math.random()*10000));						System.out.println("No."+NO+"到达终点!");					}catch(InterruptedException e){						e.printStackTrace();					}finally{						end.countDown();					}				}			};			service.submit(run);		}		System.out.println("比赛开始!");		begin.countDown();		end.await();		System.out.println("比赛结束!");		service.shutdown();	}}	

CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。CyclicBarrier支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。示例用法:下面是一个在并行分解设计中使用 barrier 的例子,很经典的旅行团例子:

public class CyclicBarrierTest {	private static int[]timeWalk = { 5,8,12,12,10 };	private static int[]timeSelf = { 1,3,4,4,5 };	private static int[]timeBus = { 2,4,6,6,7 };	 static String now() {		 SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");	     return sdf.format(new Date())+ ":";	  }	 static class Tour implements Runnable{		 private int[] times;		 private CyclicBarrier barrier;		 private String tourName;		 public Tour(CyclicBarrier barrier, String tourName, int[]times) {			this.times= times;			this.tourName= tourName;			this.barrier= barrier;			}		 public void run(){			 try{				 Thread.sleep(times[0]*1000);				 System.out.println(now()+tourName+"到达深圳!");				 barrier.await();				 Thread.sleep(times[1]*1000);				 System.out.println(now()+tourName+"到达广州!");				 barrier.await();				 Thread.sleep(times[2]*1000);				 System.out.println(now()+tourName+"到达韶关!");				 barrier.await();				 Thread.sleep(times[3]*1000);				 System.out.println(now()+tourName+"到达长沙!");				 barrier.await();				 Thread.sleep(times[0]*1000);				 System.out.println(now()+tourName+"到达武汉!");				 barrier.await();			 }catch(InterruptedException e){				 e.printStackTrace();			 }catch(BrokenBarrierException e){				 e.printStackTrace();			 }		 }	 }	 public static void main(String[] args) {          //创建障碍器,并设置MainTask为所有定数量的线程都达到障碍点时候所要执行的任务(Runnable) 		 CyclicBarrier barrier = new CyclicBarrier(3);// 三个旅行团		 ExecutorService exec = Executors.newFixedThreadPool(3);		 exec.submit(new Tour(barrier,"徒步旅游的旅行团",timeWalk));		 exec.submit(new Tour(barrier,"自驾游的旅行团",timeSelf));		 exec.submit(new Tour(barrier,"旅游大巴的旅行团",timeBus));		 exec.shutdown(); } }Exchanger

适合于两个线程需要进行数据交换的场景。(一个线程完成后,把结果交给另一个线程继续处理)java.util.concurrent.Exchanger类,提供了这种对象交换能力,两个线程共享一个Exchanger类的对象,一个线程完成对数据的处理之后,调用Exchanger类的exchange()方法把处理之后的数据作为参数发送给另外一个线程。而exchange方法的返回结果是另外一个线程锁提供的相同类型的对象。如果另外一个线程未完成对数据的处理,那么exchange()会使当前线程进入等待状态,直到另外一个线程也调用了exchange方法来进行数据交换。

public class ExchangerTest {	private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();	   private class Sender implements Runnable{	      public void run(){	         try{	            StringBuilder content = new StringBuilder("Hello");	            content = exchanger.exchange(content);	            System.out.println("Sender:"+content);	         }catch(InterruptedException e){	            Thread.currentThread().interrupt();	         }	      }	   }	   private class Receiver implements Runnable{	      public void run(){	         try{	            StringBuilder content = new StringBuilder("World");	            content = exchanger.exchange(content);	            System.out.println("Receiver:"+content);	         }catch(InterruptedException e){	            Thread.currentThread().interrupt();	         }	      }	   }	   public void exchange(){	      new Thread(new Sender()).start();	      new Thread(new Receiver()).start();	   }	   public static void main(String[] args){		   new ExchangerTest().exchange();	   }}

Semaphore

一个计数信号量。从概念上讲,信号量维护了一个许可集合。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问

public class SemaphoreTest {	public static void main(String[] args){		ExecutorService list=Executors.newCachedThreadPool();		//Semaphore bathroom=new Semaphore(1);		Semaphore bathroom=new Semaphore(1,true);		for(int i=0;i<4;i++){			list.submit(new MySemaphore(bathroom,i+1));		}		list.shutdown();		bathroom.acquireUninterruptibly(1);		//System.out.println("大家都洗好了!");		bathroom.release(1);	}}
public class MySemaphore extends Thread{	Semaphore bathroom;	private int id;	public MySemaphore(Semaphore bathroom,int id){		this.bathroom=bathroom;		this.id=id;	}	@Override	public void run(){		try{			if(bathroom.availablePermits()>0){				System.out.println("室友"+this.id+"进入浴室,有空位");			}else{				System.out.println("室友"+this.id+"进入浴室,没空位,排队");			}			bathroom.acquire();			System.out.println("室友"+this.id+"进入浴室洗澡");			Thread.sleep((int)Math.random()*1000);			System.out.println("室友"+this.id+"洗完了");			bathroom.release();		}catch(Exception e){			e.printStackTrace();		}	}}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表