众所周知,创建线程的方法有两种,一种是通过继承Thread并重写run()方法,一种是实现Runnable接口,由于run()方法是没有返回值的,因此,这样创建出来的线程也是没有返回值的。为了创造有返回值的线程,java提供了Callable和Future,通过这两个接口,可以在线程完成后得到结果。
Callable
泛型接口,它含有一个方法:V call() throws Exception;它是有返回值的,通常通过ExecutorService来使用Callable。
ExecutorService
在ExecutorService中含有这几个重载方法:
一、<T> Future<T> submit(Callable<T> task);二 、<T> Future<T> submit(Runnable task, T result);三、Future<?> submit(Runnable task);
在第一个方法中我们需要的就是Callable类型的任务。那么,这里的返回值Future类型是什么呢?
Future
Future同样是一个接口,该接口的作用就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。它含有的方法如下:
1、boolean cancel(boolean mayInterruptIfRunning);cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。2、boolean isCancelled();isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。3、boolean isDone();isDone方法表示任务是否已经完成,若任务完成,则返回true;4、 V get() throws InterruptedException, ExecutionException;用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;5、V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
FutureTask(Future接口的实现类)
FutureTask实现了RunnableFuture接口,而Runnable接口继承了Runnable接口和Future<T>接口,所以它既可以被当做Runnable,也可以可以作为Future得到Callable的返回值。
以下给出一个使用的例子:
public class MyCallable implements Callable{ PRivate String oid; public MyCallable(String oid) { this.oid = oid; } @Override public Object call() throws Exception { return oid+"任务返回的内容"; } }public class MyCallableTest { /* * 有返回值的线程 * 可返回值的线程必须实现Callable接口,类似的,无返回值的线程必须Runnable接口。 * 执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取 * 到Callable任务返回的Object了。 */ public static void main(String[] args) throws ExecutionException, InterruptedException { //创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(2); //创建两个有返回值的任务 Callable c1 = new MyCallable("A"); Callable c2 = new MyCallable("B"); Callable c3 = new MyCallable("C"); //执行任务并获取Future对象 Future f1 = pool.submit(c1); Future f2 = pool.submit(c2); Future f3 = pool.submit(c3); //从Future对象上获取任务的返回值,并输出到控制台 System.out.println(">>>"+f1.get().toString()); System.out.println(">>>"+f2.get().toString()); System.out.println(">>>"+f3.get().toString()); //关闭线程池 pool.shutdown(); } }Excutor接口
Executor接口的对象是一种执行任务的方式。它能够使提交的任务和执行任务的线程的管理解耦。我们通常用Executor来代替new一个Thread对象来执行任务。这样可以省略底层线程的管理细节。
public interface Executor { void execute(Runnable command);}ExecutorService接口
该接口继承Executor接口,主要提供了一些终结线程的方法,以及一些能够返回Future对象的方法,从而用来追踪一个或者多个异步任务的执行情况。从而提供执行任务和管理生命周期的方法。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}ScheduledExecutorService
该接口继承ExecutorService接口,提供一些方法来使任务延后执行或者周期执行。
public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);}ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutorThreadPoolExecutor类是线程池中最核心的一个类,它继承了AbstractExecutorService类,并提供了四个构造器,我们来看看最主要的一个。 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
corePoolSize:
核心池的大小,池中所保存的线程数,包括空闲线程。
maximumPoolSize:
池中允许的最大线程数。
keepAliveTime:
当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:
keepAliveTime 参数的时间单位。
workQueue:
执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。threadFactory:
线程工厂,主要用来创建线程;
handler:
表示当拒绝处理任务时的策略,有以下四种取值:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
Executors:工具类,通过这个类的静态方法能够获得多种线程池的实例。
关于execute方法和submit方法:
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。submit()方法是在ExecutorService中声明的方法,也是向线程池提交一个任务交由线程池去执行,不过它能够返回任务执行的结果。(Future)
public class ExecutorServiceTest { public static void main(String[] args) { /* * 固定大小的线程池 * 创建一个可重用固定线程数的线程池 * ,顾名思义,线程池的线程是不会释放的,这就会产生性能问题, * 比如如果线程池的大小为200,当全部使用完毕后,所有的线程 * 会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。 * 如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像 * 通用的线程池一样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。 */ //ExecutorService pool = Executors.newFixedThreadPool(2); /* * 单任务线程池 * 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 * 对于以上两种连接池,大小都是固定的,当要加入的池的线程(或者任务)超过池最 * 大尺寸时候,则入此线程池需要排队等待。一旦池中有线程完毕,则排队等待的某个 * 线程会入池执行。 */ //ExecutorService pool = Executors.newSingleThreadExecutor(); /* * 可变尺寸的线程池 */ ExecutorService pool = Executors.newCachedThreadPool(); Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread1(); Thread t4 = new Thread1(); Thread t5 = new Thread1(); //将线程放入池中进行执行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //关闭线程池 pool.shutdown(); } }public class ThreadPoolExecutorTest { public static void main(String[] args) { //创建等待队列 BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); /* * 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 * 参数: * corePoolSize - 池中所保存的线程数,包括空闲线程。 * maximumPoolSize - 池中允许的最大线程数。 * keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 * unit - keepAliveTime 参数的时间单位。 * workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bqueue); Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread1(); Thread t4 = new Thread1(); Thread t5 = new Thread1(); Thread t6 = new Thread1(); Thread t7 = new Thread1(); //将线程放入池中进行执行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); //关闭线程池 pool.shutdown(); } }public class ScheduledExecutorServiceTest { public static void main(String[] args) { final Runnable thread1=new Runnable(){ int count=0; public void run(){ System.out.println(new Date()+" "+(++count)); } }; /* * 延迟连接池 * 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 */ ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); /* * 单任务延迟连接池 */ //ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(); //1秒钟后运行,并每隔2秒运行一次 final ScheduledFuture future1=pool.scheduleAtFixedRate(thread1, 1, 2, TimeUnit.SECONDS); //2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行 final ScheduledFuture future2 = pool.scheduleWithFixedDelay(thread1, 2,5,TimeUnit.SECONDS); //关闭线程池 pool.schedule(new Runnable(){ public void run(){ future1.cancel(true); future2.cancel(true); pool.shutdown(); } }, 30, TimeUnit.SECONDS); } }
新闻热点
疑难解答