isPost() == false) { $this->error('非法请求'); } $params = Request::instance()->param(); if (!isset($params['month']) || empty($params['month'])) { $this->error('月份参数错误'); } $month = trim($params['month']); $sysId = $params['sys_id'] ?? ''; // 检查是否有正在执行的任务 $runningTask = Db::name('queue_tasks') ->where('task_type', 'cost_calculation') ->where('task_data', 'like', '%"month":"' . $month . '"%') ->where('status', 'in', ['pending', 'processing']) ->find(); if ($runningTask) { $this->success('该月份的成本计算任务已在执行中,请勿重复提交'); } // 检查工资计算是否在执行中(可选) $salaryRunning = Db::name('queue_tasks') ->where('task_type', 'salary_calculation') ->where('task_data', 'like', '%"date":"' . $month . '"%') ->where('status', 'in', ['pending', 'processing']) ->find(); if ($salaryRunning) { $this->error('该月份的工资计算正在执行中,建议等待工资计算完成后再进行成本计算'); } // 准备任务数据 $taskData = [ 'month' => $month, 'sys_id' => $sysId, 'user_id' => session('user_id') ?? 0, 'user_name' => session('user_name') ?? '系统', 'request_time' => date('Y-m-d H:i:s') ]; // 先创建任务记录 $taskId = Db::name('queue_tasks')->insertGetId([ 'task_type' => 'cost_calculation', 'task_data' => json_encode($taskData, JSON_UNESCAPED_UNICODE), 'status' => 'pending', 'queue_name' => 'cost_calculation', 'create_time' => date('Y-m-d H:i:s') ]); // 添加到任务数据中 $taskData['task_id'] = $taskId; // 提交到成本计算队列 $jobHandlerClassName = 'app\job\UnifiedCostCalculationJob'; $queueName = 'cost_calculation'; $jobId = Queue::push($jobHandlerClassName, $taskData, $queueName); if ($jobId) { // 更新任务记录 Db::name('queue_tasks') ->where('id', $taskId) ->update([ 'job_id' => $jobId, 'update_time' => date('Y-m-d H:i:s') ]); $this->success('成本计算任务已提交到队列,请稍后查看结果', null, [ 'task_id' => $taskId, 'job_id' => $jobId, 'month' => $month, 'queue_name' => $queueName ]); } else { // 更新任务状态为失败 Db::name('queue_tasks') ->where('id', $taskId) ->update([ 'status' => 'failed', 'error' => '任务提交到队列失败', 'update_time' => date('Y-m-d H:i:s') ]); $this->error('成本计算任务提交失败'); } } /** * 查询成本计算状态 * @ApiMethod GET * @param string month 年月 */ public function status() { $month = Request::instance()->param('month'); if (empty($month)) { $this->error('月份参数错误'); } // 查询任务记录 $task = Db::name('queue_tasks') ->where('task_type', 'cost_calculation') ->where('task_data', 'like', '%"month":"' . $month . '"%') ->order('id', 'desc') ->find(); if ($task) { $result = json_decode($task['result'] ?? '{}', true); $taskData = json_decode($task['task_data'] ?? '{}', true); $response = [ 'exists' => true, 'task_id' => $task['id'], 'month' => $month, 'status' => $task['status'], 'queue_name' => $task['queue_name'], 'job_id' => $task['job_id'], 'start_time' => $task['start_time'], 'end_time' => $task['end_time'], 'retry_count' => $task['retry_count'], 'result' => $result, 'error' => $task['error'] ?? '', 'create_time' => $task['create_time'], 'user_info' => [ 'user_id' => $taskData['user_id'] ?? 0, 'user_name' => $taskData['user_name'] ?? '' ] ]; } else { $response = ['exists' => false, 'month' => $month]; } $this->success('查询成功', null, $response); } /** * 获取成本计算任务列表 * @ApiMethod GET */ public function list() { $page = Request::instance()->param('page', 1); $limit = Request::instance()->param('limit', 20); $month = Request::instance()->param('month'); $status = Request::instance()->param('status'); $query = Db::name('queue_tasks') ->where('task_type', 'cost_calculation'); if ($month) { $query->where('task_data', 'like', '%"month":"' . $month . '"%'); } if ($status) { $query->where('status', $status); } $total = $query->count(); $list = $query->order('id', 'desc') ->page($page, $limit) ->select(); // 解析任务数据 foreach ($list as &$item) { $taskData = json_decode($item['task_data'] ?? '{}', true); $item['month'] = $taskData['month'] ?? ''; $item['user_name'] = $taskData['user_name'] ?? ''; $item['request_time'] = $taskData['request_time'] ?? ''; if (!empty($item['result'])) { $item['result_data'] = json_decode($item['result'], true); } } $this->success('查询成功', null, [ 'list' => $list, 'total' => $total, 'page' => $page, 'pages' => ceil($total / $limit) ]); } /** * 手动重试失败的任务 * @ApiMethod POST * @param int task_id 任务ID */ public function retry() { $taskId = Request::instance()->param('task_id'); if (empty($taskId)) { $this->error('任务ID不能为空'); } $task = Db::name('queue_tasks') ->where('id', $taskId) ->where('task_type', 'cost_calculation') ->where('status', 'failed') ->find(); if (!$task) { $this->error('任务不存在或无法重试'); } // 解析原任务数据 $taskData = json_decode($task['task_data'], true); $taskData['task_id'] = $taskId; $taskData['retry_time'] = date('Y-m-d H:i:s'); // 更新原任务状态 Db::name('queue_tasks') ->where('id', $taskId) ->update([ 'status' => 'retrying', 'update_time' => date('Y-m-d H:i:s') ]); // 提交到队列 $jobHandlerClassName = 'app\job\CostCalculationJob'; $queueName = 'cost_calculation'; $jobId = Queue::push($jobHandlerClassName, $taskData, $queueName); if ($jobId) { $this->success('任务已重新提交到队列', null, [ 'task_id' => $taskId, 'new_job_id' => $jobId, 'queue_name' => $queueName ]); } else { $this->error('任务重试失败'); } } }