Worker.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  8. // +----------------------------------------------------------------------
  9. // | Author: yunwuxin <448901948@qq.com>
  10. // +----------------------------------------------------------------------
  11. namespace think\queue;
  12. use Exception;
  13. use think\Hook;
  14. use think\Queue;
  /**
   * Queue worker: pops jobs off the queue, fires them and reports the outcome.
   */
  class Worker
  {
      /**
       * Run the next available job, or sleep when no job is waiting.
       *
       * Fires the 'worker_before_process' hook before a job is processed and the
       * 'worker_before_sleep' hook before sleeping; both receive $queue.
       *
       * @param string|null $queue    Comma-separated queue names tried in order;
       *                              null calls Queue::pop() without a queue name
       * @param int         $delay    Delay passed to Job::release() when the job throws
       * @param int         $sleep    Seconds to sleep when no job was found
       * @param int         $maxTries Maximum attempts before a job is treated as failed (0 = no limit)
       * @return array ['job' => Job|null, 'failed' => bool]
       * @throws Exception Whatever the job throws while firing is rethrown
       */
      public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
      {
          $job = $this->getNextJob($queue);

          if ($job !== null) {
              Hook::listen('worker_before_process', $queue);

              return $this->process($job, $maxTries, $delay);
          }

          Hook::listen('worker_before_sleep', $queue);
          $this->sleep($sleep);

          return ['job' => null, 'failed' => false];
      }

      /**
       * Fetch the next job from the given queue(s).
       *
       * @param string|null $queue Comma-separated queue names, or null to call
       *                           Queue::pop() without a queue name
       * @return Job|null The first job found, or null when every listed queue returned null
       */
      protected function getNextJob($queue)
      {
          if ($queue === null) {
              return Queue::pop();
          }

          // A dedicated loop variable keeps the $queue parameter intact.
          foreach (explode(',', $queue) as $queueName) {
              $job = Queue::pop($queueName);

              if ($job !== null) {
                  return $job;
              }
          }

          return null;
      }

      /**
       * Process a given job from the queue.
       *
       * When $maxTries is positive and the job's attempt count exceeds it, the
       * job is handed to logFailedJob() instead of being fired. If firing throws,
       * the job is released with $delay (unless it is already deleted) and the
       * throwable propagates to the caller.
       *
       * @param \think\queue\Job $job
       * @param int              $maxTries
       * @param int              $delay
       * @return array ['job' => Job, 'failed' => bool]
       * @throws Exception
       */
      public function process(Job $job, $maxTries = 0, $delay = 0)
      {
          if ($maxTries > 0 && $job->attempts() > $maxTries) {
              return $this->logFailedJob($job);
          }

          // A flag plus `finally` (instead of `catch (Exception $e)`) also covers
          // PHP 7+ `Error` throwables such as TypeError, so the job is released
          // rather than left reserved. The original throwable still propagates.
          $fired = false;

          try {
              $job->fire();
              $fired = true;

              return ['job' => $job, 'failed' => false];
          } finally {
              if (!$fired && !$job->isDeleted()) {
                  $job->release($delay);
              }
          }
      }

      /**
       * Mark a job as failed.
       *
       * Unless the job is already deleted, it is deleted and its failed() method
       * is called; the 'queue_failed' hook is fired afterwards even if either
       * call throws.
       *
       * @param \think\queue\Job $job
       * @return array ['job' => Job, 'failed' => true]
       */
      protected function logFailedJob(Job $job)
      {
          if (!$job->isDeleted()) {
              try {
                  $job->delete();
                  $job->failed();
              } finally {
                  Hook::listen('queue_failed', $job);
              }
          }

          return ['job' => $job, 'failed' => true];
      }

      /**
       * Sleep the script for a given number of seconds.
       *
       * Zero or negative values return immediately: the native sleep() rejects
       * negative input (ValueError on PHP 8, warning on older versions), which
       * would otherwise interrupt the worker.
       *
       * @param int $seconds
       * @return void
       */
      public function sleep($seconds)
      {
          $seconds = (int) $seconds;

          if ($seconds > 0) {
              sleep($seconds);
          }
      }
  }