Job.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  8. // +----------------------------------------------------------------------
  9. // | Author: yunwuxin <448901948@qq.com>
  10. // +----------------------------------------------------------------------
  11. namespace think\queue;
  12. use DateTime;
  13. use think\App;
  14. abstract class Job
  15. {
  16. /**
  17. * The job handler instance.
  18. * @var mixed
  19. */
  20. protected $instance;
  21. /**
  22. * The name of the queue the job belongs to.
  23. * @var string
  24. */
  25. protected $queue;
  26. /**
  27. * Indicates if the job has been deleted.
  28. * @var bool
  29. */
  30. protected $deleted = false;
  31. /**
  32. * Indicates if the job has been released.
  33. * @var bool
  34. */
  35. protected $released = false;
  36. /**
  37. * Fire the job.
  38. * @return void
  39. */
  40. abstract public function fire();
  41. /**
  42. * Delete the job from the queue.
  43. * @return void
  44. */
  45. public function delete()
  46. {
  47. $this->deleted = true;
  48. }
  49. /**
  50. * Determine if the job has been deleted.
  51. * @return bool
  52. */
  53. public function isDeleted()
  54. {
  55. return $this->deleted;
  56. }
  57. /**
  58. * Release the job back into the queue.
  59. * @param int $delay
  60. * @return void
  61. */
  62. public function release($delay = 0)
  63. {
  64. $this->released = true;
  65. }
  66. /**
  67. * Determine if the job was released back into the queue.
  68. * @return bool
  69. */
  70. public function isReleased()
  71. {
  72. return $this->released;
  73. }
  74. /**
  75. * Determine if the job has been deleted or released.
  76. * @return bool
  77. */
  78. public function isDeletedOrReleased()
  79. {
  80. return $this->isDeleted() || $this->isReleased();
  81. }
  82. /**
  83. * Get the number of times the job has been attempted.
  84. * @return int
  85. */
  86. abstract public function attempts();
  87. /**
  88. * Get the raw body string for the job.
  89. * @return string
  90. */
  91. abstract public function getRawBody();
  92. /**
  93. * Resolve and fire the job handler method.
  94. * @param array $payload
  95. * @return void
  96. */
  97. protected function resolveAndFire(array $payload)
  98. {
  99. list($class, $method) = $this->parseJob($payload['job']);
  100. $this->instance = $this->resolve($class);
  101. if ($this->instance) {
  102. $this->instance->{$method}($this, $payload['data']);
  103. }
  104. }
  105. /**
  106. * Parse the job declaration into class and method.
  107. * @param string $job
  108. * @return array
  109. */
  110. protected function parseJob($job)
  111. {
  112. $segments = explode('@', $job);
  113. return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
  114. }
  115. /**
  116. * Resolve the given job handler.
  117. * @param string $name
  118. * @return mixed
  119. */
  120. protected function resolve($name)
  121. {
  122. if (strpos($name, '\\') === false) {
  123. if (strpos($name, '/') === false) {
  124. $module = '';
  125. } else {
  126. list($module, $name) = explode('/', $name, 2);
  127. }
  128. $name = App::$namespace . ($module ? '\\' . strtolower($module) : '') . '\\job\\' . $name;
  129. }
  130. if (class_exists($name)) {
  131. return new $name();
  132. }
  133. }
  134. /**
  135. * Call the failed method on the job instance.
  136. * @return void
  137. */
  138. public function failed()
  139. {
  140. $payload = json_decode($this->getRawBody(), true);
  141. list($class, $method) = $this->parseJob($payload['job']);
  142. $this->instance = $this->resolve($class);
  143. if ($this->instance && method_exists($this->instance, 'failed')) {
  144. $this->instance->failed($payload['data']);
  145. }
  146. }
  147. /**
  148. * Calculate the number of seconds with the given delay.
  149. * @param \DateTime|int $delay
  150. * @return int
  151. */
  152. protected function getSeconds($delay)
  153. {
  154. if ($delay instanceof DateTime) {
  155. return max(0, $delay->getTimestamp() - $this->getTime());
  156. }
  157. return (int) $delay;
  158. }
  159. /**
  160. * Get the current system time.
  161. * @return int
  162. */
  163. protected function getTime()
  164. {
  165. return time();
  166. }
  167. /**
  168. * Get the name of the queued job class.
  169. * @return string
  170. */
  171. public function getName()
  172. {
  173. return json_decode($this->getRawBody(), true)['job'];
  174. }
  175. /**
  176. * Get the name of the queue the job belongs to.
  177. * @return string
  178. */
  179. public function getQueue()
  180. {
  181. return $this->queue;
  182. }
  183. }