首页 > 编程 > Java > 正文

spring与disruptor集成的简单示例

2019-11-26 10:14:38
字体:
来源:转载
供稿:网友

disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor

BaseQueueHelper.java

/** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。 * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源. * * @author xielongwang * @create 2018-01-18 下午3:49 * @email xielong.wang@nvr-china.com * @description */public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {  /**   * 记录所有的队列,系统退出时统一清理资源   */  private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();  /**   * Disruptor 对象   */  private Disruptor<E> disruptor;  /**   * RingBuffer   */  private RingBuffer<E> ringBuffer;  /**   * initQueue   */  private List<D> initQueue = new ArrayList<D>();  /**   * 队列大小   *   * @return 队列长度,必须是2的幂   */  protected abstract int getQueueSize();  /**   * 事件工厂   *   * @return EventFactory   */  protected abstract EventFactory<E> eventFactory();  /**   * 事件消费者   *   * @return WorkHandler[]   */  protected abstract WorkHandler[] getHandler();  /**   * 初始化   */  public void init() {    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());    disruptor.setDefaultExceptionHandler(new MyHandlerException());    disruptor.handleEventsWithWorkerPool(getHandler());    ringBuffer = disruptor.start();    //初始化数据发布    for (D data : initQueue) {      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {        @Override        public void translateTo(E event, long sequence, D data) {          event.setValue(data);        }      }, data);    }    //加入资源清理钩子    synchronized (queueHelperList) {      if (queueHelperList.isEmpty()) {        Runtime.getRuntime().addShutdownHook(new Thread() {          @Override          public void run() {            for (BaseQueueHelper baseQueueHelper : queueHelperList) {              baseQueueHelper.shutdown();            }          }        });      }      queueHelperList.add(this);    }  }  /**   * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,   * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.   *   * @return WaitStrategy   */  protected abstract WaitStrategy getStrategy();  /**   * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.   */  public synchronized void publishEvent(D data) {    if (ringBuffer == null) {      initQueue.add(data);      return;    }    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {      @Override      public void translateTo(E event, long sequence, D data) {        event.setValue(data);      }    }, data);  }  /**   * 关闭队列   */  public void shutdown() {    disruptor.shutdown();  }}

EventFactory.java

/** * @author xielongwang * @create 2018-01-18 下午6:24 * @email xielong.wang@nvr-china.com * @description */public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {  @Override  public SeriesDataEvent newInstance() {    return new SeriesDataEvent();  }}

MyHandlerException.java

public class MyHandlerException implements ExceptionHandler {  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);  /*   * (non-Javadoc) 运行过程中发生时的异常   *   * @see   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable   * , long, java.lang.Object)   */  @Override  public void handleEventException(Throwable ex, long sequence, Object event) {    ex.printStackTrace();    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());  }  /*   * (non-Javadoc) 启动时的异常   *   * @see   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.   * Throwable)   */  @Override  public void handleOnStartException(Throwable ex) {    logger.error("start disruptor error ==[{}]!", ex.getMessage());  }  /*   * (non-Javadoc) 关闭时的异常   *   * @see   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang   * .Throwable)   */  @Override  public void handleOnShutdownException(Throwable ex) {    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());  }}

SeriesData.java (代表应用A发送给应用B的消息)

public class SeriesData {  private String deviceInfoStr;  public SeriesData() {  }  public SeriesData(String deviceInfoStr) {    this.deviceInfoStr = deviceInfoStr;  }  public String getDeviceInfoStr() {    return deviceInfoStr;  }  public void setDeviceInfoStr(String deviceInfoStr) {    this.deviceInfoStr = deviceInfoStr;  }  @Override  public String toString() {    return "SeriesData{" +        "deviceInfoStr='" + deviceInfoStr + '/'' +        '}';  }}

SeriesDataEvent.java

public class SeriesDataEvent extends ValueWrapper<SeriesData> {}

SeriesDataEventHandler.java

public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);  @Autowired  private DeviceInfoService deviceInfoService;  @Override  public void onEvent(SeriesDataEvent event) {    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {      logger.warn("receiver series data is empty!");    }    //业务处理    deviceInfoService.processData(event.getValue().getDeviceInfoStr());  }}

SeriesDataEventQueueHelper.java

@Componentpublic class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {  private static final int QUEUE_SIZE = 1024;  @Autowired  private List<SeriesDataEventHandler> seriesDataEventHandler;  @Override  protected int getQueueSize() {    return QUEUE_SIZE;  }  @Override  protected com.lmax.disruptor.EventFactory eventFactory() {    return new EventFactory();  }  @Override  protected WorkHandler[] getHandler() {    int size = seriesDataEventHandler.size();    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);    return paramEventHandlers;  }  @Override  protected WaitStrategy getStrategy() {    return new BlockingWaitStrategy();    //return new YieldingWaitStrategy();  }  @Override  public void afterPropertiesSet() throws Exception {    this.init();  }}

ValueWrapper.java

public abstract class ValueWrapper<T> {  private T value;  public ValueWrapper() {}  public ValueWrapper(T value) {    this.value = value;  }  public T getValue() {    return value;  }  public void setValue(T value) {    this.value = value;  }}

DisruptorConfig.java

@Configuration@ComponentScan(value = {"com.portal.disruptor"})//多实例几个消费者public class DisruptorConfig {  /**   * smsParamEventHandler1   *   * @return SeriesDataEventHandler   */  @Bean  public SeriesDataEventHandler smsParamEventHandler1() {    return new SeriesDataEventHandler();  }  /**   * smsParamEventHandler2   *   * @return SeriesDataEventHandler   */  @Bean  public SeriesDataEventHandler smsParamEventHandler2() {    return new SeriesDataEventHandler();  }  /**   * smsParamEventHandler3   *   * @return SeriesDataEventHandler   */  @Bean  public SeriesDataEventHandler smsParamEventHandler3() {    return new SeriesDataEventHandler();  }  /**   * smsParamEventHandler4   *   * @return SeriesDataEventHandler   */  @Bean  public SeriesDataEventHandler smsParamEventHandler4() {    return new SeriesDataEventHandler();  }  /**   * smsParamEventHandler5   *   * @return SeriesDataEventHandler   */  @Bean  public SeriesDataEventHandler smsParamEventHandler5() {    return new SeriesDataEventHandler();  }}

测试

  //注入SeriesDataEventQueueHelper消息生产者  @Autowired  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)  public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {    long startTime1 = System.currentTimeMillis();    if (StringUtils.isEmpty(deviceData)) {      logger.info("receiver data is empty !");      return new DataResponseVo<String>(400, "failed");    }    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));    long startTime2 = System.currentTimeMillis();    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);    return new DataResponseVo<String>(200, "success");  }

应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表