首页 > 编程 > Java > 正文

【java总结】多线程进阶篇之任务执行

2019-11-08 01:59:47
字体:
来源:转载
供稿:网友

众所周知,创建线程的方法有两种,一种是通过继承Thread并重写run()方法,一种是实现Runnable接口,由于run()方法是没有返回值的,因此,这样创建出来的线程也是没有返回值的。为了创造有返回值的线程,java提供了Callable和Future,通过这两个接口,可以在线程完成后得到结果。

Callable

泛型接口,它含有一个方法:V call() throws Exception;它是有返回值的,通常通过ExecutorService来使用Callable。

ExecutorService

在ExecutorService中含有这几个重载方法:

一、<T> Future<T> submit(Callable<T> task);二 、<T> Future<T> submit(Runnable task, T result);三、Future<?> submit(Runnable task);

在第一个方法中我们需要的就是Callable类型的任务。那么,这里的返回值Future类型是什么呢?

Future

Future同样是一个接口,该接口的作用就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。它含有的方法如下:

1、boolean cancel(boolean mayInterruptIfRunning);cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。2、boolean isCancelled();isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。3、boolean isDone();isDone方法表示任务是否已经完成,若任务完成,则返回true;4、 V get() throws InterruptedException, ExecutionException;用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;5、V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

FutureTask(Future接口的实现类)

FutureTask实现了RunnableFuture接口,而Runnable接口继承了Runnable接口和Future<T>接口,所以它既可以被当做Runnable,也可以可以作为Future得到Callable的返回值。

以下给出一个使用的例子:

public class MyCallable implements Callable{     PRivate String oid;     public MyCallable(String oid) {             this.oid = oid;     }     @Override     public Object call() throws Exception {             return oid+"任务返回的内容";     } }

public class MyCallableTest {	/*	 * 有返回值的线程 	 * 可返回值的线程必须实现Callable接口,类似的,无返回值的线程必须Runnable接口。	 * 执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取	 * 到Callable任务返回的Object了。	 */	public static void main(String[] args) throws ExecutionException, InterruptedException {         //创建一个线程池         ExecutorService pool = Executors.newFixedThreadPool(2);         //创建两个有返回值的任务         Callable c1 = new MyCallable("A");         Callable c2 = new MyCallable("B");        Callable c3 = new MyCallable("C");         //执行任务并获取Future对象         Future f1 = pool.submit(c1);         Future f2 = pool.submit(c2);         Future f3 = pool.submit(c3);         //从Future对象上获取任务的返回值,并输出到控制台         System.out.println(">>>"+f1.get().toString());         System.out.println(">>>"+f2.get().toString());         System.out.println(">>>"+f3.get().toString());         //关闭线程池         pool.shutdown(); } }

Excutor接口

Executor接口的对象是一种执行任务的方式。它能够使提交的任务和执行任务的线程的管理解耦。我们通常用Executor来代替new一个Thread对象来执行任务。这样可以省略底层线程的管理细节。

public interface Executor {    void execute(Runnable command);}

ExecutorService接口

该接口继承Executor接口,主要提供了一些终结线程的方法,以及一些能够返回Future对象的方法,从而用来追踪一个或者多个异步任务的执行情况。从而提供执行任务和管理生命周期的方法。

public interface ExecutorService extends Executor {    void shutdown();    List<Runnable> shutdownNow();    boolean isShutdown();    boolean isTerminated();    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;    <T> Future<T> submit(Runnable task, T result);    Future<?> submit(Runnable task);    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;    <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}

ScheduledExecutorService

该接口继承ExecutorService接口,提供一些方法来使任务延后执行或者周期执行。

public interface ScheduledExecutorService extends ExecutorService {    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);}

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutorThreadPoolExecutor类是线程池中最核心的一个类,它继承了AbstractExecutorService类,并提供了四个构造器,我们来看看最主要的一个。 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

corePoolSize:

核心池的大小,池中所保存的线程数,包括空闲线程。

maximumPoolSize:

池中允许的最大线程数。

keepAliveTime:

当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

unit:

keepAliveTime 参数的时间单位。

workQueue:

执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。threadFactory:

线程工厂,主要用来创建线程;

handler:

表示当拒绝处理任务时的策略,有以下四种取值:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

Executors:工具类,通过这个类的静态方法能够获得多种线程池的实例。

关于execute方法和submit方法:

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。submit()方法是在ExecutorService中声明的方法,也是向线程池提交一个任务交由线程池去执行,不过它能够返回任务执行的结果。(Future)

public class ExecutorServiceTest {	public static void main(String[] args) {         /*         * 固定大小的线程池         * 创建一个可重用固定线程数的线程池          * ,顾名思义,线程池的线程是不会释放的,这就会产生性能问题,         * 比如如果线程池的大小为200,当全部使用完毕后,所有的线程         * 会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。         * 如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像         * 通用的线程池一样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。         */        //ExecutorService pool = Executors.newFixedThreadPool(2);		/*		 * 单任务线程池		 * 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 		 * 对于以上两种连接池,大小都是固定的,当要加入的池的线程(或者任务)超过池最		 * 大尺寸时候,则入此线程池需要排队等待。一旦池中有线程完毕,则排队等待的某个		 * 线程会入池执行。		 */        //ExecutorService pool = Executors.newSingleThreadExecutor();         /*         * 可变尺寸的线程池         */        ExecutorService pool = Executors.newCachedThreadPool();         Thread t1 = new Thread1();         Thread t2 = new Thread1();         Thread t3 = new Thread1();         Thread t4 = new Thread1();         Thread t5 = new Thread1();         //将线程放入池中进行执行         pool.execute(t1);         pool.execute(t2);         pool.execute(t3);         pool.execute(t4);         pool.execute(t5);         //关闭线程池         pool.shutdown(); } }
public class ThreadPoolExecutorTest {	 public static void main(String[] args) {          //创建等待队列          BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);             /*          * 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。           * 参数:          * corePoolSize - 池中所保存的线程数,包括空闲线程。          * maximumPoolSize - 池中允许的最大线程数。          * keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。          * unit - keepAliveTime 参数的时间单位。          * workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。          */         ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bqueue);          Thread t1 = new Thread1();          Thread t2 = new Thread1();          Thread t3 = new Thread1();          Thread t4 = new Thread1();          Thread t5 = new Thread1();          Thread t6 = new Thread1();          Thread t7 = new Thread1();          //将线程放入池中进行执行          pool.execute(t1);          pool.execute(t2);          pool.execute(t3);          pool.execute(t4);          pool.execute(t5);          pool.execute(t6);          pool.execute(t7);          //关闭线程池          pool.shutdown();  } }
public class ScheduledExecutorServiceTest {	 public static void main(String[] args) { 		 final Runnable thread1=new Runnable(){				int count=0;				public void run(){					System.out.println(new Date()+" "+(++count));				}			};         /*          * 延迟连接池          * 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。           */         ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);          /*          * 单任务延迟连接池          */		          //ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();         //1秒钟后运行,并每隔2秒运行一次         final ScheduledFuture future1=pool.scheduleAtFixedRate(thread1, 1, 2, TimeUnit.SECONDS);         //2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行         final ScheduledFuture future2 = pool.scheduleWithFixedDelay(thread1, 2,5,TimeUnit.SECONDS);         //关闭线程池          pool.schedule(new Runnable(){        	 public void run(){        		 future1.cancel(true);        		 future2.cancel(true);        		 pool.shutdown();         	 }         }, 30, TimeUnit.SECONDS); } }


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表