首页 > 学院 > 开发设计 > 正文

阻塞队列

2019-11-06 08:06:16
字体:
来源:转载
供稿:网友

队列是一种数据结构:

先进先出(FIFO)

后进先出(LIFO)

普通的队列,并不会对当前的线程产生阻塞的作用,如果作用在生产者和消费者,需要额外的实现线程间同步以及唤醒策略。

阻塞队列,当队列是空时,消费者所有的线程将被阻塞,当队列是满时,所有的生产者的线程阻塞

as中快捷键 Ctrl+H可以查看:

具体的方法 Ctrl + F12(Alt + 7):

add:当队列满了添加会抛异常

remove : 当对列为空时,会抛出异常

put方法用来向队尾存入元素,如果队列满,则等待;

take方法用来从队首取元素,如果队列为空,则等待;

offer(e,time,unit)方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素

注意: 如果是offer/poll当不满足条件时不会被阻塞,而是直接返回true或者false null或者具体值,而put/take会一直被阻塞

例子:

生产者:

import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class PRoducer implements Runnable {private volatile boolean isRunning = true;private BlockingQueue queue;private int name;private AtomicInteger count = new AtomicInteger();public Producer(BlockingQueue queue, int name) { this.queue = queue; this.name = name;}@Overridepublic void run() { String data = null; System.out.println(Thread.currentThread().getName() + "----" + name + "启动生产者线程"); try { while (isRunning) { System.out.println(Thread.currentThread().getName() + "----" + name + "正在生产数据"); Thread.sleep(2 * 1000); data = "data: " + count.incrementAndGet(); System.out.println(Thread.currentThread().getName() + "----" + name + " 将数据 " + data + " 放入队列"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "----" + name + "放入数据失败"); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println(Thread.currentThread().getName() + "----" + name + "退出生产"); }}public void stop() { isRunning = false;}}

消费者:

import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class Consumer implements Runnable {private BlockingQueue<String> queue;public Consumer(BlockingQueue<String> queue) { this.queue = queue;}@Overridepublic void run() { System.out.println("启动消费者"); boolean isRunning = true; try { while (isRunning) { System.out.println("正从队列中取数据"); String data = queue.poll(2, TimeUnit.SECONDS); if (data != null) { System.out.println("拿到数据 : " + data); System.out.println("正在消费数据"); Thread.sleep(2*1000); } else { isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者"); }}}

测试:

import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingDeque;//psvm 生成main方法public class BlockQueueTest {public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingDeque<>(10); Producer producer1 = new Producer(queue,1); Producer producer2 = new Producer(queue,2); Producer producer3 = new Producer(queue,3); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown();}}

AtomicInteger

应用场景:高并发

在并发场景下:

Integer: 需要手动加synchronized

AtomicInteger:加减是同步的

方法:

incrementAndGet() 以原子方式将当前值加 1。返回的是加后的值getAndIncrement() 以原子方式将当前值加 1。 返回的是加之前的值

更多的方法可以参考在线的jdk文档: http://tool.oschina.net/apidocs/apidoc?api=jdk-zh


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表