首页 > 编程 > Java > 正文

redis发布订阅Java代码实现过程解析

2019-11-26 08:34:34
字体:
来源:转载
供稿:网友

前言

Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.1.0</version></dependency>

由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:

<listener><listener-class>com.test.listener.InitListener</listener-class></listener>

一、订阅消息(InitListener实现)

redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。

public class InitListener implements ServletContextListener{  private Logger logger = Logger.getLogger(InitListener.class);    @Override  public void contextInitialized(ServletContextEvent sce) {    logger.info("启动tomcat");// 连接redis    Map<String, String> proMap = PropertyReader.getProperties();    final String url = proMap.get("redis.host");    final Integer port = Integer.parseInt(proMap.get("redis.port"));    final ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");    final RedisSubListener redisSubListener = (RedisSubListener) classPathXmlApplicationContext.getBean("redisSubListener");    // 为防止阻塞tomcat启动,开启线程执行    new Thread(new Runnable(){       public void run(){         // 连接redis,建立监听        Jedis jedis = null;        while(true){          //解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知          String[] channels = new String[] { "decodeResourceUpdateNtf", "tvSplitPlayRsp","tvSplitPlayStopRsp",              "planStartStatusNtf", "planStopStatusNtf", "pollStartStatusNtf", "pollStopStatusNtf",              "planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"};          try{            jedis = new Jedis(url,port);            logger.info("redis请求订阅通道");            jedis.subscribe(redisSubListener,channels);            logger.info("redis订阅结束");          }catch(JedisConnectionException e){            logger.error("Jedis连接异常,异常信息 :" + e);          }catch(IllegalStateException e){             logger.error("Jedis异常,异常信息 :" + e);          }                    try {            Thread.sleep(1000);          } catch (InterruptedException e) {            e.printStackTrace();          }          if(jedis != null){            jedis = null;          }        }      }})    .start();  }

最后在spring配置文件里接入以下配置:

<!-- redis -->   <bean id="redisMessageService" class="com.test.service.impl.RedisMessageServiceImpl" scope="singleton">     <property name="webSocketService"><ref local="webSocketService" /></property>     <property name="tvSplitStatusDao" ref="tvSplitStatusDao"></property>   </bean>   <bean id="redisSubListener" class="com.test.common.RedisSubListener" scope="singleton">     <property name="redisMessageService"><ref local="redisMessageService" /></property>   </bean>

RedisMessageServiceImpl用于处理接收的redis消息。

二、发布消息

public class RedisPublishUtil {  private Logger logger = Logger.getLogger(RedisPublishUtil.class);  public static Jedis pubJedis;  private static Map<String, String> proMap = PropertyReader.getProperties();  private static final String redisPort = proMap.get("redis.port");  private static String url = proMap.get("redis.host");  private static final int port = Integer.parseInt(redisPort);    public void setPubJedis(Jedis jedis) {    RedisPublishUtil.pubJedis = jedis;  }    public Jedis getPubJedis() {    if (pubJedis == null) {      createJedisConnect();    }    // 返回对象    return pubJedis;  }    public Jedis createJedisConnect(){    // 连接redis    logger.info("===创建连接jedis=====");    try {      pubJedis = new Jedis(url, port);    } catch (JedisConnectionException e) {      logger.error("Jedis连接异常,异常信息 :" + e.getMessage());      try {        Thread.sleep(1000);        logger.info("发起重新连接jedis");        createJedisConnect();      } catch (InterruptedException except) {        except.printStackTrace();      }    }    // 返回对象    return pubJedis;  }  //公共发布接口  public void pubRedisMsg(String msgType,String msg){    logger.info("redis准备发布消息内容:" + msg);    try {      this.getPubJedis().publish(msgType, msg);    } catch (JedisConnectionException e) {      logger.error("redis发布消息失败!", e);      this.setPubJedis(null);      logger.info("重新发布消息,channel="+msgType);      pubRedisMsg(msgType, msg);    }  }}
public class PropertyReader {   private static Logger logger = Logger.getLogger(PropertyReader.class);    /*   * 获得数据库链接的配置文件   */  public static Map<String,String> getProperties(){    logger.info("读取redis配置文件开始。。。");         Properties prop = new Properties();            Map<String,String> proMap = new HashMap<String,String>();         try {       //读取属性文件redis.properties      InputStream in= PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties");             prop.load(in);   ///加载属性列表      Iterator<String> it=prop.stringPropertyNames().iterator();      while(it.hasNext()){        String key=it.next();        proMap.put(key, prop.getProperty(key));      }      in.close();      logger.info("读取redis配置文件成功。。。");    } catch (Exception e) {      logger.error("读取redis配置文件异常!", e);      e.printStackTrace();    }    return proMap;  }}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表