首页 > 编程 > Java > 正文

【java总结】多线程进阶篇之数据结构

2019-11-08 02:05:05
字体:
来源:转载
供稿:网友

BlockingQueue阻塞队列

java5线程新特征中的内容,Java定义了阻塞队列的接口 java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止。

public class BlockingQueueTest {	public static void main(String[] args) throws InterruptedException {         BlockingQueue bqueue = new ArrayBlockingQueue(20);         for (int i = 0; i < 30; i++) {                 //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)。                 bqueue.put(i);                 System.out.PRintln("向阻塞队列中添加了元素:" + i);         }         //程序没有运行至此,说明被线程被阻塞        System.out.println("程序到此运行结束,即将退出----"); } }BlockingDeque阻塞栈

与阻塞队列相似。不同点在于栈是“后入先出”的结构,每次操作的是栈顶,而队列是“先进先出”的结构,每次操作的是队列头。

public class BlockingDequeTest {	public static void main(String[] args) throws InterruptedException {         BlockingDeque bDeque = new LinkedBlockingDeque(10);         for (int i = 0; i < 10; i++) {                 bDeque.putFirst(i);                 System.out.println("向阻塞栈中添加了元素:" + i);         }         for (int i = 0; i < 10; i++) {              System.out.println("从阻塞栈中弹出了元素:" +bDeque.pop());         }         for (int i = 0; i < 11; i++) {             //将指定元素添加到此阻塞栈中,如果没有可用空间,将一直等待(如果有必要)。             bDeque.putFirst(i);             System.out.println("向阻塞栈中添加了元素:" + i);     }         //程序没有运行至此,说明线程被阻塞        System.out.println("程序到此运行结束,即将退出----"); } }

LinkedBlockingQueue 

实现了 BlockingQueue 接口。LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

LinkedBlockingDeque

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其

它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。notEmpty和notFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插

入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在

执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,

会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,

会获取putLock,在插入操作执行完毕才释放putLock。

public class LinkedBlockingDequeTest {    private static Queue<String> queue = new LinkedBlockingDeque<String>();    public static void main(String[] args) {            // 同时启动两个线程对queue进行操作!        new MyThread("ta").start();        new MyThread("tb").start();    }    private static void printAll() {        String value;        Iterator iter = queue.iterator();        while(iter.hasNext()) {            value = (String)iter.next();            System.out.print(value+", ");        }        System.out.println();    }    private static class MyThread extends Thread {        MyThread(String name) {            super(name);        }        @Override        public void run() {                int i = 0;            while (i++ < 5) {                // “线程名” + "-" + "序号"                String val = Thread.currentThread().getName()+i;                queue.add(val);                // 通过“Iterator”遍历queue。                printAll();            }        }    }}

PriorityBlockingQueue类

是JDK提供的优先级队列 本身是线程安全的 内部使用显示锁 保证线程安全PriorityBlockingQueue存储的对象必须是实现Comparable接口的 因为PriorityBlockingQueue队列会根据内部存储的每一个元素的compareTo方法比较每个元素的大小这样在take出来的时候会根据优先级 将优先级最小的最先取出 

public class PriorityBlockingQueueTest {	public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();    public static void main(String[] args) {        queue.add(new User(1,"wu"));        queue.add(new User(5,"wu5"));        queue.add(new User(23,"wu23"));        queue.add(new User(55,"wu55"));        queue.add(new User(9,"wu9"));        queue.add(new User(3,"wu3"));        for (User user : queue) {            try {            	User s=queue.take();                System.out.println("名字:"+s.name+" 岁数:"+s.age);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    //静态内部类    static class User implements Comparable<User>{        int age;        String name;        public User(int age,String name) {            this.age = age;            this.name = name;        }        @Override        public int compareTo(User o) {            return this.age > o.age ? -1 : 1;        }    }}DelayQueue

一个无界阻塞队列,是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。DelayQueue是一个用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间

的控制。DelayQueue使用场景a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。队列的元素需要实现Delayed接口,获得延迟的时长。

public class Person implements  Delayed {	private String name;	//身份证	private String id;	//截止时间	private long endTime;	public Person(String name,String id,long endTime){		this.name=name;		this.id=id;		this.endTime=endTime;	}		public String getName(){		return this.name;	}		public String getId(){		return this.id;	}	/**	 * 用来判断是否到了截止时间	 */	@Override	public long getDelay(TimeUnit unit) {		// TODO Auto-generated method stub		return endTime-System.currentTimeMillis();	}	/**	 * 相互批较排序用	 */	@Override	public int compareTo(Delayed o) {		// TODO Auto-generated method stub		Person jia = (Person)o;		return endTime-jia.endTime>0?1:0;	}}
public class DelayQueueTest implements Runnable{	private DelayQueue<Person> queue = new DelayQueue<Person>();	public boolean yinye =true;		public void shangji(String name,String id,int money){		Person man = new Person(name,id,1000*5*money+System.currentTimeMillis());				//System.out.println(System.currentTimeMillis());		//System.out.println(1000*60*money);		System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");		this.queue.add(man);	}		public void xiaji(Person man){		System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");	}	@Override	public void run() {		// TODO Auto-generated method stub		while(yinye){			try {				System.out.println("检查ing");				Person man = queue.take();				xiaji(man);			} catch (InterruptedException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	}		public static void main(String args[]){		try{			System.out.println("网吧开始营业");			DelayQueueTest siyu = new DelayQueueTest();			Thread shangwang = new Thread(siyu);			shangwang.start();			siyu.shangji("路人甲", "123", 1);			siyu.shangji("路人乙", "234", 2);			siyu.shangji("路人丙", "345", 1);		}		catch(Exception ex){			ex.printStackTrace();		}	}}

SynchronousQueue

首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素。其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行

peek,因为仅在试图要取得元素时,该元素才存在;它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,

它就必须与该对象同步。另外在创建SynchronousQueue时可以传递一个boolean参数来指定它是否是访问它的线程按遵守FIFO顺序处理,true表示遵守FIFO。 

public class SynchronousQueueTest {	public static void main(String[] args) {		SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();		new Customer(queue).start();		new Product(queue).start();	}	static class Product extends Thread{		SynchronousQueue<Integer> queue;		public Product(SynchronousQueue<Integer> queue){			this.queue = queue;		}		@Override		public void run(){			while(true){				int rand = new Random().nextInt(1000);				System.out.println("生产了一个产品:"+rand);				System.out.println("等待2秒后运送出去...");				try {					TimeUnit.SECONDS.sleep(2);					System.out.println("运送完成!");				} catch (InterruptedException e) {					e.printStackTrace();				}				queue.offer(rand);			}		}	}	static class Customer extends Thread{		SynchronousQueue<Integer> queue;		public Customer(SynchronousQueue<Integer> queue){			this.queue = queue;		}		@Override		public void run(){			while(true){				try {					System.out.println("消费了一个产品:"+queue.take());				} catch (InterruptedException e) {					e.printStackTrace();				}				System.out.println("------------------------------------------");			}		}	}}

ConcurrentHashMap

原理:一个ConcurrentHashMap 由多个segment 组成,每个segment 包含一个Entity 的数组。这里比HashMap 多了一个segment 类

。该类继承了ReentrantLock 类,所以本身是一个锁。当多线程对ConcurrentHashMap 操作时,不是完全锁住map, 而是锁住相应的

segment 。这样提高了并发效率。ConcurrentHashMap除了有更好的并行性,它与java.util.HashTable类时非常相似的。当你从ConcurrentHashMap读取数据的时候

ConcurrentHashMap是不会加锁的。另外,ConcurrenthashMap在写的时候不会全部加锁。它仅仅锁住Map中正在被写入的部分。

ConcurrentHashMap的另一个不同之处是,如果在迭代的时候ConcurrentHashMap被修改了,ConcurrentHashMap是不会抛出

ConcurrentModificationException异常的。迭代器不能被超过一个以上的线程使用。

ConcurrentNavigableMap(Concurrent中 的可操作Map)

public class ConcurrentNavigableMapTest {	/*	 * ConcurrentNavigableMap	 */	public static void main(String[] args){		ConcurrentNavigableMap map = new ConcurrentSkipListMap();		map.put("1", "one");		map.put("2", "two");		map.put("3", "three");		/*		 * haedMap()		 * headMap(T toKey) 返回一个包含许多键的map,其中所有键值是严格少于给定的toKey。		 * 如果你改变了原始的map,这些改变也会反映到head map上。		 * headMap将会指向一个仅仅包含key为“1”的ConcurrentNavigableMap,因为		 * 只有这个key是严格小于“2”的。ConcurrentSkipListMap是		 * ConcurrentNavigableMap的实现类。		 */		System.out.println("headMap:");		ConcurrentNavigableMap headMap = map.headMap("2");		for(Object x:headMap.keySet()){			System.out.println(headMap.get(x));		}		/*		 * tailMap()		 * tailMap(T fromKey)返回一个包含许多键的map,其中所有键值是大于或者等于给定的fromKey的。		 */		System.out.println("tailMap:");		ConcurrentNavigableMap tailMap = map.tailMap("2");		for(Object x:tailMap.keySet()){			System.out.println(tailMap.get(x));		}		/*		 * subMap()		 * subMap()方法返回一个包含许多键的map,这些键值是介于传给该方法的(including)和(excluding)两个参数之间的		 */		System.out.println("subMap:");		ConcurrentNavigableMap subMap = map.subMap("2", "3");		for(Object x:subMap.keySet()){			System.out.println(subMap.get(x));		}	}}

ConcurrentSkipListSet:类似treeSet的有序集合

public class ConcurrentSkipListSetTest {	// TODO: set是TreeSet对象时,程序会出错。    //private static Set<String> set = new TreeSet<String>();    private static Set<String> set = new ConcurrentSkipListSet<String>();    public static void main(String[] args) {            // 同时启动两个线程对set进行操作!        new MyThread("a").start();        new MyThread("b").start();    }    private static void printAll() {        String value = null;        Iterator iter = set.iterator();        while(iter.hasNext()) {            value = (String)iter.next();            System.out.print(value+", ");        }        System.out.println();    }    private static class MyThread extends Thread {        MyThread(String name) {            super(name);        }        @Override        public void run() {                int i = 0;            while (i++ < 10) {                // “线程名” + "序号"                String val = Thread.currentThread().getName() + (i%6);                set.add(val);                // 通过“Iterator”遍历set。                printAll();            }        }    }}

CopyOnWriteArrayList

Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修

改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。CopyOnWrite容器即写时复制的

容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器

,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。CopyOnWriteArrayList类最大的特点就是,在对其实例进行修改操作(add/remove等)会新建一个数据并修改,修改完毕之后,再将

原来的引用指向新的数组。这样,修改过程没有修改原来的数组。也就没有了ConcurrentModificationException错误。CopyOnWrite

并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景!

数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上

能读到,请不要使用CopyOnWrite容器。

public class CopyOnWriteArrayListTest {	public static void main(String[] args) throws InterruptedException {		List<String> a = new ArrayList<String>();		a.add("a");		a.add("b");		a.add("c");		final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(a);		Thread t = new Thread(new Runnable() {			int count = -1;			@Override			public void run() {				while (true) {					list.add(count++ + "");				}			}		});		t.setDaemon(true);		t.start();		Thread.currentThread().sleep(3);		for (String s : list) {			System.out.println(list.hashCode());			System.out.println(s);		}	}}

public class CopyOnWriteArraySetTest {	// TODO: set是HashSet对象时,程序会出错。    //private static Set<String> set = new HashSet<String>();    private static Set<String> set = new CopyOnWriteArraySet<String>();    public static void main(String[] args) {        // 同时启动两个线程对set进行操作!        new MyThread("ta").start();        new MyThread("tb").start();    }    private static void printAll() {        String value = null;        Iterator iter = set.iterator();        while(iter.hasNext()) {            value = (String)iter.next();            System.out.print(value+", ");        }        System.out.println();    }    private static class MyThread extends Thread {        MyThread(String name) {            super(name);        }        @Override        public void run() {                int i = 0;            while (i++ < 10) {                // “线程名” + "-" + "序号"                String val = Thread.currentThread().getName() + "-" + (i%6);                set.add(val);                // 通过“Iterator”遍历set。                printAll();            }        }    }}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表