JMS即Java Message Service,是面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。下图是JMS点对点和发布-订阅两种模式的原理(相关资料可自行百度)。
下面的程序以发布-订阅为例:
1).准备环境:linux + eclipse + jdk1.7 + maven + ActiveMQ-5.12.0
2).安装ActiveMQ。JMS只是一套API规范,而ActiveMQ是实现了JMS的消息中间件,方便使用。
<1>下载ActiveMQ(这里以5.12.0为例) 官网:http://activemq.apache.org/
<2>安装ActiveMQ 将apache-activemq-5.12.0-bin.tar.gz解压在 /opt 目录下:
tar -zxvf apache-activemq-5.12.0-bin.tar.gz
<3>启动ActiveMQ 进入解压后的apache-activemq-5.12.0/bin目录,运行启动脚本:
./activemq start
然后在浏览器输入:http://localhost:8161/admin/ 输入用户:admin 密码:admin, 如果出现下图, 则表示安装ActiveMQ成功
3).在eclipse新建一个maven项目, 项目结构如下:
4).pom 文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hsp.jms</groupId>
  <artifactId>JmsDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>JmsDemo</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.geronimo.specs</groupId>
      <artifactId>geronimo-jms_1.1_spec</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.qpid</groupId>
      <artifactId>qpid-jms-client</artifactId>
      <version>0.3.0</version>
    </dependency>
    <!-- <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> <version>5.12.0</version> </dependency> -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <!-- include all the dependencies into the jar so it's easier to execute the example -->
      <plugin>
        <groupId>org.fusesource.mvnplugins</groupId>
        <artifactId>maven-uberize-plugin</artifactId>
        <version>1.14</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>uberize</goal></goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
