request->param(); $service = new ImageService(); $service->handleImage($params); $this->success('任务成功提交至队列'); } /** * 查询队列列表 * 统计文件对应的队列情况 */ public function get_queue_logs() { $params = $this->request->param('old_image_file', ''); $queue_logs = Db::name('queue_logs') ->where('old_image_file', $params) ->order('id desc') ->select(); foreach ($queue_logs as &$log) { $taskId = $log['id']; // 从 image_task_log 表统计状态 $statusCount = Db::name('image_task_log') ->field('status, COUNT(*) as count') ->where('task_id', $taskId) ->group('status') ->select(); $log['已完成数量'] = 0; $log['处理中数量'] = 0; $log['排队中的数量'] = 0; $log['失败数量'] = 0; foreach ($statusCount as $item) { switch ($item['status']) { case 0: $log['排队中的数量'] = $item['count']; break; case 1: $log['处理中数量'] = $item['count']; break; case 2: $log['已完成数量'] = $item['count']; break; case -1: $log['失败数量'] = $item['count']; break; } } // ✅ 只保留排队中的数量大于 0 的记录 if ($log['排队中的数量'] > 0) { $result[] = $log; } } return json([ 'code' => 0, 'msg' => '查询成功', 'data' => $result, 'count' => count($result) ]); } /** * 查询总队列状态(统计当前处理的数据量) */ public function queueStats() { $statusList = Db::name('image_task_log') ->field('status, COUNT(*) as total') ->where('create_time', '>=', date('Y-m-d 00:00:00')) ->group('status') ->select(); $statusCount = []; foreach ($statusList as $item) { $statusCount[$item['status']] = $item['total']; } // 总数为所有状态和 $total = array_sum($statusCount); //获取队列当前状态 $statusText = Db::name('queue_logs')->order('id desc')->value('status'); return json([ 'code' => 0, 'msg' => '获取成功', 'data' => [ '总任务数' => $total, '待处理' => $statusCount[0] ?? 0, '处理中' => $statusCount[1] ?? 0, '成功' => $statusCount[2] ?? 0, '失败' => $statusCount[-1] ?? 0, '当前状态' => $statusText ] ]); } /** * 显示当前运行中的队列监听进程 */ public function viewQueueStatus() { $redis = new \Redis(); $redis->connect('127.0.0.1', 6379); $redis->auth('123456'); $redis->select(15); $key = 'queues:imgtotxt'; // 判断 key 是否存在,避免报错 if (!$redis->exists($key)) { return json([ 'code' => 0, 'msg' => '查询成功,队列为空', 'count' => 0, 'tasks_preview' => [] ]); } $count = $redis->lLen($key); $list = $redis->lRange($key, 0, 9); // 解码 JSON 内容,确保每一项都有效 $parsed = array_filter(array_map(function ($item) { return json_decode($item, true); }, $list), function ($item) { return !is_null($item); }); return json([ 'code' => 0, 'msg' => '查询成功', 'count' => $count, 'tasks_preview' => $parsed ]); } /** * 清空队列并删除队列日志记录 */ public function stopQueueProcesses() { $redis = new \Redis(); $redis->connect('127.0.0.1', 6379); $redis->auth('123456'); $redis->select(15); $key_txttoimg = 'queues:txttoimg'; $key_txttotxt = 'queues:txttotxt'; $key_imgtotxt = 'queues:imgtotxt'; $count = $redis->lLen($key_txttoimg) + $redis->lLen($key_txttotxt) + $redis->lLen($key_imgtotxt); if ($count === 0) { return json([ 'code' => 1, 'msg' => '暂无队列需要停止' ]); } // 清空 Redis 队列 $redis->del($key_txttoimg); $redis->del($key_txttotxt); $redis->del($key_imgtotxt); // 删除数据库中 log = '队列中' 的记录 Db::name('image_task_log') ->where('log', '队列中') ->delete(); return json([ 'code' => 0, 'msg' => '已成功停止队列任务并清除队列日志' ]); } /** * 开启队列任务 * 暂时用不到、服务器已开启自动开启队列模式 */ // public function kaiStats() // { // // 判断是否已有监听进程在运行 // $check = shell_exec("ps aux | grep 'queue:listen' | grep -v grep"); // if ($check) { // return json([ // 'code' => 1, // 'msg' => '监听进程已存在,请勿重复启动' // ]); // } // // 启动监听 // $command = 'nohup php think queue:listen --queue --timeout=300 --sleep=3 --memory=256 > /var/log/job_queue.log 2>&1 &'; // exec($command, $output, $status); // if ($status === 0) { // return json([ // 'code' => 0, // 'msg' => '队列监听已启动' // ]); // } else { // return json([ // 'code' => 1, // 'msg' => '队列启动失败', // 'output' => $output // ]); // } // } }