首页 > 编程 > Java > 正文

SpringBoot集成RabbitMQ的方法(死信队列)

2019-11-26 09:02:57
字体:
来源:转载
供稿:网友

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

1.消息进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

安装MQ

使用docker方式安装,选择带management的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost:15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the queues, topic exchanges and bindings of a delayed-retry pipeline
 * built on dead-letter queues.
 *
 * <p>The three "buffered" queues are declared with a message TTL plus a
 * dead-letter exchange and routing key, and are not listened to by the
 * container declared here. As wired below, their dead-letter targets are:
 * <ul>
 *   <li>buffered queue -&gt; high-buffered exchange</li>
 *   <li>high-buffered queue -&gt; high exchange (bound to the high queue)</li>
 *   <li>low-buffered queue -&gt; low exchange (bound to the low queue)</li>
 * </ul>
 * Only the high and low queues are consumed, via {@link #container}.
 */
@Configuration
public class MqConfig {

    private static final int MILLIS_PER_MINUTE = 60_000;

    // Buffer durations, in minutes.
    // The entry buffer defaults to 30 minutes to agree with the "30min Buffered
    // Queue" it configures (the previous default of 120 contradicted that).
    @Value("${spring.df.buffered.min:30}")
    private int springdfBufferedTime;
    @Value("${spring.df.high-buffered.min:5}")
    private int springdfHighBufferedTime;
    @Value("${spring.df.low-buffered.min:120}")
    private int springdfLowBufferedTime;

    // 30 min buffered (entry) queue
    @Value("${spring.df.queue:spring-df-buffered-queue}")
    private String springdfBufferedQueue;
    @Value("${spring.df.topic:spring-df-buffered-topic}")
    private String springdfBufferedTopic;
    @Value("${spring.df.route:spring-df-buffered-route}")
    private String springdfBufferedRouteKey;

    // 5 min high buffered queue
    @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
    private String springdfHighBufferedQueue;
    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;
    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    // High (execution) queue
    @Value("${spring.df.high.queue:spring-df-high-queue}")
    private String springdfHighQueue;
    @Value("${spring.df.high.topic:spring-df-high-topic}")
    private String springdfHighTopic;
    @Value("${spring.df.high.route:spring-df-high-route}")
    private String springdfHighRouteKey;

    // 2 h low buffered queue
    @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
    private String springdfLowBufferedQueue;
    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;
    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    // Low (execution) queue
    @Value("${spring.df.low.queue:spring-df-low-queue}")
    private String springdfLowQueue;
    @Value("${spring.df.low.topic:spring-df-low-topic}")
    private String springdfLowTopic;
    @Value("${spring.df.low.route:spring-df-low-route}")
    private String springdfLowRouteKey;

    /** Entry buffer queue; expired messages are dead-lettered to the high-buffered exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
    Queue springdfBufferedQueue() {
        return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic,
                springdfHighBufferedRouteKey, minutesToMillis(springdfBufferedTime));
    }

    /** High buffer queue; expired messages are dead-lettered to the high exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
    Queue springdfHighBufferedQueue() {
        return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic,
                springdfHighRouteKey, minutesToMillis(springdfHighBufferedTime));
    }

    /** Durable high queue, consumed by {@link #container}. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
    Queue springdfHighQueue() {
        return new Queue(springdfHighQueue, true);
    }

    /** Low buffer queue; expired messages are dead-lettered to the low exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
    Queue springdfLowBufferedQueue() {
        return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic,
                springdfLowRouteKey, minutesToMillis(springdfLowBufferedTime));
    }

    /** Durable low queue, consumed by {@link #container}. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
    Queue springdfLowQueue() {
        return new Queue(springdfLowQueue, true);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
    TopicExchange springdfBufferedTopic() {
        return new TopicExchange(springdfBufferedTopic);
    }

    // NOTE: in the binding methods below the Queue/TopicExchange parameters shadow
    // the String fields of the same name; only the route-key field is read.
    @Bean
    Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
        return BindingBuilder.bind(springdfBufferedQueue)
                .to(springdfBufferedTopic)
                .with(springdfBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
    TopicExchange springdfHighBufferedTopic() {
        return new TopicExchange(springdfHighBufferedTopic);
    }

    @Bean
    Binding springHighBuffereddf(Queue springdfHighBufferedQueue,
                                 TopicExchange springdfHighBufferedTopic) {
        return BindingBuilder.bind(springdfHighBufferedQueue)
                .to(springdfHighBufferedTopic)
                .with(springdfHighBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
    TopicExchange springdfHighTopic() {
        return new TopicExchange(springdfHighTopic);
    }

    @Bean
    Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
        return BindingBuilder.bind(springdfHighQueue)
                .to(springdfHighTopic)
                .with(springdfHighRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
    TopicExchange springdfLowBufferedTopic() {
        return new TopicExchange(springdfLowBufferedTopic);
    }

    @Bean
    Binding springLowBuffereddf(Queue springdfLowBufferedQueue,
                                TopicExchange springdfLowBufferedTopic) {
        return BindingBuilder.bind(springdfLowBufferedQueue)
                .to(springdfLowBufferedTopic)
                .with(springdfLowBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
    TopicExchange springdfLowTopic() {
        return new TopicExchange(springdfLowTopic);
    }

    @Bean
    Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
        return BindingBuilder.bind(springdfLowQueue)
                .to(springdfLowTopic)
                .with(springdfLowRouteKey);
    }

    /**
     * Listener container that consumes the high and low queues and hands every
     * message to the given adapter.
     */
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(springdfHighQueue, springdfLowQueue);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    /**
     * Adapter that dispatches by source queue to the matching {@link MqReceiver}
     * method, falling back to {@code receive} for anything else.
     *
     * <p>The mapped names must match the receiver's method names exactly; they
     * previously pointed at {@code springdfHighReceive}/{@code springdfLowReceive}
     * on an {@code IntegrationReceiver} type, neither of which matches
     * {@link MqReceiver}.
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MqReceiver receiver) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
        adapter.setDefaultListenerMethod("receive");

        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(springdfHighQueue, "highReceiver");
        queueOrTagToMethodName.put(springdfLowQueue, "lowReceiver");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        return adapter;
    }

    /**
     * Converts a duration in minutes to milliseconds.
     *
     * @throws ArithmeticException if the result does not fit in an {@code int},
     *         instead of silently wrapping to a bogus (possibly negative) TTL
     */
    private static int minutesToMillis(int minutes) {
        return Math.multiplyExact(minutes, MILLIS_PER_MINUTE);
    }

    /**
     * Builds a durable, non-exclusive, non-auto-delete queue whose messages are
     * dead-lettered to {@code topic} with {@code routeKey} once they have waited
     * {@code bufferedTime} milliseconds.
     *
     * @param queueName    name of the queue to declare
     * @param topic        dead-letter exchange name
     * @param routeKey     dead-letter routing key
     * @param bufferedTime per-queue message TTL in milliseconds
     */
    private Queue createBufferedQueue(String queueName, String topic, String routeKey,
                                      int bufferedTime) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", topic);
        args.put("x-dead-letter-routing-key", routeKey);
        args.put("x-message-ttl", bufferedTime);

        // Whether the queue is durable.
        boolean durable = true;
        // Private queue usable only by its creator, deleted on disconnect.
        boolean exclusive = false;
        // Whether to delete the queue once all consumers have disconnected.
        boolean autoDelete = false;
        return new Queue(queueName, durable, exclusive, autoDelete, args);
    }
}

