# 命名规范:容器名称.[队列特点 或 路由特点].使用的平台名称.作用
# @容器名称:queue、exchange
# @队列特点:非持久化标记(undurable)、延时队列(delay)、优先级队列(priority)
# @路由特点:direct、topic、fanout、headers
# @使用的平台名称:xiangshang、xiangqian……
# @作用:干什么的
# eg:消息队列(queue.xiangshang.message)、延时消息队列(queue.delay.xiangshang.message)、普通路由(exchange.direct.xiangshang.common)、通用路由(exchange.direct.xiangshang.common)
依赖的jar
<!-- Maven fragment: Spring AMQP / Spring Rabbit client libraries, both pinned to one version property. -->
<properties>
    <spring.amqp.version>1.6.6.RELEASE</spring.amqp.version>
</properties>

<dependencies>
    <!-- Core AMQP abstractions (Message, MessageProperties, AmqpTemplate, converters). -->
    <!-- NOTE(review): spring-core is excluded here, presumably so the host application's own
         Spring version is used; confirm that version is compatible with Spring AMQP 1.6.x. -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- RabbitMQ implementation (RabbitTemplate, listener containers, the rabbit: XML namespace). -->
    <!-- NOTE(review): the Spring XML in this article uses Jackson2JsonMessageConverter; verify that
         jackson-databind is on the classpath, since it is not declared in this fragment. -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-messaging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
连接配置(rabbitmq-config.properties)
# RabbitMQ connection factory settings.
# NOTE(review): the address and credentials below are sample values; replace them per
# environment and keep real passwords out of version control.
rmq.addresses=10.200.0.150:5672
rmq.username=root
# The key must be spelled exactly "rmq.password": the Spring XML resolves ${rmq.password},
# and property keys are case-sensitive.
rmq.password=123456

