首页 > 学院 > 开发设计 > 正文

线程池技术

2019-11-08 03:20:58
字体:
来源:转载
供稿:网友

1、线程池技术的简单概括

       对于服务端的程序,经常面对的是客户端传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,这不是一个好的选择。因为这会使操作系统频繁的进行线程上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要耗费系统资源的,也无疑浪费了系统资源。

       线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。

2、简单的线程池接口定义

     客户端可以通过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。这里工作者线程代表着一个重复执行Job的线程,而每个由客户端提交的Job都将进入到一个工作队列中等待工作者线程的处理。

public interface ThreadPool<Job extends Runnable> {	// 执行一个Job,这个Job需要实现Runnable	void execute(Job job);	// 关闭线程池销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁  	void shutdown();	// 增加工作者线程	void addWorkers(int num);	// 减少工作者线程	void removeWorker(int num);	// 得到正在等待执行的任务数量	int getJobSize();	// 获得当前线程池中线程的个数	int getThreadSize();}

3、线程池接口的默认实现

import java.util.LinkedList;import java.util.List;import java.util.concurrent.atomic.AtomicLong;public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {	// 线程池最大限制数	PRivate static final int MAX_WORKER_NUMBERS = 10;	// 线程池默认的数量	private static final int DEFAULT_WORKER_NUMBERS = 5;	// 线程池最小的数量	private static final int MIN_WORKER_NUMBERS = 1;	// 这是一个工作列表,将会向里面插入工作	private final LinkedList<Job> jobs = new LinkedList<Job>();	// 工作者列表	private List<Worker> workers = new LinkedList<Worker>();	// 工作者线程的数量	private int workerNum = DEFAULT_WORKER_NUMBERS;	// 线程编号生成	private AtomicLong threadNum = new AtomicLong();	public DefaultThreadPool() {		initializeWokers(DEFAULT_WORKER_NUMBERS);	}	public DefaultThreadPool(int num) {		workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS				: num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;		initializeWokers(workerNum);	}	public void execute(Job job) {		if (job != null) {			// 添加一个工作,然后进行通知			synchronized (jobs) {				jobs.addLast(job);				jobs.notify();			}		}	}	public void shutdown() {		// 如果还有任务没执行完成,就先睡会吧		while (!jobs.isEmpty()) {			try {				Thread.sleep(10);			} catch (InterruptedException e) {				e.printStackTrace();			}		}		for (Worker worker : workers) {			worker.shutdown();			worker = null;		}		workers = null;		// for (int i = 0; i < workers.size(); i++) {		// workers.get(i).shutdown();		// }		// for (int i = 0; i < workers.size() + 1; i++) {		// workers.get(i) = null;		// }		jobs.clear();// 清空任务队列	}	public void addWorkers(int num) {		synchronized (jobs) {			// 限制新增的Worker数量不能超过最大值			if (num + this.workerNum > MAX_WORKER_NUMBERS) {				num = MAX_WORKER_NUMBERS - this.workerNum;			}			initializeWokers(num);			this.workerNum += num;		}	}	public void removeWorker(int num) {		synchronized (jobs) {			if (num >= this.workerNum) {				throw new IllegalArgumentException("beyond workNum");			}			// 按照给定的数量停止Worker			int count = 0;			while (count < num) {				Worker worker = workers.get(count);				if (workers.remove(worker)) {					worker.shutdown();					count++;				}			}			this.workerNum -= count;		}	}	public int getJobSize() {		return jobs.size();	}	// 初始化线程工作者	private void initializeWokers(int num) {		for (int i = 0; i < num; i++) {			Worker worker = new Worker();			workers.add(worker);			Thread thread = new Thread(worker, "ThreadPool-Worker-"					+ threadNum.incrementAndGet());			thread.start();		}	}	// 工作者,负责消费任务	class Worker implements Runnable {		// 该工作线程是否有效,用于结束该工作线程		private volatile boolean running = true;		/*		 * 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待		 */		@Override		public void run() {			Runnable job = null;			while (running) {				synchronized (jobs) {					// 线程还在运行					// 并且,如果工作者列表是空的,那么就wait					while (running && jobs.isEmpty()) { // 队列为空						try {							jobs.wait(20);						} catch (InterruptedException ex) {							// 感知到外部对WorkerThread的中断操作,返回							Thread.currentThread().interrupt();							return;						}					}					if (!jobs.isEmpty())						job = jobs.remove(0);// 取出任务				}				if (job != null) {					job.run(); // 执行任务				}				// 释放资源				job = null;			}		}		// 停止工作,让该线程自然执行完run方法,自然结束		public void shutdown() {			running = false;		}	}	@Override	public int getThreadSize() {		return workerNum;	}}

4、线程池的实现分析

        从线程池的实现可以看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。添加一个Job后,对工作队列jobs调用了其notify()方法,而不是notifyAll()方法,因为能够确定有工作者线程被唤醒,这时使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待队列中的线程全部移动到阻塞队列中)。       可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。

5、测试代码

    5.1  要执行的任务

public class MyJob implements Runnable {	private int age;	public MyJob(int age) {		this.age = age;	}	@Override	public void run() {		System.out.println(age);	}}

    5.2  测试代码 

public class TestThreadPool {	public static void main(String[] args) {		// 初始化线程池个数		ThreadPool<Runnable> threadPool = new DefaultThreadPool<Runnable>(2);		// 提交任务		threadPool.execute(new MyJob(3));		threadPool.execute(new MyJob(4));		threadPool.execute(new MyJob(5));		// 销毁线程池		threadPool.shutdown();	}}

6、参考的书籍

Java并发编程的艺术

7、代码参考地址

https://git.oschina.net/baowei/MyThreadPool


上一篇:134. Gas Station

下一篇:268. Missing Number

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表