首页 > 编程 > Java > 正文

rabbitmq结合spring实现消息队列优先级的方法

2019-11-26 13:03:19
字体:
来源:转载
供稿:网友

 1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便)。需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是rabbitmq消息队优先级的推送功能是我们需要解决的首个技术点.

1.2技术调研:这里一个概念需要说明,为什么说是消息队列的优先级而不是消息的优先级,来看下消息队列的工作原理

生产者生成消息打到交换机里面(如果没有声明交换机,会打到default exchange里面),交换机绑定一个或多个队列,消息进入队列里面,消费者一直在监听队列,发现队列里面有消息就开始消费,这里就是一个消息传递的过程,queue是一个栈队列,栈是先进先出的,就是说消息来了依次排队,一个队列并不能实现消息的插队和优先推送的功能。但是如果说我们的多个队列有不同的优先级,不同优先级的消息通过roatingkey进入不同的队列,优先级高的队列消息被优先消费,这样也能形成一个相对意义上的优先级,所以说这里不是消息的优先级而是队列的优先级.

1.2.1 为什么说是相对意义上的优先级

有并发才有优先级,如果每个消息都能被瞬间处理也不会有消息优先推送的需求,那我们看看消息会在哪里阻塞

1,queue,很明显高并发的时候队列里面是会存在很多消息的,2,eschange ,高并发的时候producer发送给exchange的时候也会产生阻塞。

第一种情况由于我们队列已经定义优先级了,所以进入队列的消息都是同种优先级别的,并不需要插队。而对于第二种情况,消息在exchange时阻塞时并不能实现消息优先进入队列,依然是一个依次处理的情景,但是由于exchang到queue的处理速度极快,所有我们忽略了这块的优先级。

1.2.3 代码实现

在rabbitmq3.5版本之前,官方并没有实现队列优先级的功能,但论坛里面有一些插件可以实现(末尾附链接),这里我们主要说3.5版本之后的实现

1.2.3.1 Java代码

Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection();//创建连接     Channelchannel = conn.createChannel();//创建channel     Map<String,Object> arg = newHashMap<String, Object>();      arg.put("x-max-priority",10); //队列的属性参数 有10个优先级别     // 声明(创建)队列     //channel.queueDeclare(QUEUE_NAME, false, false, false, null);     channel.queueDeclare(QUEUE_NAME,true,false, false, arg);     // 消息内容     String message ="Hello World!";     channel.basicPublish("",QUEUE_NAME, null, message.getBytes());     BasicPropertiesprop =new BasicProperties(null, null, null, null, 1,     null, null, null, null, null, null, null, null,null);//消息的参数,声明该消息的优先级是1     channel.basicPublish("",QUEUE_NAME, prop, message.getBytes()); //消息发布     System.out.println("[x] Sent '" + message + "'");     //关闭通道和连接     channel.close();     conn.close(); 

客户端看下结果:

1.2.3.2结合spring实现: %20

1.2.3.2.1%20xml配置: %20

<beans%20xmlns="http://www.springframework.org/schema/beans"%20%20%20xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"%20%20%20xmlns:rabbit="http://www.springframework.org/schema/rabbit"%20%20%20%20xsi:schemaLocation="http://www.springframework.org/schema/beans%20%20%20%20http://www.springframework.org/schema/beans/spring-beans-3.0.xsd%20%20%20%20http://www.springframework.org/schema/rabbit%20%20%20http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"%20>%20%20%20<description>rabbitmq%20连接服务配置</description>%20%20%20<!--%20连接配置%20-->%20%20%20<rabbit:connection-factory%20id="connectionFactory"%20host="${rabbit.ip}"%20username="${rabbit.username}"%20%20%20%20password="${rabbit.password}"%20port="${rabbit.port}"%20virtual-host="${rabbit.vhost}"/>%20%20%20<rabbit:admin%20connection-factory="connectionFactory"/>%20%20%20<rabbit:template%20id="amqpTemplate"%20connection-factory="connectionFactory"%20/>%20%20%20<!--%20spring%20template声明-->%20%20%20<!--%20声明一个队列%20-->%20%20%20<rabbit:queue%20id="test_queue_key"%20name="test_queue_key"%20durable="true"%20auto-delete="false"%20exclusive="false">%20%20%20%20<rabbit:queue-arguments>%20%20%20%20%20<entry%20key="x-max-priority">%20%20%20%20%20%20%20<value%20type="java.lang.Integer">10</value>//这个地方一定是integer的,别的不好使!!%20%20%20%20%20</entry>%20%20%20%20</rabbit:queue-arguments>%20%20%20</rabbit:queue>%20%20%20<!--%20监听配置queues:监听的队列,多个的话用逗号(,)分隔%20ref:监听器-->%20%20%20<rabbit:listener-container%20connection-factory="connectionFactory"%20acknowledge="auto">%20%20%20%20%20<rabbit:listener%20queue-names="test_queue_key"%20ref="queueListenter"%20method="onMessage"/>%20%20%20</rabbit:listener-container>%20<bean%20id="queueListenter"%20class="com.DF.spring.springAMQP.QueueListener"%20/>%20

1.2.3.2.2代码部分:

producter:

AbstractApplicationContext%20ctx%20=%20new%20%20ClassPathXmlApplicationContext("classpath:/spring/rabbitmq-contextDemo2.xml");%20RabbitTemplate%20amqpTemplate%20=%20ctx.getBean(RabbitTemplate.class);%20%20Random%20random%20=%20new%20Random();%20%20%20for%20(int%20i=0;%20i<%201000;%20i++){%20%20%20%20%20final%20int%20priority%20=%20random.nextInt(10%20-%201%20+%201)%20+%201;//随机的优先级%20%20%20%20%20amqpTemplate.convertAndSend("test_queue_key",%20(Object)("hello%20world"),%20new%20MessagePostProcessor()%20{%20%20%20%20%20%20@Override%20%20%20%20%20%20public%20Message%20postProcessMessage(Message%20message)%20throws%20AmqpException%20{%20%20%20%20%20%20%20message.getMessageProperties().setPriority(priority);%20%20%20%20%20%20%20return%20message;%20%20%20%20%20%20}%20%20%20%20%20});%20%20%20}%20

customer: %20

public%20class%20QueueListener%20implements%20MessageListener{%20%20@Override%20%20public%20void%20onMessage(Message%20message)%20{%20%20%20try{%20%20%20%20System.out.print("[x]%20接收到的消息:"+new%20String(message.getBody(),"utf-8")+"&&&"+"优先级"+message.getMessageProperties().getPrority());%20%20%20%20Thread.sleep(1000);%20%20%20}catch(Exception%20e){%20%20%20%20e.printStackTrace();%20%20%20}%20%20}%20}%20

从客户端看下队列里面的消息:

我们发送随机优先级的消息进入队列,看看消费端打印出来的消息:

到这里,rabbitmq结合spring的demo功能实现......

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

上一篇:spring mvc实现文件上传并携带其他参数的示例

下一篇:java 分布式与集群的区别和联系

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表
学习交流
热门图片
猜你喜欢的新闻
猜你喜欢的关注

新闻热点

疑难解答

图片精选

网友关注