# Naming convention: <container>.[<queue trait> or <exchange type>].<platform>.<purpose>
#   container     : queue, exchange
#   queue trait   : undurable (non-durable), delay (delayed delivery), priority (priority queue)
#   exchange type : direct, topic, fanout, headers
#   platform      : xiangshang, xiangqian, ...
#   purpose       : what the queue/exchange is used for
# Examples:
#   message queue          -> queue.xiangshang.message
#   delayed message queue  -> queue.delay.xiangshang.message
#   common direct exchange -> exchange.direct.xiangshang.common
rmq.queue.xiangshang.test=queue.xiangshang.test
rmq.queue.undurable.xiangshang.test=queue.undurable.xiangshang.test
rmq.queue.priority.xiangshang.test=queue.priority.xiangshang.test
rmq.queue.delay.xiangshang.test=queue.delay.xiangshang.test
rmq.exchange.direct.xiangshang.test=exchange.direct.xiangshang.test
rmq.exchange.fanout.xiangshang.test=exchange.fanout.xiangshang.test
rmq.exchange.headers.xiangshang.test=exchange.headers.xiangshang.test
xml配置(rabbitConfiguration.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring AMQP wiring for the RabbitMQ samples: connection factory, admin (declares the
  queues/exchanges/bindings below on the broker), JSON message converter, RabbitTemplate,
  queue and exchange declarations, and one listener container.
  All names come from rabbitmq-config.properties.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <description>rabbitMQ连接服务配置</description>

    <context:property-placeholder location="classpath:rabbitmq-config.properties"/>

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${rmq.addresses}"
                               username="${rmq.username}"
                               password="${rmq.password}" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter. Despite the bean id, this is the Jackson 2 JSON converter, not Gson. -->
    <bean id="gsonConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template; sends go to the direct exchange unless an exchange is given explicitly -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     message-converter="gsonConverter"
                     exchange="${rmq.exchange.direct.xiangshang.test}" />

    <!-- Queues: begin -->
    <rabbit:queue name="${rmq.queue.xiangshang.test}" durable="true" />
    <rabbit:queue name="${rmq.queue.undurable.xiangshang.test}" durable="false" />

    <!-- Priority queue -->
    <rabbit:queue name="${rmq.queue.priority.xiangshang.test}" durable="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- Delay queue: it has no listener, so messages sit here until they expire and are then
         dead-lettered to the direct exchange with the plain queue's name as routing key. -->
    <rabbit:queue name="${rmq.queue.delay.xiangshang.test}">
        <rabbit:queue-arguments>
            <!-- Upper bound for any message's time in the queue, in milliseconds -->
            <entry key="x-message-ttl">
                <value type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="${rmq.exchange.direct.xiangshang.test}"/>
            <entry key="x-dead-letter-routing-key" value="${rmq.queue.xiangshang.test}"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- Queues: end -->

    <!-- Exchanges: begin -->
    <!-- Direct exchange: each queue is bound with its own name as the routing key -->
    <rabbit:direct-exchange name="${rmq.exchange.direct.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" key="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" key="${rmq.queue.undurable.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.priority.xiangshang.test}" key="${rmq.queue.priority.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}" key="${rmq.queue.delay.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Publish/subscribe: every bound queue receives a copy -->
    <rabbit:fanout-exchange name="${rmq.exchange.fanout.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Header-based routing. x-match=all means every listed header must be present with the
         given value; header names are compared exactly, so producers must send "Operator"
         and "sex" with this exact spelling and case. -->
    <rabbit:headers-exchange name="${rmq.exchange.headers.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}">
                <rabbit:binding-arguments>
                    <entry key="x-match" value="all"/>
                    <entry key="Operator" value="xsjf"/>
                    <entry key="sex" value="male"/>
                </rabbit:binding-arguments>
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:headers-exchange>
    <!-- Exchanges: end -->

    <!-- Listeners: begin -->
    <!-- Message consumption: DirectListener implements a listener interface's onMessage method.
         concurrency is the number of concurrent consumers. Because acknowledge="manual",
         the listener itself is responsible for acking or nacking each delivery. -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener" />
    <rabbit:listener-container connection-factory="connectionFactory"
                               message-converter="gsonConverter"
                               concurrency="1"
                               acknowledge="manual">
        <rabbit:listener queues="${rmq.queue.xiangshang.test},${rmq.queue.undurable.xiangshang.test}"
                         ref="directListener" />
    </rabbit:listener-container>
    <!-- Listeners: end -->
</beans>
客户端发送(DirectQueueTest.java示例)
package com.xiangshang.mq.test;

import java.nio.charset.StandardCharsets;
import java.util.Random;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.BeansException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Producer-side samples for the exchanges and queues declared in
 * {@code rabbitConfiguration.xml}: direct routing, fanout broadcast, a priority queue and a
 * TTL/dead-letter based delay queue.
 *
 * <p>The Spring context is created once in the static initializer; {@link #main(String[])}
 * runs {@link #demo1()} only, the other demos are meant to be invoked by editing {@code main}.
 */
public class DirectQueueTest {

    // Queue and exchange names; these mirror the values in rabbitmq-config.properties.
    private static final String RMQ_QUEUE_XIANGSHANG_TEST = "queue.xiangshang.test";
    private static final String RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST = "queue.undurable.xiangshang.test";
    private static final String RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST = "queue.priority.xiangshang.test";
    private static final String RMQ_QUEUE_DELAY_XIANGSHANG_TEST = "queue.delay.xiangshang.test";
    private static final String RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST = "exchange.direct.xiangshang.test";
    private static final String RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST = "exchange.fanout.xiangshang.test";
    private static final String RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST = "exchange.headers.xiangshang.test";

    /** One shared generator instead of a new {@link Random} per message. */
    private static final Random RANDOM = new Random();

    private static final ConfigurableApplicationContext context;
    private static final AmqpTemplate rabbitTemplate;

    static {
        try {
            context = new ClassPathXmlApplicationContext("rabbitConfiguration.xml");
            // Release broker connections and listener threads when the JVM exits.
            context.registerShutdownHook();
            rabbitTemplate = context.getBean("rabbitTemplate", AmqpTemplate.class);
        } catch (BeansException e) {
            // Fail fast with the real cause instead of continuing with a null context.
            throw new ExceptionInInitializerError(e);
        }
    }

    public static void main(String[] args) {
        demo1();
    }

    /**
     * Sends messages to individual queues through the direct exchange.
     */
    public static void demo1() {
        String sendMsg = "队列消息." + RANDOM.nextDouble();

        /*
         * The template's default exchange is configured as exchange.direct.xiangshang.test.
         * Non-persistent messages: lost when the RabbitMQ server restarts.
         */
        MessageProperties mp1 = new MessageProperties();
        mp1.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        Message msg1 = textMessage(sendMsg, mp1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        // Same effect as the line above, with the exchange named explicitly.
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST, RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, msg1);

        /*
         * Persistent messages (the default delivery mode): after a RabbitMQ restart the
         * contents of durable queues can be recovered.
         */
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, sendMsg);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Broadcast: every queue bound to the fanout exchange receives the message.
     */
    public static void demo2() {
        String sendMsg = "广播消息." + RANDOM.nextDouble();
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST,
                RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Priority queue: sends ten messages with random priorities in the range 0..9.
     */
    public static void demo3() {
        for (int i = 0; i < 10; i++) {
            int priority = RANDOM.nextInt(10);
            String sendMsg = "队列消息." + RANDOM.nextDouble() + "[" + priority + "]";
            System.out.println(sendMsg);

            MessageProperties mp = new MessageProperties();
            mp.setPriority(priority);
            rabbitTemplate.convertAndSend(RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST, textMessage(sendMsg, mp));
        }
    }

    /**
     * Delay queue, implemented with per-message expiration plus dead-lettering.
     *
     * <ol>
     *   <li>RabbitMQ has no self-sorting queue; the queues it offers are not ordered by expiry.</li>
     *   <li>Queues are first-in-first-out, so expiry is evaluated at the head: a later message
     *       that expires earlier is not released before the message in front of it. Example:
     *       with expirations of 30s, 10s and 20s (in that order) nothing leaves the queue until
     *       30s have passed.</li>
     *   <li>Conventions that follow from this:
     *     <ul>
     *       <li>give every delayed business case its own exchange and queue, and document its use;</li>
     *       <li>set a maximum message TTL on the delay queue together with the exchange and
     *           routing key that expired messages are forwarded to;</li>
     *       <li>header routing uses match-all, so the headers set here must be exactly those
     *           declared on the binding.</li>
     *     </ul>
     *   </li>
     * </ol>
     */
    public static void demo4() {
        for (int i = 0; i < 3; i++) {
            int delay = (i + 1) * 2 * 1000;
            String sendMsg = "队列消息." + RANDOM.nextDouble() + "[" + delay + "]";
            System.out.println(sendMsg);

            MessageProperties mp = new MessageProperties();
            mp.setExpiration(String.valueOf(delay)); // per-message expiration, in milliseconds
            // Header names are matched exactly; the binding declares "Operator" with a capital O.
            mp.setHeader("Operator", "xsjf");
            mp.setHeader("sex", "male");
            rabbitTemplate.convertAndSend(RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST, "", textMessage(sendMsg, mp));
        }
    }

    /** Builds a raw message whose body is {@code text} encoded as UTF-8, independent of the platform charset. */
    private static Message textMessage(String text, MessageProperties properties) {
        return new Message(text.getBytes(StandardCharsets.UTF_8), properties);
    }
}
消息监听(DirectListener.java示例)
package com.xiangshang.mq.listener;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

/**
 * Sample consumer that prints each message body and acknowledges the delivery itself.
 *
 * <p>It implements {@link ChannelAwareMessageListener} rather than the plain message listener
 * because acknowledging manually requires access to the {@link Channel}.
 */
public class DirectListener implements ChannelAwareMessageListener {

    /**
     * Prints the body, acks the delivery, then pauses for one second.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on, used for the ack
     * @throws Exception if the ack fails or the thread is interrupted while pausing
     * @see ChannelAwareMessageListener#onMessage(Message, Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode explicitly as UTF-8 so the output does not depend on the JVM default charset.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("DirectListener 获取的消息内容:" + body);

        // Positive ack for this single delivery (multiple = false).
        // To reject and requeue instead, call:
        //   channel.basicNack(deliveryTag, false, true)
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);

        // sleep is a static method; call it on Thread, not on an instance.
        Thread.sleep(1000);
    }
}
新闻热点
疑难解答