Kaynağa Gözat

超节损队列

qiuenguang 11 ay önce
ebeveyn
işleme
9d7f345e95

+ 56 - 37
application/api/controller/OrderSuperLoss.php

@@ -4,6 +4,7 @@ namespace app\api\controller;
 
 use app\common\controller\Api;
 use think\Cache;
+use think\cache\driver\Redis;
 use \think\Request;
 use \think\Db;
 /**
@@ -1254,7 +1255,6 @@ class OrderSuperLoss extends Api
      */
     public function OneOrderSuperLoss($gdbh,$yjno)
     {
-//        $param = $this->request->param();
         $param = [
             'gdbh' => $gdbh,
             'yjno' => $yjno
@@ -1675,17 +1675,7 @@ class OrderSuperLoss extends Api
                 '车间名称' => $value['Gy0_SITE']
             ];
         }
-
         return $result;
-//        halt($result);
-//        \db('工单_质量考核汇总')
-//            ->where('Gy0_gdbh', $param['gdbh'])
-//            ->where('印件及工序', $param['yjno'])
-//            ->delete();
-//        $sql = \db('工单_质量考核汇总')
-//            ->fetchSql(true)
-//            ->insertAll($result);
-//        $res = \db()->query($sql);
     }
 
 
@@ -2079,20 +2069,26 @@ class OrderSuperLoss extends Api
     //循环插入超节损数据
     public function orderLossData()
     {
+        if ($this->request->isGet() === false){
+            $this->error('请求错误');
+        }
         $param = $this->request->param();
         $where = [];
-        if (isset($param['gdbh']) && isset($param['yjno']) && !empty($param['gdbh']) && !empty($param['yjno'])){
+        // 判断接收数据,创建查询条件
+        if (isset($param['gdbh']) && isset($param['yjno']) && !empty($param['gdbh']) && !empty($param['yjno'])) {
             $where['a.jjcp_gdbh'] = $param['gdbh'];
             $where['a.jjcp_yjno'] = $param['yjno'];
         }
-        if (isset($param['mouth'])){
-            $where['a.jjcp_sj'] = ['like',$param['mouth'].'%'];
+        if (isset($param['mouth'])) {
+            $where['a.jjcp_sj'] = ['like', $param['mouth'] . '%'];
         }
-        $where['a.jjcp_smb'] = ['like','末%'];
-        $list = \db('成品入仓')
+        $where['a.jjcp_smb'] = ['like', '末%'];
+
+        // 根据查询条件查询出需要计算超节损的工单和印件
+        $list = Db::name('成品入仓')
             ->alias('a')
-            ->join('设备_产量计酬 b','a.jjcp_gdbh = b.sczl_gdbh AND a.jjcp_yjno = b.sczl_yjno')
-            ->join('物料_收发记录 c','a.jjcp_gdbh = c.st_gdbh AND a.jjcp_cpdh = c.cpdh')
+            ->join('设备_产量计酬 b', 'a.jjcp_gdbh = b.sczl_gdbh AND a.jjcp_yjno = b.sczl_yjno')
+            ->join('物料_收发记录 c', 'a.jjcp_gdbh = c.st_gdbh AND a.jjcp_cpdh = c.cpdh')
             ->where($where)
             ->where(function ($query) {
                 $query->where('c.仓库编号', '101')
@@ -2101,29 +2097,52 @@ class OrderSuperLoss extends Api
             ->field('a.jjcp_gdbh,a.jjcp_yjno')
             ->group('a.jjcp_gdbh,a.jjcp_yjno')
             ->select();
-        $i = 0;
-        foreach ($list as $item){
-            $result = $this->OneOrderSuperLoss($item['jjcp_gdbh'],$item['jjcp_yjno']);
-            \db('工单_质量考核汇总')
-            ->where('Gy0_gdbh', $item['jjcp_gdbh'])
-            ->where('印件及工序', $item['jjcp_yjno'])
-            ->delete();
-            $sql = \db('工单_质量考核汇总')
-                ->fetchSql(true)
-                ->insertAll($result);
-            $res = \db()->query($sql);
-            if ($res === false){
-                $i++;
+
+        // 创建redis
+        $redis = new \Redis();
+        $redis->connect('127.0.0.1', 6379, 0);
+        $redis->auth('');
+        $redis->select(14);
+
+        // 生成批次ID
+        $batchId = 'batch_' . uniqid() . '_' . time();
+        $totalTasks = count($list);
+
+        //设置队列信息
+        $pipe = $redis->pipeline();
+        try {
+            // 设置批次信息
+            $pipe->hmset("batch:{$batchId}", [
+                'total'    => $totalTasks,
+                'processed' => 0,
+                'status'    => 'processing',
+                'created_at' => date('Y-m-d H:i:s')
+            ]);
+            $pipe->expire("batch:{$batchId}", 3600);
+
+            // 批量添加任务
+            foreach ($list as $item) {
+                $task = [
+                    'batch_id' => $batchId,
+                    'gdbh'     => $item['jjcp_gdbh'],
+                    'yjno'     => $item['jjcp_yjno'],
+                    'retries'  => 0
+                ];
+                $pipe->rpush('order_loss_queue', json_encode($task));
             }
+
+            // 执行管道
+            $pipe->exec();
+        } catch (\Exception $e) {
+            $pipe->discard();
+            $this->error('队列操作失败: ' . $e->getMessage());
         }
-        if ($i === 0){
-            $this->success('成功');
-        }else{
-            $this->error('失败');
-        }
+
+        $this->success('任务已加入队列处理', [
+            'batch_id'    => $batchId,
+            'queue_count' => $redis->llen('order_loss_queue')]);
     }
 }
 
 
 
-

+ 160 - 0
application/job/OrderLossQueueWorker.php

@@ -0,0 +1,160 @@
+<?php
+namespace app\job;
+
+use app\api\controller\OrderSuperLoss;
+use app\common\library\token\driver\Redis;
+use think\Db;
+use think\Log;
+
+class OrderLossQueueWorker
+{
+    const MAX_RETRIES = 3;
+    const QUEUE_TIMEOUT = 30; // 秒
+
+    protected $redis;
+
+    public function __construct()
+    {
+        $this->initRedis();
+    }
+
+    private function initRedis()
+    {
+        $options = [
+            'host'       => '127.0.0.1',
+            'port'       => 6379,
+            'password'   => '',
+            'select'     => 15,
+            'timeout'    => 0,
+            'expire'     => 0,
+            'persistent' => false,
+            'prefix'     => '',
+        ];
+        $this->redis = new Redis($options);
+    }
+
+    public function start()
+    {
+        while (true) {
+            $this->processQueue();
+        }
+    }
+
+    private function processQueue()
+    {
+        try {
+            // 阻塞式获取任务
+            $taskData = $this->redis->brPop(['order_loss_queue'], self::QUEUE_TIMEOUT);
+
+            if ($taskData === null) {
+                return; // 超时继续循环
+            }
+
+            $task = $this->parseTaskData($taskData);
+            $this->handleTask($task);
+
+        } catch (\Exception $e) {
+            Log::error("队列处理异常: " . $e->getMessage());
+        }
+    }
+
+    private function parseTaskData($taskData)
+    {
+        if (!is_array($taskData) || count($taskData) < 2) {
+            throw new \Exception("无效的任务数据格式");
+        }
+
+        $task = json_decode($taskData[1], true);
+
+        if (json_last_error() !== JSON_ERROR_NONE) {
+            throw new \Exception("任务JSON解析失败: " . json_last_error_msg());
+        }
+
+        return $task;
+    }
+
+    private function handleTask($task)
+    {
+        try {
+            // 执行核心业务逻辑
+            $this->processTask($task);
+
+            // 更新处理进度
+            $this->updateProgress($task['batch_id']);
+
+            // 检查批次完成状态
+            $this->checkBatchCompletion($task['batch_id']);
+
+        } catch (\Exception $e) {
+            $this->handleTaskFailure($task, $e);
+        }
+    }
+
+    private function processTask($task)
+    {
+        // 原有业务逻辑保持不变
+        $api = new OrderSuperLoss();
+        $result = $api->OneOrderSuperLoss($task['gdbh'], $task['yjno']);
+
+        // 数据库操作
+        Db::name('工单_质量考核汇总')
+            ->where('Gy0_gdbh', $task['gdbh'])
+            ->where('印件及工序', $task['yjno'])
+            ->delete();
+
+        Db::name('工单_质量考核汇总')
+            ->insertAll($result);
+    }
+
+    private function updateProgress($batchId)
+    {
+        $this->redis->hIncrBy("batch:{$batchId}", 'processed', 1);
+    }
+
+    private function checkBatchCompletion($batchId)
+    {
+        $batch = $this->redis->hGetAll("batch:{$batchId}");
+
+        if ($batch &&
+            isset($batch['total'], $batch['processed']) &&
+            $batch['processed'] >= $batch['total']
+        ) {
+            $this->markBatchCompleted($batchId);
+        }
+    }
+
+    private function markBatchCompleted($batchId)
+    {
+        $this->redis->hMSet("batch:{$batchId}", [
+            'status' => 'completed',
+            'completed_at' => date('Y-m-d H:i:s')
+        ]);
+
+        $this->redis->publish("batch:{$batchId}", json_encode([
+            'status' => 'completed',
+            'batch_id' => $batchId
+        ]));
+    }
+
+    private function handleTaskFailure($task, \Exception $e)
+    {
+        if ($task['retries'] < self::MAX_RETRIES) {
+            $this->retryTask($task);
+            Log::warning("任务重试 #{$task['retries']} [{$task['gdbh']}-{$task['yjno']}]");
+        } else {
+            $this->markBatchFailed($task['batch_id']);
+            Log::error("任务最终失败: {$e->getMessage()} ". json_encode($task));
+        }
+    }
+
+    private function retryTask(&$task)
+    {
+        $task['retries']++;
+        $this->redis->rPush('order_loss_queue', json_encode($task));
+    }
+
+    private function markBatchFailed($batchId)
+    {
+        $this->redis->hSet("batch:{$batchId}", 'status', 'failed');
+    }
+}