WebSphereMQ,也称MQSeries,以一致的、可靠的和易于管理的方式来连接应用程序,并为跨部门、企业范围的集成提供了可靠的基础。通过为重要的消息和事务提供可靠的、一次且仅一次的传递,MQ可以处理复杂的通信协议,并动态地将消息传递工作负载分配给可用的资源。
首先我们先来简单介绍下websphere mq以及安装使用简介
websphere mq : 用于传输信息 具有跨平台的功能。
1 安装websphere mq 并启动
2 websphere mq 建立 queue Manager (如:MQSI_SAMPLE_QM)
3 建立queue 类型选择 Local类型 的 (如lq )
4建立channels 类型选择Server Connection (如BridgeChannel)
接下来,我们来看实例代码:
- MQFileReceiver.java
- package com.mq.dpca.file;
- import java.io.File;
- import java.io.FileOutputStream;
- import com.ibm.mq.MQEnvironment;
- import com.ibm.mq.MQException;
- import com.ibm.mq.MQGetMessageOptions;
- import com.ibm.mq.MQMessage;
- import com.ibm.mq.MQQueue;
- import com.ibm.mq.MQQueueManager;
- import com.ibm.mq.constants.MQConstants;
- import com.mq.dpca.msg.MQConfig;
- import com.mq.dpca.util.ReadCmdLine;
- import com.mq.dpca.util.RenameUtil;
- /**
- *
- * MQ分组接收文件功能
- * 主动轮询
- */
- public class MQFileReceiver {
- private MQQueueManager qmgr; // 连接到队列管理器
- private MQQueue inQueue; // 传输队列
- private String queueName = ""; // 队列名称
- private String host = ""; //
- private int port = 1414; // 侦听器的端口号
- private String channel = ""; // 通道名称
- private String qmgrName = ""; // 队列管理器
- private MQMessage inMsg; // 创建消息缓冲
- private MQGetMessageOptions gmo; // 设置获取消息选项
- private static String fileName = null; // 接收队列上的消息并存入文件
- private int ccsid = 0;
- private static String file_dir = null;
- /**
- * 程序的入口
- *
- * @param args
- */
- public static void main(String args[]) {
- MQFileReceiver mfs = new MQFileReceiver();
- //初始化连接
- mfs.initproperty();
- //接收文件
- mfs.runGoupReceiver();
- //获取shell脚本名
- // String shellname = MQConfig.getValueByKey(fileName);
- // if(shellname!=null&&!"".equals(shellname)){
- // //调用shell
- // ReadCmdLine.callShell(shellname);
- // }else{
- // System.out.println("have no shell name,Only receive files.");
- // }
- }
- public void runGoupReceiver() {
- try {
- init();
- getGroupMessages();
- qmgr.commit();
- System.out.println("/n Messages successfully Receive ");
- } catch (MQException mqe) {
- mqe.printStackTrace();
- try {
- System.out.println("/n Backing out Transaction ");
- qmgr.backout();
- System.exit(2);
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(2);
- }
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(2);
- }
- }
- /**
- * 初始化服务器连接信息
- *
- * @throws Exception
- */
- private void init() throws Exception {
- /* 为客户机连接设置MQEnvironment属性 */
- MQEnvironment.hostname = host;
- MQEnvironment.channel = channel;
- MQEnvironment.port = port;
- /* 连接到队列管理器 */
- qmgr = new MQQueueManager(qmgrName);
- /* 设置队列打开选项以输 */
- int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF
- | MQConstants.MQOO_FAIL_IF_QUIESCING;
- /* 打开队列以输 */
- inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null);
- }
- /**
- * 接受文件的主函数
- *
- * @throws Exception
- */
- public void getGroupMessages() {
- /* 设置获取消息选项 */
- gmo = new MQGetMessageOptions();
- gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING;
- gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT;
- /* 等待消息 */
- gmo.options = gmo.options + MQConstants.MQGMO_WAIT;
- /* 设置等待时间限制 */
- gmo.waitInterval = 5000;
- /* 只获取消息 */
- gmo.options = gmo.options + MQConstants.MQGMO_ALL_MSGS_AVAILABLE;
- /* 以辑顺序获取消息 */
- gmo.options = gmo.options + MQConstants.MQGMO_LOGICAL_ORDER;
- gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID;
- /* 创建消息缓冲 */
- inMsg = new MQMessage();
- try {
- FileOutputStream fos = null;
- /* 处理组消息 */
- while (true) {
- try {
- inQueue.get(inMsg, gmo);
- if (fos == null) {
- try {
- fileName = inMsg.getStringProperty("fileName");
- String fileName_full = null;
- fileName_full = file_dir + RenameUtil.rename(fileName);
- fos = new FileOutputStream(new File(fileName_full));
- int msgLength = inMsg.getMessageLength();
- byte[] buffer = new byte[msgLength];
- inMsg.readFully(buffer);
- fos.write(buffer, 0, msgLength);
- /* 查看是否是最后消息标识 */
- char x = gmo.groupStatus;
- if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {
- System.out.println("Last Msg in Group");
- break;
- }
- inMsg.clearMessage();
- } catch (Exception e) {
- System.out
- .println("Receiver the message without property,do nothing!");
- inMsg.clearMessage();
- }
- } else {
- int msgLength = inMsg.getMessageLength();
- byte[] buffer = new byte[msgLength];
- inMsg.readFully(buffer);
- fos.write(buffer, 0, msgLength);
- /* 查看是否是最后消息标识 */
- char x = gmo.groupStatus;
- if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {
- System.out.println("Last Msg in Group");
- break;
- }
- inMsg.clearMessage();
- }
- } catch (Exception e) {
- char x = gmo.groupStatus;
- if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {
- System.out.println("Last Msg in Group");
- }
- break;
- }
- }
- if (fos != null)
- fos.close();
- } catch (Exception e) {
- System.out.println(e.getMessage());
- }
- }
- public void initproperty() {
- MQConfig config = new MQConfig().getInstance();
- if (config.getMQ_MANAGER() != null) {
- qmgrName = config.getMQ_MANAGER();
- queueName = config.getMQ_QUEUE_NAME();
- channel = config.getMQ_CHANNEL();
- host = config.getMQ_HOST_NAME();
- port = Integer.valueOf(config.getMQ_PROT());
- ccsid = Integer.valueOf(config.getMQ_CCSID());
- file_dir = config.getFILE_DIR();
- }
- }
- }
新闻热点
疑难解答
图片精选