data = $data; //// } // // protected $data; // protected $taskData; // // public function __construct($data) // { // $this->data = $data; // } // public function handle() // { //// $options = [ //// 'host' => '127.0.0.1', //// 'port' => 6379, //// 'password' => '', //// 'select' => 15, //// 'timeout' => 0, //// 'expire' => 0, //// 'persistent' => false, //// 'prefix' => '', //// ]; //// $redis = new Redis($options); //// $taskIdentifier = md5(json_encode('date')); //// $handData = []; // $options = [ // 'host' => '127.0.0.1', // 'port' => 6379, // 'password' => '123456', // 'select' => 15, // 'timeout' => 0, // 'expire' => 0, // 'persistent' => false, // 'prefix' => '', // ]; // $redis = new Redis($options); // // // 从任务数据中获取月份 // $date = $this->taskData['date'] ?? ''; // $taskId = $this->taskData['task_id'] ?? 0; // $taskIdentifier = md5('salary_calculation_' . $date); // // try { // // 更新任务状态为执行中 // if ($taskId) { // Db::name('queue_tasks') // ->where('id', $taskId) // ->update([ // 'status' => 'processing', // 'start_time' => date('Y-m-d H:i:s'), // 'update_time' => date('Y-m-d H:i:s') // ]); // } // // // 原有的数据处理逻辑 // $handData = []; // foreach ($this->data as $key=>$value){ // $handData[$key]['sczl_gdbh'] = $value['sczl_gdbh']; // $handData[$key]['sczl_yjno'] = $value['sczl_yjno']; // $handData[$key]['sczl_gxh'] = $value['sczl_gxh']; // $handData[$key]['sczl_type'] = $value['sczl_type']; // $handData[$key]['sczl_rq'] = $value['sczl_rq']; // $handData[$key]['sczl_jtbh'] = $value['sczl_jtbh']; // $handData[$key]['班组车头产量'] = $value['班组车头产量']; // $handData[$key]['工价系数'] = $value['工价系数']; // $handData[$key]['工序难度系数'] = $value['工序难度系数']; // $handData[$key]['装版工时'] = $value['装版工时']; // $handData[$key]['保养工时'] = $value['保养工时']; // $handData[$key]['打样工时'] = $value['打样工时']; // $handData[$key]['异常停机工时'] = $value['异常停机工时']; // $handData[$key]['车头产量占用机时'] = $value['车头产量占用机时']; // $handData[$key]['日定额'] = $value['日定额']; // $handData[$key]['千件工价'] = $value['千件工价']; // $handData[$key]['补产标准'] = $value['补产标准']; // $handData[$key]['班组换算产量'] = $value['班组换算产量']; // $handData[$key]['计时补差额工资'] = $value['计时补差额工资']; // $handData[$key]['bh'] = $value['bh']; // $handData[$key]['xm'] = $value['xm']; // $handData[$key]['Rate'] = $value['Rate']; // $handData[$key]['sczl_ms'] = $value['sczl_ms']; // $handData[$key]['工时占比'] = $value['工时占比']; // $handData[$key]['达标定额'] = $value['达标定额']; // $handData[$key]['个人计件工资'] = $value['个人计件工资']; // $handData[$key]['个人加班工资'] = $value['个人加班工资']; // $handData[$key]['UniqID'] = $value['UniqID']; // $handData[$key]['sys_ny'] = $value['sys_ny']; // $handData[$key]['sys_rq'] = $value['sys_rq']; // $handData[$key]['sys_id'] = $value['sys_id']; // $handData[$key]['法定天数'] = $value['法定天数']; // } // $sql =Db::name('绩效工资汇总')->fetchSql(true)->insertAll($handData); // $res = Db::query($sql); //// if ($res !== false){ //// // 获取队列的键名 //// $queueKey = 'default'; //// // 删除队列 //// Cache::store('redis')->handler()->del($queueKey); //// $redis->rm($taskIdentifier); //// } // if ($res !== false){ // // 更新任务状态为成功 // if ($taskId) { // Db::name('queue_tasks') // ->where('id', $taskId) // ->update([ // 'status' => 'success', // 'end_time' => date('Y-m-d H:i:s'), // 'result' => json_encode(['success' => true, 'message' => '工资计算完成']), // 'update_time' => date('Y-m-d H:i:s') // ]); // } // // // 清理缓存 // $queueKey = 'salary_calculation'; // Cache::store('redis')->handler()->del($queueKey); // $redis->rm($taskIdentifier); // // Log::info('工资计算任务执行成功', ['date' => $date]); // } else { // throw new \Exception('数据插入失败'); // } // // } catch (\Exception $e) { // Log::error('工资计算任务执行失败: ' . $e->getMessage()); // // // 更新任务状态为失败 // if ($taskId) { // Db::name('queue_tasks') // ->where('id', $taskId) // ->update([ // 'status' => 'failed', // 'end_time' => date('Y-m-d H:i:s'), // 'error' => $e->getMessage(), // 'update_time' => date('Y-m-d H:i:s') // ]); // } // // $redis->rm($taskIdentifier); // } // } //} namespace app\job; use think\Db; use think\Log; use think\queue\Job; use app\service\SalaryCalculationService; /** * 工资计算队列任务 */ class InsertDataJob { /** * 队列任务执行入口 * @param Job $job 队列任务对象 * @param array $data 任务参数 */ public function fire(Job $job, $data) { // 1. 基础参数校验 if (!is_array($data) || empty($data['task_id']) || empty($data['date'])) { Log::error('队列任务参数错误', ['data' => $data]); $job->delete(); // 参数错误直接删除任务 return; } $taskId = (int)$data['task_id']; $maxAttempts = 3; // 最大重试次数 try { // 2. 更新任务状态为执行中 $this->updateTaskStatus($taskId, 'processing', [ 'start_time' => date('Y-m-d H:i:s'), 'retry_count' => $job->attempts() ]); // 3. 调用工资计算服务 $service = new SalaryCalculationService(); $result = $service->calculateSalary($data); // 4. 任务成功处理 if ($result['success']) { $job->delete(); // 删除队列任务 $this->updateTaskStatus($taskId, 'success', [ 'end_time' => date('Y-m-d H:i:s'), 'result' => json_encode($result, JSON_UNESCAPED_UNICODE), 'update_time' => date('Y-m-d H:i:s') ]); Log::info('工资计算队列任务执行成功', ['task_id' => $taskId, 'date' => $data['date']]); return; } // 5. 任务失败处理(未达到最大重试次数则重试) throw new \Exception($result['message'] ?? '工资计算服务执行失败'); } catch (\Exception $e) { Log::error('工资计算队列任务执行失败', [ 'task_id' => $taskId, 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString(), 'attempts' => $job->attempts() ]); // 6. 达到最大重试次数则标记失败,否则重试 if ($job->attempts() >= $maxAttempts) { $job->delete(); // 删除任务,停止重试 $this->updateTaskStatus($taskId, 'failed', [ 'end_time' => date('Y-m-d H:i:s'), 'error' => $e->getMessage(), 'retry_count' => $job->attempts(), 'update_time' => date('Y-m-d H:i:s') ]); } else { $job->release(5); // 5秒后重试 } } } /** * 更新任务状态 * @param int $taskId 任务ID * @param string $status 状态:processing/success/failed * @param array $data 附加更新字段 */ private function updateTaskStatus(int $taskId, string $status, array $data = []) { if ($taskId <= 0) return; $updateData = array_merge([ 'status' => $status, 'update_time' => date('Y-m-d H:i:s') ], $data); Db::name('queue_tasks')->where('id', $taskId)->update($updateData); } }