消费者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Receiver for the high and low queues. On processing failure a message is
 * re-published to a buffered exchange, with its attempt count carried in the
 * payload under the {@code "times"} key.
 *
 * <p>Registered as a Spring component so that it can be injected where a
 * receiver bean is required.
 */
@Component
public class MqReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MqReceiver.class);

    /** Shared mapper; built once instead of once per message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Payload key holding the number of retries already attempted. */
    private static final String TIMES_KEY = "times";

    @Value("${high-retry:5}")
    private int highRetry;
    @Value("${low-retry:5}")
    private int lowRetry;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;
    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;
    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MqReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /** Fallback handler: only logs the message at INFO level. */
    public void receive(Object message) {
        if (logger.isInfoEnabled()) {
            logger.info("default receiver: " + message);
        }
    }

    /**
     * Handles a message from the high queue. On failure the message is sent back
     * to the high-buffered exchange with {@code times} incremented, until
     * {@code highRetry} attempts have been made; after that {@code times} is
     * reset to 0 and the message is sent to the low-buffered exchange.
     *
     * @param message payload; must be convertible to a {@code Map} by Jackson
     * @throws IllegalArgumentException if the payload cannot be converted to a map
     */
    public void highReceiver(Object message) {
        Map<String, Object> msg = toMap(message);
        try {
            logger.info("这里做消息处理...");
        } catch (Exception e) {
            int times = retryCount(msg);
            logger.warn("high queue processing failed, times={}", times, e);
            if (times < highRetry) {
                msg.put(TIMES_KEY, times + 1);
                // Publish the updated map, not the original payload, so the
                // incremented counter actually travels with the message.
                rabbitTemplate.convertAndSend(springdfHighBufferedTopic, springdfHighBufferedRouteKey, msg);
            } else {
                msg.put(TIMES_KEY, 0);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic, springdfLowBufferedRouteKey, msg);
            }
        }
    }

    /**
     * Handles a message from the low queue. On failure the message is sent back
     * to the low-buffered exchange with {@code times} incremented, until
     * {@code lowRetry} attempts have been made; after that it is only logged
     * and not re-published.
     *
     * @param message payload; must be convertible to a {@code Map} by Jackson
     * @throws IllegalArgumentException if the payload cannot be converted to a map
     */
    public void lowReceiver(Object message) {
        Map<String, Object> msg = toMap(message);
        try {
            logger.info("这里做消息处理...");
        } catch (Exception e) {
            int times = retryCount(msg);
            logger.warn("low queue processing failed, times={}", times, e);
            if (times < lowRetry) {
                // Without this increment the counter never reaches lowRetry and
                // a failing message would be retried forever.
                msg.put(TIMES_KEY, times + 1);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic, springdfLowBufferedRouteKey, msg);
            } else {
                logger.info("消息无法被消费...");
            }
        }
    }

    /** Converts the raw payload into a map via Jackson. */
    @SuppressWarnings("unchecked") // convertValue(Map.class) is raw; values are read as Object only
    private static Map<String, Object> toMap(Object message) {
        return MAPPER.convertValue(message, Map.class);
    }

    /** Reads the retry counter, treating a missing or non-numeric value as 0. */
    private static int retryCount(Map<String, Object> msg) {
        Object raw = msg.get(TIMES_KEY);
        return raw instanceof Number ? ((Number) raw).intValue() : 0;
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表