首页 > 编程 > Java > 正文

java远程连接调用Rabbitmq的实例代码

2019-11-26 11:50:33
字体:
来源:转载
供稿:网友

本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。

打开IDEA创建一个maven工程(Java就可以了)。

这里写图片描述 

pom.xml文件如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhenqi</groupId> <artifactId>rabbitmq-study</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-study</name> <url>http://maven.apache.org</url> <properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies>  <dependency>   <groupId>junit</groupId>   <artifactId>junit</artifactId>   <version>4.12</version>   <scope>test</scope>  </dependency>  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->  <dependency>   <groupId>com.rabbitmq</groupId>   <artifactId>amqp-client</artifactId>   <version>4.1.0</version>   <exclusions>    <exclusion>     <groupId>org.slf4j</groupId>     <artifactId>slf4j-api</artifactId>    </exclusion>   </exclusions>  </dependency>  <dependency>   <groupId>org.slf4j</groupId>   <artifactId>slf4j-log4j12</artifactId>   <version>1.7.21</version>  </dependency>  <dependency>   <groupId>commons-lang</groupId>   <artifactId>commons-lang</artifactId>   <version>2.6</version>  </dependency> </dependencies></project>

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

[  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

package com.zhenqi;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * Created by wuming on 2017/7/16. */public abstract class EndPoint {  protected Channel channel;  protected Connection connection;  protected String endPointName;  public EndPoint(String endpointName) throws Exception {    this.endPointName = endpointName;    //创建一个连接工厂 connection factory    ConnectionFactory factory = new ConnectionFactory();    //设置rabbitmq-server服务IP地址    factory.setHost("192.168.146.128");    factory.setUsername("openstack");    factory.setPassword("rabbitmq");    factory.setPort(5672);    factory.setVirtualHost("/");    //得到 连接    connection = factory.newConnection();    //创建 channel实例    channel = connection.createChannel();    channel.queueDeclare(endpointName, false, false, false, null);  }  /**   * 关闭channel和connection。并非必须,因为隐含是自动调用的。   * @throws IOException   */  public void close() throws Exception{    this.channel.close();    this.connection.close();  }}

生产者Producer.java

生产者类的任务是向队列里写一条消息

package com.zhenqi;import org.apache.commons.lang.SerializationUtils;import java.io.Serializable;/** * Created by wuming on 2017/7/16. */public class Producer extends EndPoint {  public Producer(String endpointName) throws Exception {    super(endpointName);  }  public void sendMessage(Serializable object) throws Exception {    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));  }}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.zhenqi;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.ShutdownSignalException;import org.apache.commons.lang.SerializationUtils;import org.apache.log4j.Logger;import java.io.IOException;import java.util.HashMap;import java.util.Map;/** * Created by wuming on 2017/7/16. */public class QueueConsumer extends EndPoint implements Runnable, Consumer {  private Logger LOG=Logger.getLogger(QueueConsumer.class);  public QueueConsumer(String endpointName) throws Exception {    super(endpointName);  }  public void handleConsumeOk(String s) {  }  public void handleCancelOk(String s) {  }  public void handleCancel(String s) throws IOException {  }  public void handleShutdownSignal(String s, ShutdownSignalException e) {  }  public void handleRecoverOk(String s) {    LOG.info("Consumer "+s +" registered");  }  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {    Map map = (HashMap) SerializationUtils.deserialize(bytes);    LOG.info("Message Number "+ map.get("message number") + " received.");  }  public void run() {    try{      channel.basicConsume(endPointName, true,this);    }catch(IOException e){      e.printStackTrace();    }  }}

 测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

package com.zhenqi;import java.util.HashMap;/** * Created by wuming on 2017/7/16. */public class TestRabbitmq {  public static void main(String[] args){    try{      QueueConsumer consumer = new QueueConsumer("queue");      Thread consumerThread = new Thread(consumer);      consumerThread.start();      Producer producer = new Producer("queue");      for (int i = 0; i < 100000; i++){        HashMap message = new HashMap();        message.put("message number", i);        producer.sendMessage(message);        System.out.println("Message Number "+ i +" sent.");      }    }catch(Exception e){      e.printStackTrace();    }  }}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表