首页 > 学院 > 开发设计 > 正文

RabbitMQ

2019-11-14 21:54:48
字体:
来源:转载
供稿:网友
RabbitMQ - topic

在publish/subscribe模式中使用fanout类型有个缺陷,就是不能选择性接收的消息。我们可以让consumer获得所有已发布的消息中指定的几个消息。

在之前的例子中我们这样绑定exchange和队列:

channel.queueBind(queueName, EXCHANGE_NAME, "");

暂且不论该代码中绑定的exchange类型,这里空着的参数就是routing key。routing key的意义与exchange类型有关,比如使用fanout类型就会忽略掉routing key。

而解决这一问题的就是direct类型。direct exchange并不复杂,只不过是PRoducer和consumer双方的exchange对应时还需要对应routing key。

以下代码中,同一个exchange和两个队列进行绑定,两个队列分别和不同的binding key绑定。(PS:当然,我们也可以将同一个routing key绑定给不同的队列也没有问题。)另外,SERVERITY变量是rounting数组,假设将日志通过exchange发送出去,consumer根据自己的需要获取不同级别的日志:

final class ChannelFactory_{    private final static ConnectionFactory connFactory = new ConnectionFactory();     public final static String EXCHANGE_NAME = "direct_exchange";    public final static String[] SEVERITY = {"info","warning","error"};     static {        Channel temp = getChannel();        try {            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);        } catch (IOException e) {            e.printStackTrace();        }    }     public static Channel getChannel(int channelNumber){        try {            Connection connection = connFactory.newConnection();            return connection.createChannel(channelNumber);        } catch (IOException e) {            e.printStackTrace();        }return null;    }     public static Channel getChannel(){        try {            Connection connection = connFactory.newConnection();            return connection.createChannel();        } catch (IOException e) {            e.printStackTrace();        }return null;    }     public static void  closeChannel(Channel channel) throws IOException {        channel.close();        channel.getConnection().close();    } }

确认定义:

consumer只需要warning和error级别(routing)的日志消息:

public static void main(String[] args) throws IOException, InterruptedException {        Channel channel = ChannelFactory_.getChannel();         String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning");        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error");         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queueName,true,consumer);        while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            String routingKey = delivery.getEnvelope().getRoutingKey();             System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");        }     }

producer将所有级别的日志都发送出去:

public static void main(String[] args) throws IOException {        Channel channel = ChannelFactory_.getChannel();        String content = "message "+new Date();         for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) {            channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes());        }        ChannelFactory_.closeChannel(channel);    }

运行结果:

direct exchange可以让我们有选择性地接受消息。但这样做仍然有缺陷。虽然我可以只要求error和warning级别的日志,但是我不能再进行细分。比如我只想要数据库相关的error和warning级别的日志。

为了实现这一点,我们需要使用另一个exchange类型——Topic。exchange类型为topic时,routing key是一组用"."隔开的词,但仅限255bytes。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

topic和direct的不同点还有在consumer中定义routing key时我们可以使用通配符,比如:符号'*':可以匹配某一个词。符号'#':可以匹配0~N个词。

举个例子说明,假设我们用rounting key描述一个动物。格式为: <性格>.<颜色>.<种类>用符号'*',我想要得到桔***的动物,即:"*.orange.*"用符号'#',我想要得到懒散的动物,即:"lazy.#"如果使用过程中有人破坏了格式,即使rounting key为"lazy.orange.male.rabbit"也可以匹配"lazy.#"。

稍微修改上面的代码,首先定义一个topic exchange。

public  final static String EXCHANGE_NAME = "topic_exchange";

temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

确认定义:

发送sql相关的log:

public static void main(String[] args) throws IOException {        Channel channel = ChannelFactory_.getChannel();        String content = "message #$#$#$#$#$#$";         channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes());        channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes());         ChannelFactory_.closeChannel(channel);    }

consumer接收所有sql相关的warning和所有error:

public static void main(String[] args) throws IOException, InterruptedException {        Channel channel = ChannelFactory_.getChannel();         String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#");        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#");         QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(queueName,true,consumer);        while(true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            String routingKey = delivery.getEnvelope().getRoutingKey();             System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");        }     }

运行结果:


上一篇:RabbitMQ

下一篇:Java异常(1)

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表