| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- <?php
- namespace app\job;
- use app\api\controller\OrderSuperLoss;
- use app\common\library\token\driver\Redis;
- use think\Db;
- use think\Log;
- class OrderLossQueueWorker
- {
- const MAX_RETRIES = 3;
- const QUEUE_TIMEOUT = 30; // 秒
- protected $redis;
- public function __construct()
- {
- $this->initRedis();
- }
- private function initRedis()
- {
- $options = [
- 'host' => '127.0.0.1',
- 'port' => 6379,
- 'password' => '',
- 'select' => 15,
- 'timeout' => 0,
- 'expire' => 0,
- 'persistent' => false,
- 'prefix' => '',
- ];
- $this->redis = new Redis($options);
- }
- public function start()
- {
- while (true) {
- $this->processQueue();
- }
- }
- private function processQueue()
- {
- try {
- // 阻塞式获取任务
- $taskData = $this->redis->brPop(['order_loss_queue'], self::QUEUE_TIMEOUT);
- if ($taskData === null) {
- return; // 超时继续循环
- }
- $task = $this->parseTaskData($taskData);
- $this->handleTask($task);
- } catch (\Exception $e) {
- Log::error("队列处理异常: " . $e->getMessage());
- }
- }
- private function parseTaskData($taskData)
- {
- if (!is_array($taskData) || count($taskData) < 2) {
- throw new \Exception("无效的任务数据格式");
- }
- $task = json_decode($taskData[1], true);
- if (json_last_error() !== JSON_ERROR_NONE) {
- throw new \Exception("任务JSON解析失败: " . json_last_error_msg());
- }
- return $task;
- }
- private function handleTask($task)
- {
- try {
- // 执行核心业务逻辑
- $this->processTask($task);
- // 更新处理进度
- $this->updateProgress($task['batch_id']);
- // 检查批次完成状态
- $this->checkBatchCompletion($task['batch_id']);
- } catch (\Exception $e) {
- $this->handleTaskFailure($task, $e);
- }
- }
- private function processTask($task)
- {
- // 原有业务逻辑保持不变
- $api = new OrderSuperLoss();
- $result = $api->OneOrderSuperLoss($task['gdbh'], $task['yjno']);
- // 数据库操作
- Db::name('工单_质量考核汇总')
- ->where('Gy0_gdbh', $task['gdbh'])
- ->where('印件及工序', $task['yjno'])
- ->delete();
- Db::name('工单_质量考核汇总')
- ->insertAll($result);
- }
- private function updateProgress($batchId)
- {
- $this->redis->hIncrBy("batch:{$batchId}", 'processed', 1);
- }
- private function checkBatchCompletion($batchId)
- {
- $batch = $this->redis->hGetAll("batch:{$batchId}");
- if ($batch &&
- isset($batch['total'], $batch['processed']) &&
- $batch['processed'] >= $batch['total']
- ) {
- $this->markBatchCompleted($batchId);
- }
- }
- private function markBatchCompleted($batchId)
- {
- $this->redis->hMSet("batch:{$batchId}", [
- 'status' => 'completed',
- 'completed_at' => date('Y-m-d H:i:s')
- ]);
- $this->redis->publish("batch:{$batchId}", json_encode([
- 'status' => 'completed',
- 'batch_id' => $batchId
- ]));
- }
- private function handleTaskFailure($task, \Exception $e)
- {
- if ($task['retries'] < self::MAX_RETRIES) {
- $this->retryTask($task);
- Log::warning("任务重试 #{$task['retries']} [{$task['gdbh']}-{$task['yjno']}]");
- } else {
- $this->markBatchFailed($task['batch_id']);
- Log::error("任务最终失败: {$e->getMessage()} ". json_encode($task));
- }
- }
- private function retryTask(&$task)
- {
- $task['retries']++;
- $this->redis->rPush('order_loss_queue', json_encode($task));
- }
- private function markBatchFailed($batchId)
- {
- $this->redis->hSet("batch:{$batchId}", 'status', 'failed');
- }
- }
|