首页 > 编程 > Java > 正文

Java高并发

2019-11-10 17:19:13
字体:
来源:转载
供稿:网友

java高并发常用基础知识:

1.CountDownLatch的使用

应用场景:当初始化需要多个操作的时候。也就是事先需要多个准备工作,然后等所有准备工作做完之后再开始一个新的工作。

重点:多个线程在执行,一个线程在等待。

package concurrent;import java.util.concurrent.CountDownLatch;/** * CountDownLatch主要用来解决多个初始化操作的情况 */public class UserCountDownLatch {    public static void main(String[] args) {        final CountDownLatch countDownLatch = new CountDownLatch(2);        Thread t1 = new Thread(new Runnable() {            public void run() {                System.out.PRintln("主线程初始化开始");                try {                    countDownLatch.await();                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("主线程初始化完成,开始执行操作"+Thread.currentThread().getName());            }        },"t1");        Thread t2 = new Thread(new Runnable() {            public void run() {                System.out.println("初始化t2开始");                try {                    Thread.sleep(200);                } catch (InterruptedException e) {                    e.printStackTrace();                }                countDownLatch.countDown();                System.out.println("初始化t2完成");            }        });        Thread t3 = new Thread(new Runnable() {            public void run() {                System.out.println("初始化t3开始");                try {                    Thread.sleep(300);                } catch (InterruptedException e) {                    e.printStackTrace();                }                countDownLatch.countDown();                System.out.println("初始化t3完成");            }        });        t1.start();        t2.start();        t3.start();    }}    

2. CyclicBarrier的使用

应用场景:运动员比赛赛跑,需要所有运动员都准备好后,才能同时开始进行比赛。

重点:多个线程在等待,完成后多个线程执行各自的逻辑。

package concurrent;import java.util.Random;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Created by lfeng 2017/2/6. */public class UserCyclicBarrier {    static class Runner implements Runnable{        private CyclicBarrier cyclicBarrier;        private String name;        public Runner(CyclicBarrier cyclicBarrier,String name) {            this.cyclicBarrier = cyclicBarrier;            this.name = name;        }        public void run() {            try {                Thread.sleep(1000 * new Random().nextInt(5));                System.out.println(name + "准备就绪");                cyclicBarrier.await();            } catch (Exception e) {                e.printStackTrace();            }            System.out.println(name + "Go...");        }    }    public static void main(String[] args) {        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);        ExecutorService executors = Executors.newFixedThreadPool(3);        executors.submit(new Runner(cyclicBarrier,"张三"));        executors.submit(new Runner(cyclicBarrier,"李四"));        executors.submit(new Runner(cyclicBarrier,"王五"));        executors.shutdown();    }}

3.FutureTask的使用

package concurrent;import java.util.Random;import java.util.concurrent.*;/** * Created by lfeng on 2017/2/6. */public class UserFuture {        static class  Query implements Callable{            public Object call() throws Exception {                Thread.sleep(4000);                return "query datas";            }        }    public static void main(String[] args) {        String query = "query";        //jdk的future模式        FutureTask futureTask = new FutureTask(new Query());        FutureTask futureTask2 = new FutureTask(new Query());        ExecutorService executors = Executors.newFixedThreadPool(2);        executors.submit(futureTask);        executors.submit(futureTask2);        try {            //此处的两个任务是并行执行的            System.out.println("数据为:"+ futureTask.get());            System.out.println("数据为:"+ futureTask2.get());        } catch (InterruptedException e) {            e.printStackTrace();        } catch (ExecutionException e) {            e.printStackTrace();        }        System.out.println("aaaa");        executors.shutdown();    }}

4.Disruptor并行框架的使用

入门:http://ifeve.com/disruptor-getting-started/

进阶:http://ifeve.com/disruptor-dsl/

package disruptor.base;/** * Created by Administrator on 2017/2/8. */public class Event {    private long value;    public long getValue() {        return value;    }    public void setValue(long value) {        this.value = value;    }}

package disruptor.base;import com.lmax.disruptor.EventFactory;/** * Created by Administrator on 2017/2/8. */public class EventDataFactory implements EventFactory {    public Object newInstance() {        return new Event();    }}

package disruptor.base;import com.lmax.disruptor.EventHandler;/** * Created by Administrator on 2017/2/8. *///我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:class EventDataHandler implements EventHandler<Event> {    public void onEvent(Event event, long l, boolean b) throws Exception {        System.out.println(event.getValue());    }}

package disruptor.base;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Created by Administrator on 2017/2/8. */public class EventMain {    public static void main(String[] args) {        //声明固定的线程池        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());        //声明自定义的数据工厂        EventDataFactory dataFactory = new EventDataFactory();        Disruptor<Event> disruptor = new Disruptor<Event>(dataFactory, 1024,executorService, ProducerType.SINGLE, new YieldingWaitStrategy());        // 连接消费事件方法        disruptor.handleEventsWith(new EventDataHandler());        // 启动        disruptor.start();        //Disruptor 的事件发布过程是一个两阶段提交的过程:        //发布事件        RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();        EventProduct eventProduct = new EventProduct(ringBuffer);        for(long l = 0; l<100; l++){            eventProduct.onData(new Random().nextInt(100));        }        disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;        executorService.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;    }}

package disruptor.base;import com.lmax.disruptor.RingBuffer;/** * Created by Administrator on 2017/2/8. */public class EventProduct {    private final RingBuffer<Event> ringBuffer;    public EventProduct(RingBuffer<Event> ringBuffer){        this.ringBuffer = ringBuffer;    }    /**     * onData用来发布事件,每调用一次就发布一次事件     * 它的参数会用过事件传递给消费者     */    public void onData(long data){        //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽        long sequence = ringBuffer.next();        try {            //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)            Event event = ringBuffer.get(sequence);            //获取要通过事件传递的业务数据            event.setValue(data);        } finally {            //发布事件            //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。            ringBuffer.publish(sequence);        }    }}

高级用法:

package disruptor.multi;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import com.lmax.disruptor.BusySpinWaitStrategy;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.EventHandlerGroup;import com.lmax.disruptor.dsl.ProducerType;public class Main {      public static void main(String[] args) throws InterruptedException {             	long beginTime=System.currentTimeMillis();          int bufferSize=1024;          ExecutorService executor=Executors.newFixedThreadPool(8);          Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {            public Trade newInstance() {                  return new Trade();              }          }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());                  //菱形操作        //使用disruptor创建消费者组C1,C2          EventHandlerGroup<Trade> handlerGroup =        		disruptor.handleEventsWith(new Handler1(), new Handler2());        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3         handlerGroup.then(new Handler3());        //顺序操作        /**        disruptor.handleEventsWith(new Handler1()).        	handleEventsWith(new Handler2()).        	handleEventsWith(new Handler3());        */                //六边形操作.         /**        Handler1 h1 = new Handler1();        Handler2 h2 = new Handler2();        Handler3 h3 = new Handler3();        Handler4 h4 = new Handler4();        Handler5 h5 = new Handler5();        disruptor.handleEventsWith(h1, h2);        disruptor.after(h1).handleEventsWith(h4);        disruptor.after(h2).handleEventsWith(h5);        disruptor.after(h4, h5).handleEventsWith(h3);        */                                disruptor.start();//启动          CountDownLatch latch=new CountDownLatch(1);          //生产者准备          executor.submit(new TradePublisher(latch, disruptor));                latch.await();//等待生产者完事.                disruptor.shutdown();          executor.shutdown();          System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));      }  }  


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表