首页 > 开发 > Java > 正文

java多线程消息队列的实现代码

2024-07-13 10:10:11
字体:
来源:转载
供稿:网友

本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记

1、定义一个队列缓存池:

 //static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。private static List<Queue> queueCache = new LinkedList<Queue>(); 

2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

private Integer offerMaxQueue = 2000;

3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

new Thread(){    public void run(){     while(true){      String ip = null;      try {       synchronized (queueCache) {        Integer size = queueCache.size();        if(size==0){//队列缓存池没有消息,等待。。。。     queueCache.wait();        }        Queue queue = queueCache.remove(0);        if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理         queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,         continue;        }else{      ;//这里是处理该消息的操作。        }        size = queueCache.size();        if(size<offerMaxQueue&&size>=0){     queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。        }       }      } catch (Exception e) {       e.printStackTrace();      }finally{       try {//检出该消息队列的锁        unIpLock(queueStr);       } catch (Execption e) {//捕获异常,不能让线程挂掉        e.printStackTrace();       }                       }      }   }.start();

4、检入队列

synchronized (queueCache) {while(true){Integer size = queueCache.size();if(size>=offerMaxQueue){      try {       queueCache.wait();continue;//继续执行等待中的检入任务。 } catch (InterruptedException e) {   e.printStackTrace(); } }//IFif(size<=offerMaxQueue&&size>0){ queueCache.notifyAll();}break;//检入完毕}//while}

5、锁方法实现

/**  * 锁  * @param ip  * @return  * @throws   */ public Boolean isLock(String queueStr) {  return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1; } //解锁 public void unIpLock(String queueStr) {  if(ip!=null){   this.redisManager.del(queueStr+"_lock");//  lock.unlock();  } }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VeVb武林网。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表