initRedis(); } private function initRedis() { $options = [ 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 14, 'timeout' => 0, 'expire' => 0, 'persistent' => false, 'prefix' => '', ]; $this->redis = new Redis($options); } public function start() { while (true) { $this->processQueue(); } } private function processQueue() { try { // 阻塞式获取任务 $taskData = $this->redis->brPop(['order_loss_queue'], self::QUEUE_TIMEOUT); if ($taskData === null) { return; // 超时继续循环 } $task = $this->parseTaskData($taskData); $this->handleTask($task); } catch (\Exception $e) { Log::error("队列处理异常: " . $e->getMessage()); } } private function parseTaskData($taskData) { if (!is_array($taskData) || count($taskData) < 2) { throw new \Exception("无效的任务数据格式"); } $task = json_decode($taskData[1], true); if (json_last_error() !== JSON_ERROR_NONE) { throw new \Exception("任务JSON解析失败: " . json_last_error_msg()); } return $task; } private function handleTask($task) { try { // 执行核心业务逻辑 $this->processTask($task); // 更新处理进度 $this->updateProgress($task['batch_id']); // 检查批次完成状态 $this->checkBatchCompletion($task['batch_id']); } catch (\Exception $e) { $this->handleTaskFailure($task, $e); } } private function processTask($task) { // 原有业务逻辑保持不变 $api = new OrderSuperLoss(); $result = $api->OneOrderSuperLoss($task['gdbh'], $task['yjno']); // 数据库操作 Db::name('工单_质量考核汇总') ->where('Gy0_gdbh', $task['gdbh']) ->where('印件及工序', $task['yjno']) ->delete(); Db::name('工单_质量考核汇总') ->insertAll($result); } private function updateProgress($batchId) { $this->redis->hIncrBy("batch:{$batchId}", 'processed', 1); } private function checkBatchCompletion($batchId) { $batch = $this->redis->hGetAll("batch:{$batchId}"); if ($batch && isset($batch['total'], $batch['processed']) && $batch['processed'] >= $batch['total'] ) { $this->markBatchCompleted($batchId); } } private function markBatchCompleted($batchId) { $this->redis->hMSet("batch:{$batchId}", [ 'status' => 'completed', 'completed_at' => date('Y-m-d H:i:s') ]); $this->redis->publish("batch:{$batchId}", json_encode([ 'status' => 'completed', 'batch_id' => $batchId ])); } private function handleTaskFailure($task, \Exception $e) { if ($task['retries'] < self::MAX_RETRIES) { $this->retryTask($task); Log::warning("任务重试 #{$task['retries']} [{$task['gdbh']}-{$task['yjno']}]"); } else { $this->markBatchFailed($task['batch_id']); Log::error("任务最终失败: {$e->getMessage()} ". json_encode($task)); } } private function retryTask(&$task) { $task['retries']++; $this->redis->rPush('order_loss_queue', json_encode($task)); } private function markBatchFailed($batchId) { $this->redis->hSet("batch:{$batchId}", 'status', 'failed'); } }