OrderLossQueueWorker.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. <?php
  2. namespace app\job;
  3. use app\api\controller\OrderSuperLoss;
  4. use app\common\library\token\driver\Redis;
  5. use think\Db;
  6. use think\Log;
  /**
   * Long-running worker that consumes tasks from the Redis list `order_loss_queue`.
   *
   * Each queue entry is a JSON object with the keys `gdbh`, `yjno`, `batch_id` and an
   * optional `retries` counter. For every task the worker calls
   * OrderSuperLoss::OneOrderSuperLoss(), replaces the matching rows of the summary
   * table with the returned rows, and then updates the per-batch progress hash
   * `batch:{batch_id}` in Redis.
   *
   * NOTE(review): `$this->redis` is an instance of the imported
   * app\common\library\token\driver\Redis wrapper. This class assumes the wrapper
   * forwards brPop / rPush / hIncrBy / hGetAll / hMSet / hSet / publish to the
   * underlying Redis client — TODO confirm against that driver.
   */
  class OrderLossQueueWorker
  {
      /** Maximum number of times a failing task is pushed back onto the queue. */
      const MAX_RETRIES = 3;

      /** Blocking-pop timeout, in seconds. */
      const QUEUE_TIMEOUT = 30;

      /** Name of the Redis list that holds pending tasks. */
      const QUEUE_KEY = 'order_loss_queue';

      /** Table whose rows are replaced for every processed task. */
      const SUMMARY_TABLE = '工单_质量考核汇总';

      /** Pause, in seconds, after a failed queue read so a Redis outage does not spin the loop. */
      const ERROR_BACKOFF = 1;

      /** @var Redis Redis wrapper used for the queue and the batch bookkeeping. */
      protected $redis;

      /**
       * @param array $options Optional connection overrides (host, port, password, select, ...).
       *                       Omitted keys keep the built-in defaults, so `new OrderLossQueueWorker()`
       *                       behaves exactly as before.
       */
      public function __construct(array $options = [])
      {
          $this->initRedis($options);
      }

      /**
       * Create the Redis wrapper from the default connection settings plus any overrides.
       *
       * @param array $overrides Connection settings that replace the defaults.
       */
      private function initRedis(array $overrides = [])
      {
          $defaults = [
              'host'       => '127.0.0.1',
              'port'       => 6379,
              'password'   => '',
              'select'     => 15,
              'timeout'    => 0,
              'expire'     => 0,
              'persistent' => false,
              'prefix'     => '',
          ];

          $this->redis = new Redis(array_merge($defaults, $overrides));
      }

      /**
       * Run the worker loop. This method never returns.
       */
      public function start()
      {
          while (true) {
              $this->processQueue();
          }
      }

      /**
       * Wait for one task (at most QUEUE_TIMEOUT seconds) and process it.
       *
       * Exceptions are logged and never propagate, so the loop in start() keeps running.
       */
      private function processQueue()
      {
          try {
              // Blocking pop: waits until a task arrives or the timeout expires.
              $taskData = $this->redis->brPop([self::QUEUE_KEY], self::QUEUE_TIMEOUT);
          } catch (\Exception $e) {
              Log::error("队列处理异常: " . $e->getMessage());
              // Without a pause a broken connection would fail instantly on every
              // iteration and flood the log.
              sleep(self::ERROR_BACKOFF);
              return;
          }

          // A timeout is reported as an empty array by phpredis (null/false by some
          // other clients); none of these is an error, just "nothing to do".
          if (empty($taskData)) {
              return;
          }

          try {
              $this->handleTask($this->parseTaskData($taskData));
          } catch (\Exception $e) {
              Log::error("队列处理异常: " . $e->getMessage());
          }
      }

      /**
       * Decode and validate the reply of a blocking pop.
       *
       * @param mixed $taskData Expected to be `[listName, jsonPayload]`.
       * @return array Decoded task; `retries` is always present as an int (0 when absent).
       * @throws \Exception When the reply shape, the JSON, or the required keys are invalid.
       */
      private function parseTaskData($taskData)
      {
          if (!is_array($taskData) || count($taskData) < 2) {
              throw new \Exception("无效的任务数据格式");
          }

          $task = json_decode($taskData[1], true);
          if (json_last_error() !== JSON_ERROR_NONE) {
              throw new \Exception("任务JSON解析失败: " . json_last_error_msg());
          }

          // Valid JSON is not necessarily an object ("123", "null", ...); everything
          // downstream indexes the task as an array.
          if (!is_array($task)) {
              throw new \Exception("无效的任务数据格式");
          }

          foreach (['gdbh', 'yjno', 'batch_id'] as $field) {
              if (!isset($task[$field])) {
                  throw new \Exception("任务数据缺少必填字段: {$field}");
              }
          }

          // Producers may omit the counter on the first attempt.
          $task['retries'] = isset($task['retries']) ? (int) $task['retries'] : 0;

          return $task;
      }

      /**
       * Process one task and update its batch; on failure hand over to the retry logic.
       *
       * @param array $task Task as returned by parseTaskData().
       */
      private function handleTask($task)
      {
          try {
              // Core business logic.
              $this->processTask($task);
              // Record progress for the batch.
              $this->updateProgress($task['batch_id']);
              // Mark the batch completed once every task has been processed.
              $this->checkBatchCompletion($task['batch_id']);
          } catch (\Exception $e) {
              $this->handleTaskFailure($task, $e);
          }
      }

      /**
       * Compute the rows for one task and replace the stored rows with them.
       *
       * The delete and the insert run in one transaction so a failed insert does not
       * leave the table without the previous rows.
       *
       * @param array $task Task containing `gdbh` and `yjno`.
       * @throws \Exception When the computation or a database operation fails.
       */
      private function processTask($task)
      {
          $api = new OrderSuperLoss();
          $result = $api->OneOrderSuperLoss($task['gdbh'], $task['yjno']);

          // insertAll() needs an array; fail before deleting anything so the task
          // goes through the normal retry path instead.
          if (!is_array($result)) {
              throw new \RuntimeException("无效的计算结果: {$task['gdbh']}-{$task['yjno']}");
          }

          Db::startTrans();
          try {
              Db::name(self::SUMMARY_TABLE)
                  ->where('Gy0_gdbh', $task['gdbh'])
                  ->where('印件及工序', $task['yjno'])
                  ->delete();

              // An empty result still clears the old rows, as before; there is
              // simply nothing to insert.
              if (!empty($result)) {
                  Db::name(self::SUMMARY_TABLE)->insertAll($result);
              }

              Db::commit();
          } catch (\Exception $e) {
              Db::rollback();
              throw $e;
          }
      }

      /**
       * Increment the `processed` counter of the batch hash by one.
       *
       * @param mixed $batchId Batch identifier taken from the task.
       */
      private function updateProgress($batchId)
      {
          $this->redis->hIncrBy("batch:{$batchId}", 'processed', 1);
      }

      /**
       * Mark the batch completed when `processed` has reached `total`.
       *
       * @param mixed $batchId Batch identifier taken from the task.
       */
      private function checkBatchCompletion($batchId)
      {
          $batch = $this->redis->hGetAll("batch:{$batchId}");

          if (!$batch || !isset($batch['total'], $batch['processed'])) {
              return;
          }

          // Hash values come back as strings; compare them as integers.
          if ((int) $batch['processed'] >= (int) $batch['total']) {
              $this->markBatchCompleted($batchId);
          }
      }

      /**
       * Store the completed status and timestamp, then publish a completion message
       * on the channel `batch:{batchId}`.
       *
       * @param mixed $batchId Batch identifier.
       */
      private function markBatchCompleted($batchId)
      {
          $this->redis->hMSet("batch:{$batchId}", [
              'status'       => 'completed',
              'completed_at' => date('Y-m-d H:i:s'),
          ]);

          $this->redis->publish("batch:{$batchId}", json_encode([
              'status'   => 'completed',
              'batch_id' => $batchId,
          ]));
      }

      /**
       * Re-queue a failed task, or mark its batch failed once MAX_RETRIES is reached.
       *
       * @param array      $task Task that failed.
       * @param \Exception $e    Exception raised while handling the task.
       */
      private function handleTaskFailure($task, \Exception $e)
      {
          // Default the counter so a task queued without it is still retried rather
          // than failing on an undefined index.
          $task['retries'] = isset($task['retries']) ? (int) $task['retries'] : 0;

          if ($task['retries'] < self::MAX_RETRIES) {
              // retryTask() increments the counter in place, so the log line below
              // shows the number of the upcoming attempt.
              $this->retryTask($task);
              Log::warning("任务重试 #{$task['retries']} [{$task['gdbh']}-{$task['yjno']}]");
          } else {
              $this->markBatchFailed($task['batch_id']);
              Log::error("任务最终失败: {$e->getMessage()} ". json_encode($task));
          }
      }

      /**
       * Increment the retry counter and push the task back onto the queue.
       *
       * NOTE(review): the task is pushed with rPush and consumed with brPop, i.e. at
       * the same end of the list, so a retried task is the next one popped — confirm
       * this immediate retry is intended.
       *
       * @param array $task Task to re-queue; its `retries` entry is incremented in place.
       */
      private function retryTask(&$task)
      {
          $task['retries']++;
          $this->redis->rPush(self::QUEUE_KEY, json_encode($task));
      }

      /**
       * Set the status of the batch hash to `failed`.
       *
       * @param mixed $batchId Batch identifier.
       */
      private function markBatchFailed($batchId)
      {
          $this->redis->hSet("batch:{$batchId}", 'status', 'failed');
      }
  }