BlockingQueue阻塞队列
是java5线程新特征中的内容,Java定义了阻塞队列的接口 java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止。
public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { BlockingQueue bqueue = new ArrayBlockingQueue(20); for (int i = 0; i < 30; i++) { //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)。 bqueue.put(i); System.out.PRintln("向阻塞队列中添加了元素:" + i); } //程序没有运行至此,说明被线程被阻塞 System.out.println("程序到此运行结束,即将退出----"); } }BlockingDeque阻塞栈与阻塞队列相似。不同点在于栈是“后入先出”的结构,每次操作的是栈顶,而队列是“先进先出”的结构,每次操作的是队列头。
public class BlockingDequeTest { public static void main(String[] args) throws InterruptedException { BlockingDeque bDeque = new LinkedBlockingDeque(10); for (int i = 0; i < 10; i++) { bDeque.putFirst(i); System.out.println("向阻塞栈中添加了元素:" + i); } for (int i = 0; i < 10; i++) { System.out.println("从阻塞栈中弹出了元素:" +bDeque.pop()); } for (int i = 0; i < 11; i++) { //将指定元素添加到此阻塞栈中,如果没有可用空间,将一直等待(如果有必要)。 bDeque.putFirst(i); System.out.println("向阻塞栈中添加了元素:" + i); } //程序没有运行至此,说明线程被阻塞 System.out.println("程序到此运行结束,即将退出----"); } }LinkedBlockingQueue
实现了 BlockingQueue 接口。LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
LinkedBlockingDeque
LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其
它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。notEmpty和notFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插
入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在
执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,
会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,
会获取putLock,在插入操作执行完毕才释放putLock。
public class LinkedBlockingDequeTest { private static Queue<String> queue = new LinkedBlockingDeque<String>(); public static void main(String[] args) { // 同时启动两个线程对queue进行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 5) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName()+i; queue.add(val); // 通过“Iterator”遍历queue。 printAll(); } } }}PriorityBlockingQueue类
是JDK提供的优先级队列 本身是线程安全的 内部使用显示锁 保证线程安全PriorityBlockingQueue存储的对象必须是实现Comparable接口的 因为PriorityBlockingQueue队列会根据内部存储的每一个元素的compareTo方法比较每个元素的大小这样在take出来的时候会根据优先级 将优先级最小的最先取出
public class PriorityBlockingQueueTest { public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>(); public static void main(String[] args) { queue.add(new User(1,"wu")); queue.add(new User(5,"wu5")); queue.add(new User(23,"wu23")); queue.add(new User(55,"wu55")); queue.add(new User(9,"wu9")); queue.add(new User(3,"wu3")); for (User user : queue) { try { User s=queue.take(); System.out.println("名字:"+s.name+" 岁数:"+s.age); } catch (InterruptedException e) { e.printStackTrace(); } } } //静态内部类 static class User implements Comparable<User>{ int age; String name; public User(int age,String name) { this.age = age; this.name = name; } @Override public int compareTo(User o) { return this.age > o.age ? -1 : 1; } }}DelayQueue一个无界阻塞队列,是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。DelayQueue是一个用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间
的控制。DelayQueue使用场景a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。队列的元素需要实现Delayed接口,获得延迟的时长。
public class Person implements Delayed { private String name; //身份证 private String id; //截止时间 private long endTime; public Person(String name,String id,long endTime){ this.name=name; this.id=id; this.endTime=endTime; } public String getName(){ return this.name; } public String getId(){ return this.id; } /** * 用来判断是否到了截止时间 */ @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return endTime-System.currentTimeMillis(); } /** * 相互批较排序用 */ @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub Person jia = (Person)o; return endTime-jia.endTime>0?1:0; }}public class DelayQueueTest implements Runnable{ private DelayQueue<Person> queue = new DelayQueue<Person>(); public boolean yinye =true; public void shangji(String name,String id,int money){ Person man = new Person(name,id,1000*5*money+System.currentTimeMillis()); //System.out.println(System.currentTimeMillis()); //System.out.println(1000*60*money); System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机..."); this.queue.add(man); } public void xiaji(Person man){ System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机..."); } @Override public void run() { // TODO Auto-generated method stub while(yinye){ try { System.out.println("检查ing"); Person man = queue.take(); xiaji(man); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String args[]){ try{ System.out.println("网吧开始营业"); DelayQueueTest siyu = new DelayQueueTest(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲", "123", 1); siyu.shangji("路人乙", "234", 2); siyu.shangji("路人丙", "345", 1); } catch(Exception ex){ ex.printStackTrace(); } }}SynchronousQueue
首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素。其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行
peek,因为仅在试图要取得元素时,该元素才存在;它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,
它就必须与该对象同步。另外在创建SynchronousQueue时可以传递一个boolean参数来指定它是否是访问它的线程按遵守FIFO顺序处理,true表示遵守FIFO。
public class SynchronousQueueTest { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); new Customer(queue).start(); new Product(queue).start(); } static class Product extends Thread{ SynchronousQueue<Integer> queue; public Product(SynchronousQueue<Integer> queue){ this.queue = queue; } @Override public void run(){ while(true){ int rand = new Random().nextInt(1000); System.out.println("生产了一个产品:"+rand); System.out.println("等待2秒后运送出去..."); try { TimeUnit.SECONDS.sleep(2); System.out.println("运送完成!"); } catch (InterruptedException e) { e.printStackTrace(); } queue.offer(rand); } } } static class Customer extends Thread{ SynchronousQueue<Integer> queue; public Customer(SynchronousQueue<Integer> queue){ this.queue = queue; } @Override public void run(){ while(true){ try { System.out.println("消费了一个产品:"+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------------------------------"); } } }}ConcurrentHashMap
原理:一个ConcurrentHashMap 由多个segment 组成,每个segment 包含一个Entity 的数组。这里比HashMap 多了一个segment 类
。该类继承了ReentrantLock 类,所以本身是一个锁。当多线程对ConcurrentHashMap 操作时,不是完全锁住map, 而是锁住相应的
segment 。这样提高了并发效率。ConcurrentHashMap除了有更好的并行性,它与java.util.HashTable类时非常相似的。当你从ConcurrentHashMap读取数据的时候
ConcurrentHashMap是不会加锁的。另外,ConcurrenthashMap在写的时候不会全部加锁。它仅仅锁住Map中正在被写入的部分。
ConcurrentHashMap的另一个不同之处是,如果在迭代的时候ConcurrentHashMap被修改了,ConcurrentHashMap是不会抛出
ConcurrentModificationException异常的。迭代器不能被超过一个以上的线程使用。
ConcurrentNavigableMap(Concurrent中 的可操作Map)
public class ConcurrentNavigableMapTest { /* * ConcurrentNavigableMap */ public static void main(String[] args){ ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); /* * haedMap() * headMap(T toKey) 返回一个包含许多键的map,其中所有键值是严格少于给定的toKey。 * 如果你改变了原始的map,这些改变也会反映到head map上。 * headMap将会指向一个仅仅包含key为“1”的ConcurrentNavigableMap,因为 * 只有这个key是严格小于“2”的。ConcurrentSkipListMap是 * ConcurrentNavigableMap的实现类。 */ System.out.println("headMap:"); ConcurrentNavigableMap headMap = map.headMap("2"); for(Object x:headMap.keySet()){ System.out.println(headMap.get(x)); } /* * tailMap() * tailMap(T fromKey)返回一个包含许多键的map,其中所有键值是大于或者等于给定的fromKey的。 */ System.out.println("tailMap:"); ConcurrentNavigableMap tailMap = map.tailMap("2"); for(Object x:tailMap.keySet()){ System.out.println(tailMap.get(x)); } /* * subMap() * subMap()方法返回一个包含许多键的map,这些键值是介于传给该方法的(including)和(excluding)两个参数之间的 */ System.out.println("subMap:"); ConcurrentNavigableMap subMap = map.subMap("2", "3"); for(Object x:subMap.keySet()){ System.out.println(subMap.get(x)); } }}ConcurrentSkipListSet:类似treeSet的有序集合
public class ConcurrentSkipListSetTest { // TODO: set是TreeSet对象时,程序会出错。 //private static Set<String> set = new TreeSet<String>(); private static Set<String> set = new ConcurrentSkipListSet<String>(); public static void main(String[] args) { // 同时启动两个线程对set进行操作! new MyThread("a").start(); new MyThread("b").start(); } private static void printAll() { String value = null; Iterator iter = set.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 10) { // “线程名” + "序号" String val = Thread.currentThread().getName() + (i%6); set.add(val); // 通过“Iterator”遍历set。 printAll(); } } }}CopyOnWriteArrayList
Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修
改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。CopyOnWrite容器即写时复制的
容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器
,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。CopyOnWriteArrayList类最大的特点就是,在对其实例进行修改操作(add/remove等)会新建一个数据并修改,修改完毕之后,再将
原来的引用指向新的数组。这样,修改过程没有修改原来的数组。也就没有了ConcurrentModificationException错误。CopyOnWrite
并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景!
数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上
能读到,请不要使用CopyOnWrite容器。
public class CopyOnWriteArrayListTest { public static void main(String[] args) throws InterruptedException { List<String> a = new ArrayList<String>(); a.add("a"); a.add("b"); a.add("c"); final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(a); Thread t = new Thread(new Runnable() { int count = -1; @Override public void run() { while (true) { list.add(count++ + ""); } } }); t.setDaemon(true); t.start(); Thread.currentThread().sleep(3); for (String s : list) { System.out.println(list.hashCode()); System.out.println(s); } }}public class CopyOnWriteArraySetTest { // TODO: set是HashSet对象时,程序会出错。 //private static Set<String> set = new HashSet<String>(); private static Set<String> set = new CopyOnWriteArraySet<String>(); public static void main(String[] args) { // 同时启动两个线程对set进行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value = null; Iterator iter = set.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 10) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName() + "-" + (i%6); set.add(val); // 通过“Iterator”遍历set。 printAll(); } } }}
新闻热点
疑难解答