Queue.php 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Admin
  5. * Date: 2017/12/29
  6. * Time: 15:08
  7. * 基于 Redis 的消息队列
  8. * 用法:
  9. * use Com\Queue;
  10. * $queue = Queue::getInstance('msg');
  11. * 加入队列
  12. * $queue->push('aaaaaa');
  13. * $queue->push('bbbbb');
  14. * 获取队列长度
  15. * $queue->len();
  16. * 读取队列
  17. * $value = $queue->pop();
  18. * 删除队列
  19. * $queue->flushQueue();
  20. */
  21. namespace Com;
  22. class Queue extends \Think\Cache\Driver\Redis
  23. {
  24. static public $timeout = 1;
  25. static public $queueName = 'queue';
  26. /**
  27. * 操作句柄
  28. * @var string
  29. * @access protected
  30. */
  31. protected $handler;
  32. /**
  33. * 缓存连接参数
  34. * @var integer
  35. * @access protected
  36. */
  37. protected $options = array();
  38. /**
  39. * 取得缓存类实例
  40. * @static
  41. * @access public
  42. * @return mixed
  43. */
  44. public static function getInstance($queueName, $options = [])
  45. {
  46. if (C('DATA_CACHE_TYPE') != 'Redis') exit('DATA_CACHE_TYPE DO NOT Support Redis');
  47. //当前队列名称
  48. self::$queueName = $queueName;
  49. static $_instance = array();
  50. if (!isset($_instance[$queueName])) {
  51. $_instance[$queueName] = new Queue();
  52. }
  53. return $_instance[$queueName];
  54. }
  55. //设置队列名称
  56. public static function setQueueName($name)
  57. {
  58. self::$queueName = $name;
  59. }
  60. /**
  61. * 添加队列(lpush)
  62. * @param string $value
  63. * @return int 队列长度
  64. */
  65. public function push($value)
  66. {
  67. echo self::$queueName;die;
  68. return $this->lPush(self::$queueName, $value);
  69. }
  70. //brpop
  71. /**
  72. * 读取队列
  73. * @return string|nil
  74. */
  75. public function pop()
  76. {
  77. $result = $this->brPop(self::$queueName, self::$timeout);
  78. return empty($result) ? $result : $result[1];
  79. }
  80. /**
  81. * 删除一个消息队列
  82. */
  83. public function flushQueue()
  84. {
  85. $this->delete(self::$queueName);
  86. }
  87. /**
  88. * 返回队列长茺
  89. * @return int
  90. */
  91. public function len()
  92. {
  93. return $this->LLEN(self::$queueName);
  94. }
  95. }