5).Publisher.java:
package com.hsp.jms.JmsDemo;import org.apache.qpid.jms.*;import javax.jms.*;public class Publisher { public static void main(String[] args) throws Exception { //topic前缀 final String TOPIC_PREFIX = "topic://"; //发布者的用户名 String user = env("ACTIVEMQ_USER", "admin"); //发布者的密码 String passWord = env("ACTIVEMQ_PASSWORD", "password"); //发布者的主机 String host = env("ACTIVEMQ_HOST", "localhost"); //发布者的端口号 int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672")); //发布者向外提供的链接 String connectionURI = "amqp://" + host + ":" + port; //发布者的destinationde名称 String destinationName = arg(args, 0, "topic://event"); int messages = 10; int size = 256; //创建连接工厂 JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI); //jms连接工厂创建一个连接 Connection connection = factory.createConnection(user, password); //开放连接 connection.start(); //根据连接创建一个会话 session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //发布者的目的的 Destination destination = null; if(destinationName.startsWith(TOPIC_PREFIX)) { destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length())); } else { destination = session.createQueue(destinationName); } //创建发布者 MessageProducer producer = session.createProducer(destination); //设置发布的消息为非持久态 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发布消息 for(int i = 1; i <= messages; i++) { TextMessage msg = session.createTextMessage("#" + i); msg.setIntProperty("id", i); producer.send(msg); System.out.println(String.format("发送第 %d 条信息: ", i) + msg.getText()); } producer.send(session.createTextMessage("SHUTDOWN")); Thread.sleep(1000 * 3); //关闭连接 connection.close(); //退出程序 System.exit(0); } //获取环境变量类 private static String env(String key, String defaultValue) { String rc = System.getenv(key); if(rc == null) return defaultValue; return rc; } private static String arg(String[] args, int index, String defaultValue) { if(index < args.length) return args[index]; else return defaultValue; }}6).Listener1.java:
package com.hsp.jms.JmsDemo;

import javax.jms.*;
import org.apache.qpid.jms.*;

/**
 * Demo subscriber: connects to the broker over AMQP, consumes text messages
 * from a destination and prints each one, until a message whose body is
 * "SHUTDOWN" arrives. It then prints how many messages were received and how
 * long that took, and exits.
 *
 * <p>Connection settings are read from the environment variables
 * ACTIVEMQ_USER, ACTIVEMQ_PASSWORD, ACTIVEMQ_HOST and ACTIVEMQ_PORT, each with
 * a built-in default. The destination name may be passed as the first
 * command-line argument; a name starting with "topic://" subscribes to a
 * topic, anything else reads from a queue.
 */
public class Listener1 {

    /**
     * Program entry point.
     *
     * @param args optional; {@code args[0]} overrides the destination name
     * @throws Exception if the port is not a number or any JMS call fails
     */
    public static void main(String[] args) throws Exception {
        // Prefix that marks a destination name as a topic rather than a queue.
        final String TOPIC_PREFIX = "topic://";

        // Broker credentials and address, overridable through the environment.
        String user = env("ACTIVEMQ_USER", "admin");
        String password = env("ACTIVEMQ_PASSWORD", "password");
        String host = env("ACTIVEMQ_HOST", "localhost");
        int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));

        // AMQP URI of the broker this listener connects to.
        String connectionURI = "amqp://" + host + ":" + port;

        // Name of the destination to consume from.
        String destinationName = arg(args, 0, "topic://event");

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
        Connection connection = factory.createConnection(user, password);
        try {
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination;
            if (destinationName.startsWith(TOPIC_PREFIX)) {
                destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
            } else {
                destination = session.createQueue(destinationName);
            }

            MessageConsumer consumer = session.createConsumer(destination);
            long start = System.currentTimeMillis();
            // Sequence number expected on the next data message.
            long count = 1;
            System.out.println("Listener1 等待消息 ...");

            while (true) {
                Message msg = consumer.receive();
                if (msg == null) {
                    // receive() returns null once the consumer has been closed;
                    // stop instead of dereferencing the null message below.
                    break;
                }
                if (!(msg instanceof TextMessage)) {
                    System.out.println("错误的消息类型: " + msg.getClass());
                    continue;
                }

                String body = ((TextMessage) msg).getText();
                if ("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(String.format("在 %.2f 秒内接收到 %d 条消息", (1.0 * diff / 1000.0), (count - 1)));
                    break;
                }

                try {
                    // Report a gap or reordering against the sender's "id" property.
                    if (count != msg.getIntProperty("id")) {
                        System.out.println("消息不匹配: " + count + "!=" + msg.getIntProperty("id"));
                    }
                } catch (NumberFormatException ignored) {
                    // The "id" property is missing or not numeric; the sequence
                    // check is best-effort, so the message is still printed.
                }
                if (count == 1) {
                    // Restart the clock at the first data message so the elapsed
                    // time excludes the wait before it.
                    start = System.currentTimeMillis();
                }
                System.out.println(String.format("接收到第 %d 条消息: ", count) + body);
                count++;
            }
        } finally {
            // Release the connection on every exit path, including failures.
            connection.close();
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Keep the interrupt status visible instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        // A clean shutdown is not an error, so exit with status 0.
        System.exit(0);
    }

    /**
     * Reads an environment variable.
     *
     * @param key          name of the environment variable
     * @param defaultValue value returned when the variable is not set
     * @return the variable's value, or {@code defaultValue} when it is unset
     */
    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if (rc == null) {
            return defaultValue;
        }
        return rc;
    }

    /**
     * Reads a positional command-line argument.
     *
     * @param args         the command-line arguments
     * @param index        position of the wanted argument
     * @param defaultValue value returned when fewer arguments were supplied
     * @return {@code args[index]} when present, otherwise {@code defaultValue}
     */
    private static String arg(String[] args, int index, String defaultValue) {
        if (index < args.length) {
            return args[index];
        }
        return defaultValue;
    }
}
7).Listener2.java文件跟Listener1.java差不多
8).先运行Listener1.java和Listener2.java,再运行Publisher.java看看效果:
新闻热点
疑难解答