首页 > 编程 > JavaScript > 正文

PHP实现基于Redis的MessageQueue队列封装操作示例

2019-11-19 12:10:37
字体:
来源:转载
供稿:网友

本文实例讲述了PHP实现基于Redis的MessageQueue队列封装操作。分享给大家供大家参考,具体如下:

Redis的链表List可以用来做链表,高并发的特性非常适合做分布式的并行消息传递。

项目地址:https://github.com/huyanping/Zebra-PHP-Framework

左进右出

$redis->lPush($key, $value);$redis->rPop($key);

以下程序已在生产环境中正式使用。

基于Redis的PHP消息队列封装

<?php/** * Created by PhpStorm. * User: huyanping * Date: 14-8-19 * Time: 下午12:10 * * 基于Redis的消息队列封装 */namespace Zebra/MessageQueue;class RedisMessageQueue implements IMessageQueue{  protected $redis_server;  protected $server;  protected $port;  /**   * @var 消息队列标志   */  protected $key;  /**   * 构造队列,创建redis链接   * @param $server_config   * @param $key   * @param bool $p_connect   */  public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)  {    if (empty($key))      throw new /Exception('message queue key can not be empty');    $this->server = $server_config['IP'];    $this->port = $server_config['PORT'];    $this->key = $key;    $this->check_environment();    if ($p_connect) {      $this->pconnect();    } else {      $this->connect();    }  }  /**   * 析构函数,关闭redis链接,使用长连接时,最好主动调用关闭   */  public function __destruct()  {    $this->close();  }  /**   * 短连接   */  private function connect()  {    $this->redis_server = new /Redis();    $this->redis_server->connect($this->server, $this->port);  }  /**   * 长连接   */  public function pconnect()  {    $this->redis_server = new /Redis();    $this->redis_server->pconnect($this->server, $this->port);  }  /**   * 关闭链接   */  public function close()  {    $this->redis_server->close();  }  /**   * 向队列插入一条信息   * @param $message   * @return mixed   */  public function put($message)  {    return $this->redis_server->lPush($this->key, $message);  }  /**   * 向队列中插入一串信息   * @param $message   * @return mixed   */  public function puts(){    $params = func_get_args();    $message_array = array_merge(array($this->key), $params);    return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);  }  /**   * 从队列顶部获取一条记录   * @return mixed   */  public function get()  {    return $this->redis_server->lPop($this->key);  }  /**   * 选择数据库,可以用于区分不同队列   * @param $database   */  public function select($database)  {    $this->redis_server->select($database);  }  /**   * 获得队列状态,即目前队列中的消息数量   * @return mixed   */  public function size()  {    return $this->redis_server->lSize($this->key);  }  /**   * 获取某一位置的值,不会删除该位置的值   * @param $pos   * @return mixed   */  public function view($pos)  {    return $this->redis_server->lGet($this->key, $pos);  }  /**   * 检查Redis扩展   * @throws Exception   */  protected function check_environment()  {    if (!/extension_loaded('redis')) {      throw new /Exception('Redis extension not loaded');    }  }}

如果需要一次写入多个队列,可以使用如下调用方式:

<?php$redis = new RedisMessageQueue();$redis->puts(1, 2, 3, 4);$redis->puts(5, 6, 7, 8, 9);

模仿HTTPSQS输出结果的封装如下,提供了写入位置和读取位置记录的功能:

<?php/** * Created by PhpStorm. * User: huyanping * Date: 14-9-5 * Time: 下午2:16 * * 附加了队列状态信息的RedisMessageQueue */namespace Zebra/MessageQueue;class RedisMessageQueueStatus extends RedisMessageQueue {  protected $record_status;  protected $put_position;  protected $get_position;  public function __construct(    $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),    $key = 'redis_message_queue',    $p_connect = false,    $record_status=true  ){    parent::__construct($server_config, $key, $p_connect);    $this->record_status = $record_status;    $this->put_position = $this->key . '_put_position';    $this->get_position = $this->key . '_get_position';  }  public function get(){    if($queue = parent::get()){      $incr_result = $this->redis_server->incr($this->get_position);      if(!$incr_result) throw new /Exception('can not mark get position,please check the redis server');      return $queue;    }else{      return false;    }  }  public function put($message){    if(parent::put($message)){      $incr_result = $this->redis_server->incr($this->put_position);      if(!$incr_result) throw new /Exception('can not mark put position,please check the redis server');      return true;    }else{      return false;    }  }  public function puts_status(){    $message_array = func_get_args();    $result = call_user_func_array(array($this, 'puts'), $message_array);    if($result){      $this->redis_server->incrBy($this->put_position, count($message_array));      return true;    }    return false;  }  public function size(){    return $this->redis_server->lSize($this->key);  }  public function status(){    $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;    $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;    $status['unread_queue'] = $this->size();    $status['queue_name'] = $this->key;    $status['server'] = $this->server;    $status['port'] = $this->port;    return $status;  }  public function status_normal(){    $status = $this->status();    $message = 'Redis Message Queue' . PHP_EOL;    $message .= '-------------------' . PHP_EOL;    $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;    $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;    $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;    $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;    return $message;  }  public function status_json(){    return /json_encode($this->status());  }}

更多关于PHP相关内容感兴趣的读者可查看本站专题:《php+redis数据库程序设计技巧总结》、《php面向对象程序设计入门教程》、《PHP基本语法入门教程》、《PHP数组(Array)操作技巧大全》、《php字符串(string)用法总结》、《php+mysql数据库操作入门教程》及《php常见数据库操作技巧汇总

希望本文所述对大家PHP程序设计有所帮助。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表