首页 > 开发 > Java > 正文

java结合WebSphere MQ实现接收队列文件功能

2024-07-13 09:55:55
字体:
来源:转载
供稿:网友

WebSphereMQ,也称MQSeries,以一致的、可靠的和易于管理的方式来连接应用程序,并为跨部门、企业范围的集成提供了可靠的基础。通过为重要的消息和事务提供可靠的、一次且仅一次的传递,MQ可以处理复杂的通信协议,并动态地将消息传递工作负载分配给可用的资源。

首先我们先来简单介绍下websphere mq以及安装使用简介

websphere mq : 用于传输信息 具有跨平台的功能。

1 安装websphere mq 并启动

2 websphere mq 建立 queue Manager (如:MQSI_SAMPLE_QM)

3 建立queue 类型选择 Local类型 的 (如lq )

4建立channels 类型选择Server Connection (如BridgeChannel)

接下来,我们来看实例代码:

 

 
  1. MQFileReceiver.java 
  2.  
  3. package com.mq.dpca.file; 
  4.  
  5. import java.io.File; 
  6. import java.io.FileOutputStream; 
  7.  
  8. import com.ibm.mq.MQEnvironment; 
  9. import com.ibm.mq.MQException; 
  10. import com.ibm.mq.MQGetMessageOptions; 
  11. import com.ibm.mq.MQMessage; 
  12. import com.ibm.mq.MQQueue; 
  13. import com.ibm.mq.MQQueueManager; 
  14. import com.ibm.mq.constants.MQConstants; 
  15. import com.mq.dpca.msg.MQConfig; 
  16. import com.mq.dpca.util.ReadCmdLine; 
  17. import com.mq.dpca.util.RenameUtil; 
  18.  
  19. /** 
  20.  
  21. * MQ分组接收文件功能 
  22. * 主动轮询 
  23. */ 
  24. public class MQFileReceiver { 
  25. private MQQueueManager qmgr; // 连接到队列管理器 
  26.  
  27. private MQQueue inQueue; // 传输队列 
  28.  
  29. private String queueName = ""// 队列名称 
  30.  
  31. private String host = ""// 
  32.  
  33. private int port = 1414; // 侦听器的端口号 
  34.  
  35. private String channel = ""// 通道名称 
  36.  
  37. private String qmgrName = ""// 队列管理器 
  38.  
  39. private MQMessage inMsg; // 创建消息缓冲 
  40.  
  41. private MQGetMessageOptions gmo; // 设置获取消息选项 
  42.  
  43. private static String fileName = null// 接收队列上的消息并存入文件 
  44.  
  45. private int ccsid = 0; 
  46.  
  47. private static String file_dir = null
  48.  
  49. /** 
  50. * 程序的入口 
  51.  
  52. * @param args 
  53. */ 
  54. public static void main(String args[]) { 
  55. MQFileReceiver mfs = new MQFileReceiver(); 
  56. //初始化连接 
  57. mfs.initproperty(); 
  58. //接收文件 
  59. mfs.runGoupReceiver(); 
  60. //获取shell脚本名 
  61. // String shellname = MQConfig.getValueByKey(fileName); 
  62. // if(shellname!=null&&!"".equals(shellname)){ 
  63. // //调用shell 
  64. // ReadCmdLine.callShell(shellname); 
  65. // }else{ 
  66. // System.out.println("have no shell name,Only receive files."); 
  67. // } 
  68.  
  69.  
  70. public void runGoupReceiver() { 
  71. try { 
  72. init(); 
  73. getGroupMessages(); 
  74. qmgr.commit(); 
  75. System.out.println("/n Messages successfully Receive "); 
  76. catch (MQException mqe) { 
  77. mqe.printStackTrace(); 
  78. try { 
  79. System.out.println("/n Backing out Transaction "); 
  80. qmgr.backout(); 
  81. System.exit(2); 
  82. catch (Exception e) { 
  83. e.printStackTrace(); 
  84. System.exit(2); 
  85. catch (Exception e) { 
  86. e.printStackTrace(); 
  87. System.exit(2); 
  88.  
  89. /** 
  90. * 初始化服务器连接信息 
  91.  
  92. * @throws Exception 
  93. */ 
  94. private void init() throws Exception { 
  95. /* 为客户机连接设置MQEnvironment属性 */ 
  96. MQEnvironment.hostname = host; 
  97. MQEnvironment.channel = channel; 
  98. MQEnvironment.port = port; 
  99.  
  100. /* 连接到队列管理器 */ 
  101. qmgr = new MQQueueManager(qmgrName); 
  102.  
  103. /* 设置队列打开选项以输 */ 
  104. int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF 
  105. | MQConstants.MQOO_FAIL_IF_QUIESCING; 
  106.  
  107. /* 打开队列以输 */ 
  108. inQueue = qmgr.accessQueue(queueName, opnOptn, nullnullnull); 
  109.  
  110. /** 
  111. * 接受文件的主函数 
  112.  
  113. * @throws Exception 
  114. */ 
  115. public void getGroupMessages() { 
  116. /* 设置获取消息选项 */ 
  117. gmo = new MQGetMessageOptions(); 
  118. gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING; 
  119. gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT; 
  120. /* 等待消息 */ 
  121. gmo.options = gmo.options + MQConstants.MQGMO_WAIT; 
  122. /* 设置等待时间限制 */ 
  123. gmo.waitInterval = 5000; 
  124. /* 只获取消息 */ 
  125. gmo.options = gmo.options + MQConstants.MQGMO_ALL_MSGS_AVAILABLE; 
  126. /* 以辑顺序获取消息 */ 
  127. gmo.options = gmo.options + MQConstants.MQGMO_LOGICAL_ORDER; 
  128. gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID; 
  129. /* 创建消息缓冲 */ 
  130. inMsg = new MQMessage(); 
  131. try { 
  132. FileOutputStream fos = null
  133. /* 处理组消息 */ 
  134. while (true) { 
  135. try { 
  136. inQueue.get(inMsg, gmo); 
  137. if (fos == null) { 
  138. try { 
  139. fileName = inMsg.getStringProperty("fileName"); 
  140. String fileName_full = null
  141. fileName_full = file_dir + RenameUtil.rename(fileName); 
  142. fos = new FileOutputStream(new File(fileName_full)); 
  143. int msgLength = inMsg.getMessageLength(); 
  144. byte[] buffer = new byte[msgLength]; 
  145. inMsg.readFully(buffer); 
  146. fos.write(buffer, 0, msgLength); 
  147. /* 查看是否是最后消息标识 */ 
  148. char x = gmo.groupStatus; 
  149. if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { 
  150. System.out.println("Last Msg in Group"); 
  151. break
  152. inMsg.clearMessage(); 
  153.  
  154. catch (Exception e) { 
  155. System.out 
  156. .println("Receiver the message without property,do nothing!"); 
  157. inMsg.clearMessage(); 
  158. else { 
  159. int msgLength = inMsg.getMessageLength(); 
  160. byte[] buffer = new byte[msgLength]; 
  161. inMsg.readFully(buffer); 
  162. fos.write(buffer, 0, msgLength); 
  163. /* 查看是否是最后消息标识 */ 
  164. char x = gmo.groupStatus; 
  165. if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { 
  166. System.out.println("Last Msg in Group"); 
  167. break
  168. inMsg.clearMessage(); 
  169. catch (Exception e) { 
  170. char x = gmo.groupStatus; 
  171. if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { 
  172. System.out.println("Last Msg in Group"); 
  173. break
  174. if (fos != null
  175. fos.close(); 
  176. catch (Exception e) { 
  177. System.out.println(e.getMessage()); 
  178.  
  179. public void initproperty() { 
  180. MQConfig config = new MQConfig().getInstance(); 
  181. if (config.getMQ_MANAGER() != null) { 
  182. qmgrName = config.getMQ_MANAGER(); 
  183. queueName = config.getMQ_QUEUE_NAME(); 
  184. channel = config.getMQ_CHANNEL(); 
  185. host = config.getMQ_HOST_NAME(); 
  186. port = Integer.valueOf(config.getMQ_PROT()); 
  187. ccsid = Integer.valueOf(config.getMQ_CCSID()); 
  188. file_dir = config.getFILE_DIR(); 

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表