在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:1234567891011121314151617181920212223242526272829303132333435363738 | import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * @author jackyuj */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>( 10 ); PRoducer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep( 10 * 1000 ); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep( 2000 ); // 退出Executor service.shutdown(); } } |
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 | import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 消费者线程 * * @author jackyuj */ public class Consumer implements Runnable { public Consumer(BlockingQueue<String> queue) { this .queue = queue; } public void run() { System.out.println( "启动消费者线程!" ); Random r = new Random(); boolean isRunning = true ; try { while (isRunning) { System.out.println( "正从队列获取数据..." ); String data = queue.poll( 2 , TimeUnit.SECONDS); if ( null != data) { System.out.println( "拿到数据:" + data); System.out.println( "正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false ; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println( "退出消费者线程!" ); } } private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000 ; } import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生产者线程 * * @author jackyuj */ public class Producer implements Runnable { public Producer(BlockingQueue queue) { this .queue = queue; } public void run() { String data = null ; Random r = new Random(); System.out.println( "启动生产者线程!" ); try { while (isRunning) { System.out.println( "正在生产数据..." ); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println( "将数据:" + data + "放入队列..." ); if (!queue.offer(data, 2 , TimeUnit.SECONDS)) { System.out.println( "放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println( "退出生产者线程!" ); } } public void stop() { isRunning = false ; } private volatile boolean isRunning = true ; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000 ; } |
新闻热点
疑难解答