OrderLossQueueWorker.php 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. <?php
  2. namespace app\job;
  3. use app\api\controller\OrderSuperLoss;
  4. use app\common\library\token\driver\Redis;
  5. use think\Db;
  6. use think\Log;
  /**
   * Queue worker that consumes tasks from the Redis list `order_loss_queue`.
   *
   * For every task it calls OrderSuperLoss::OneOrderSuperLoss() for the task's
   * `gdbh` / `yjno` pair, replaces the matching rows in the table
   * `工单_质量考核汇总`, and then records progress in the Redis hash
   * `batch:{batch_id}`. Failed tasks are re-queued up to MAX_RETRIES times;
   * after that the batch hash is flagged as failed.
   *
   * start() loops forever, so the class is meant to be driven by a
   * long-lived process.
   *
   * NOTE(review): the Redis client used here is the project wrapper
   * app\common\library\token\driver\Redis; this class assumes it exposes
   * brPop / rPush / hIncrBy / hGetAll / hMSet / hSet / publish — confirm.
   */
  class OrderLossQueueWorker
  {
      /** Number of times a failed task is re-queued before its batch is marked failed. */
      const MAX_RETRIES = 3;

      /** Timeout, in seconds, passed to the blocking pop. */
      const QUEUE_TIMEOUT = 30;

      /** Redis list that tasks are popped from and re-queued to. */
      const QUEUE_KEY = 'order_loss_queue';

      /** Seconds to pause after the pop itself throws, so a dead Redis does not cause a busy loop. */
      const ERROR_BACKOFF = 1;

      /** @var Redis Redis client wrapper created in initRedis(). */
      protected $redis;

      /**
       * @param array $options Optional overrides for the default Redis connection
       *                       settings (same keys as the defaults in initRedis()).
       */
      public function __construct(array $options = [])
      {
          $this->initRedis($options);
      }

      /**
       * Creates the Redis client.
       *
       * NOTE(review): the defaults below, including the password, are hard-coded
       * in source; prefer passing real settings through the constructor (e.g.
       * loaded from configuration) and removing the literal credential.
       *
       * @param array $options Values that replace the matching default keys.
       */
      private function initRedis(array $options = [])
      {
          $defaults = [
              'host' => '127.0.0.1',
              'port' => 6379,
              'password' => '123456',
              'select' => 0,
              'timeout' => 0,
              'expire' => 0,
              'persistent' => false,
              'prefix' => '',
          ];
          $this->redis = new Redis(array_merge($defaults, $options));
      }

      /**
       * Runs the worker loop. Never returns.
       */
      public function start()
      {
          while (true) {
              $this->processQueue();
          }
      }

      /**
       * Waits for one task and processes it.
       *
       * Returns quietly when the blocking pop times out. Exceptions are logged
       * and swallowed so that start() keeps looping.
       */
      private function processQueue()
      {
          try {
              // Blocking pop: waits up to QUEUE_TIMEOUT seconds for a task.
              $taskData = $this->redis->brPop([self::QUEUE_KEY], self::QUEUE_TIMEOUT);
          } catch (\Exception $e) {
              Log::error("队列处理异常: " . $e->getMessage());
              // The pop failed outright (e.g. connection problem); pause briefly
              // instead of retrying at full speed.
              sleep(self::ERROR_BACKOFF);
              return;
          }

          // A timed-out BRPOP yields an empty array with phpredis (null/false with
          // some other clients). None of these is a task, so just poll again.
          if (empty($taskData)) {
              return;
          }

          try {
              $task = $this->parseTaskData($taskData);
              $this->handleTask($task);
          } catch (\Exception $e) {
              Log::error("队列处理异常: " . $e->getMessage());
          }
      }

      /**
       * Decodes and validates the reply of the blocking pop.
       *
       * @param mixed $taskData Expected to be [listName, jsonPayload].
       *
       * @return array Decoded task containing `gdbh`, `yjno`, `batch_id` and an
       *               integer `retries` (0 when the payload did not carry one).
       *
       * @throws \Exception When the reply shape is wrong, the JSON is invalid, the
       *                    payload is not a JSON object, or a required field is missing.
       */
      private function parseTaskData($taskData)
      {
          if (!is_array($taskData) || count($taskData) < 2
              || !isset($taskData[1]) || !is_string($taskData[1])
          ) {
              throw new \Exception("无效的任务数据格式");
          }

          $task = json_decode($taskData[1], true);
          if (json_last_error() !== JSON_ERROR_NONE) {
              throw new \Exception("任务JSON解析失败: " . json_last_error_msg());
          }

          // Valid JSON can still be a scalar or list; every later step indexes the
          // task by key, so reject anything that is not an object-like array.
          if (!is_array($task)) {
              throw new \Exception("无效的任务数据格式");
          }

          foreach (['gdbh', 'yjno', 'batch_id'] as $field) {
              if (!isset($task[$field])) {
                  throw new \Exception("任务缺少必要字段: {$field}");
              }
          }

          // Producers may omit the retry counter on first submission.
          $task['retries'] = isset($task['retries']) ? (int) $task['retries'] : 0;

          return $task;
      }

      /**
       * Processes one task and updates the batch bookkeeping; any exception is
       * routed to the retry / failure handling.
       *
       * @param array $task Task as returned by parseTaskData().
       */
      private function handleTask($task)
      {
          try {
              // Core business logic.
              $this->processTask($task);
              // Count this task as processed.
              $this->updateProgress($task['batch_id']);
              // Flag the batch as completed once every task has been processed.
              $this->checkBatchCompletion($task['batch_id']);
          } catch (\Exception $e) {
              $this->handleTaskFailure($task, $e);
          }
      }

      /**
       * Recomputes the rows for one `gdbh` / `yjno` pair and replaces them in
       * `工单_质量考核汇总`.
       *
       * The INSERT statement is rendered up front and the delete + insert run in
       * one transaction, so a failure leaves the previously stored rows in place.
       *
       * @param array $task Must contain `gdbh` and `yjno`.
       *
       * @throws \RuntimeException When there is nothing to insert or the insert fails.
       * @throws \Exception        Anything raised by the API call or the database layer.
       */
      private function processTask($task)
      {
          $api = new OrderSuperLoss();
          $result = $api->OneOrderSuperLoss($task['gdbh'], $task['yjno']);

          // Without rows there is no INSERT to run; stop before deleting anything.
          if (!is_array($result) || empty($result)) {
              throw new \RuntimeException("超节损计算结果为空: {$task['gdbh']}-{$task['yjno']}");
          }

          // fetchSql(true) returns the statement text instead of executing it.
          $insertSql = Db::name('工单_质量考核汇总')
              ->fetchSql(true)
              ->insertAll($result);
          if (!is_string($insertSql) || $insertSql === '') {
              throw new \RuntimeException("超节损队列插入失败: {$task['gdbh']}-{$task['yjno']}");
          }

          Db::startTrans();
          try {
              Db::name('工单_质量考核汇总')
                  ->where('Gy0_gdbh', $task['gdbh'])
                  ->where('印件及工序', $task['yjno'])
                  ->delete();

              // execute() is the write API; query() is intended for statements
              // that return a result set.
              $insertRes = Db::execute($insertSql);
              if ($insertRes === false) {
                  throw new \RuntimeException("超节损队列插入失败: {$task['gdbh']}-{$task['yjno']}");
              }

              Db::commit();
          } catch (\Exception $e) {
              Db::rollback();
              throw $e;
          }
      }

      /**
       * Increments the `processed` counter of the batch hash by one.
       *
       * @param mixed $batchId Batch identifier used in the key `batch:{id}`.
       */
      private function updateProgress($batchId)
      {
          $this->redis->hIncrBy("batch:{$batchId}", 'processed', 1);
      }

      /**
       * Marks the batch completed when its `processed` counter has reached `total`.
       *
       * @param mixed $batchId Batch identifier used in the key `batch:{id}`.
       */
      private function checkBatchCompletion($batchId)
      {
          $batch = $this->redis->hGetAll("batch:{$batchId}");
          if ($batch &&
              isset($batch['total'], $batch['processed']) &&
              (int) $batch['processed'] >= (int) $batch['total']
          ) {
              $this->markBatchCompleted($batchId);
          }
      }

      /**
       * Writes the completed status and timestamp to the batch hash and publishes
       * a completion message on the channel `batch:{id}`.
       *
       * @param mixed $batchId Batch identifier.
       */
      private function markBatchCompleted($batchId)
      {
          $this->redis->hMSet("batch:{$batchId}", [
              'status' => 'completed',
              'completed_at' => date('Y-m-d H:i:s')
          ]);
          $this->redis->publish("batch:{$batchId}", json_encode([
              'status' => 'completed',
              'batch_id' => $batchId
          ]));
      }

      /**
       * Re-queues a failed task while it has retries left; otherwise flags its
       * batch as failed and logs the final error.
       *
       * @param array      $task Task that failed.
       * @param \Exception $e    Failure cause.
       */
      private function handleTaskFailure($task, \Exception $e)
      {
          // Tolerate tasks that reach this point without a retry counter.
          $task['retries'] = isset($task['retries']) ? (int) $task['retries'] : 0;

          if ($task['retries'] < self::MAX_RETRIES) {
              $this->retryTask($task);
              // NOTE(review): confirm that `warning` is a level the project's Log
              // class actually dispatches; if it is not, this entry is dropped.
              Log::warning("任务重试 #{$task['retries']} [{$task['gdbh']}-{$task['yjno']}]");
          } else {
              $this->markBatchFailed($task['batch_id']);
              Log::error("任务最终失败: {$e->getMessage()} ". json_encode($task));
          }
      }

      /**
       * Increments the task's retry counter and pushes it back onto the queue.
       *
       * The task is pushed to the same end of the list that the blocking pop
       * reads from, so it is the next element to be popped.
       *
       * @param array $task Passed by reference; `retries` is incremented in place
       *                    so the caller can log the new attempt number.
       */
      private function retryTask(&$task)
      {
          $task['retries']++;
          $this->redis->rPush(self::QUEUE_KEY, json_encode($task));
      }

      /**
       * Sets the `status` field of the batch hash to `failed`.
       *
       * @param mixed $batchId Batch identifier.
       */
      private function markBatchFailed($batchId)
      {
          $this->redis->hSet("batch:{$batchId}", 'status', 'failed');
      }
  }