InsertDataJob.php 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. <?php
  2. //
  3. //namespace app\job;
  4. //
  5. ////use think\Db;
  6. ////use think\Cache;
  7. ////use think\cache\driver\Redis;
  8. //
  9. //use think\Db;
  10. //use think\Cache;
  11. //use think\Log;
  12. //use think\cache\driver\Redis;
  13. //
  14. //class InsertDataJob
  15. //{
  16. //// protected $data;
  17. ////
  18. //// public function __construct($data)
  19. //// {
  20. //// $this->data = $data;
  21. //// }
  22. //
  23. // protected $data;
  24. // protected $taskData;
  25. //
  26. // public function __construct($data)
  27. // {
  28. // $this->data = $data;
  29. // }
  30. // public function handle()
  31. // {
  32. //// $options = [
  33. //// 'host' => '127.0.0.1',
  34. //// 'port' => 6379,
  35. //// 'password' => '',
  36. //// 'select' => 15,
  37. //// 'timeout' => 0,
  38. //// 'expire' => 0,
  39. //// 'persistent' => false,
  40. //// 'prefix' => '',
  41. //// ];
  42. //// $redis = new Redis($options);
  43. //// $taskIdentifier = md5(json_encode('date'));
  44. //// $handData = [];
  45. // $options = [
  46. // 'host' => '127.0.0.1',
  47. // 'port' => 6379,
  48. // 'password' => '123456',
  49. // 'select' => 15,
  50. // 'timeout' => 0,
  51. // 'expire' => 0,
  52. // 'persistent' => false,
  53. // 'prefix' => '',
  54. // ];
  55. // $redis = new Redis($options);
  56. //
  57. // // 从任务数据中获取月份
  58. // $date = $this->taskData['date'] ?? '';
  59. // $taskId = $this->taskData['task_id'] ?? 0;
  60. // $taskIdentifier = md5('salary_calculation_' . $date);
  61. //
  62. // try {
  63. // // 更新任务状态为执行中
  64. // if ($taskId) {
  65. // Db::name('queue_tasks')
  66. // ->where('id', $taskId)
  67. // ->update([
  68. // 'status' => 'processing',
  69. // 'start_time' => date('Y-m-d H:i:s'),
  70. // 'update_time' => date('Y-m-d H:i:s')
  71. // ]);
  72. // }
  73. //
  74. // // 原有的数据处理逻辑
  75. // $handData = [];
  76. // foreach ($this->data as $key=>$value){
  77. // $handData[$key]['sczl_gdbh'] = $value['sczl_gdbh'];
  78. // $handData[$key]['sczl_yjno'] = $value['sczl_yjno'];
  79. // $handData[$key]['sczl_gxh'] = $value['sczl_gxh'];
  80. // $handData[$key]['sczl_type'] = $value['sczl_type'];
  81. // $handData[$key]['sczl_rq'] = $value['sczl_rq'];
  82. // $handData[$key]['sczl_jtbh'] = $value['sczl_jtbh'];
  83. // $handData[$key]['班组车头产量'] = $value['班组车头产量'];
  84. // $handData[$key]['工价系数'] = $value['工价系数'];
  85. // $handData[$key]['工序难度系数'] = $value['工序难度系数'];
  86. // $handData[$key]['装版工时'] = $value['装版工时'];
  87. // $handData[$key]['保养工时'] = $value['保养工时'];
  88. // $handData[$key]['打样工时'] = $value['打样工时'];
  89. // $handData[$key]['异常停机工时'] = $value['异常停机工时'];
  90. // $handData[$key]['车头产量占用机时'] = $value['车头产量占用机时'];
  91. // $handData[$key]['日定额'] = $value['日定额'];
  92. // $handData[$key]['千件工价'] = $value['千件工价'];
  93. // $handData[$key]['补产标准'] = $value['补产标准'];
  94. // $handData[$key]['班组换算产量'] = $value['班组换算产量'];
  95. // $handData[$key]['计时补差额工资'] = $value['计时补差额工资'];
  96. // $handData[$key]['bh'] = $value['bh'];
  97. // $handData[$key]['xm'] = $value['xm'];
  98. // $handData[$key]['Rate'] = $value['Rate'];
  99. // $handData[$key]['sczl_ms'] = $value['sczl_ms'];
  100. // $handData[$key]['工时占比'] = $value['工时占比'];
  101. // $handData[$key]['达标定额'] = $value['达标定额'];
  102. // $handData[$key]['个人计件工资'] = $value['个人计件工资'];
  103. // $handData[$key]['个人加班工资'] = $value['个人加班工资'];
  104. // $handData[$key]['UniqID'] = $value['UniqID'];
  105. // $handData[$key]['sys_ny'] = $value['sys_ny'];
  106. // $handData[$key]['sys_rq'] = $value['sys_rq'];
  107. // $handData[$key]['sys_id'] = $value['sys_id'];
  108. // $handData[$key]['法定天数'] = $value['法定天数'];
  109. // }
  110. // $sql =Db::name('绩效工资汇总')->fetchSql(true)->insertAll($handData);
  111. // $res = Db::query($sql);
  112. //// if ($res !== false){
  113. //// // 获取队列的键名
  114. //// $queueKey = 'default';
  115. //// // 删除队列
  116. //// Cache::store('redis')->handler()->del($queueKey);
  117. //// $redis->rm($taskIdentifier);
  118. //// }
  119. // if ($res !== false){
  120. // // 更新任务状态为成功
  121. // if ($taskId) {
  122. // Db::name('queue_tasks')
  123. // ->where('id', $taskId)
  124. // ->update([
  125. // 'status' => 'success',
  126. // 'end_time' => date('Y-m-d H:i:s'),
  127. // 'result' => json_encode(['success' => true, 'message' => '工资计算完成']),
  128. // 'update_time' => date('Y-m-d H:i:s')
  129. // ]);
  130. // }
  131. //
  132. // // 清理缓存
  133. // $queueKey = 'salary_calculation';
  134. // Cache::store('redis')->handler()->del($queueKey);
  135. // $redis->rm($taskIdentifier);
  136. //
  137. // Log::info('工资计算任务执行成功', ['date' => $date]);
  138. // } else {
  139. // throw new \Exception('数据插入失败');
  140. // }
  141. //
  142. // } catch (\Exception $e) {
  143. // Log::error('工资计算任务执行失败: ' . $e->getMessage());
  144. //
  145. // // 更新任务状态为失败
  146. // if ($taskId) {
  147. // Db::name('queue_tasks')
  148. // ->where('id', $taskId)
  149. // ->update([
  150. // 'status' => 'failed',
  151. // 'end_time' => date('Y-m-d H:i:s'),
  152. // 'error' => $e->getMessage(),
  153. // 'update_time' => date('Y-m-d H:i:s')
  154. // ]);
  155. // }
  156. //
  157. // $redis->rm($taskIdentifier);
  158. // }
  159. // }
  160. //}
  161. namespace app\job;
  162. use think\Db;
  163. use think\Log;
  164. use think\queue\Job;
  165. use app\service\SalaryCalculationService;
  166. /**
  167. * 工资计算队列任务
  168. */
  169. class InsertDataJob
  170. {
  171. /**
  172. * 队列任务执行入口
  173. * @param Job $job 队列任务对象
  174. * @param array $data 任务参数
  175. */
  176. public function fire(Job $job, $data)
  177. {
  178. // 1. 基础参数校验
  179. if (!is_array($data) || empty($data['task_id']) || empty($data['date'])) {
  180. Log::error('队列任务参数错误', [
  181. 'data' => json_encode($data, JSON_UNESCAPED_UNICODE) // 序列化复杂数组
  182. ]);
  183. $job->delete(); // 参数错误直接删除任务
  184. return;
  185. }
  186. $taskId = (int)$data['task_id'];
  187. $maxAttempts = 3; // 最大重试次数
  188. try {
  189. // 2. 更新任务状态为执行中
  190. $this->updateTaskStatus($taskId, 'processing', [
  191. 'start_time' => date('Y-m-d H:i:s'),
  192. 'retry_count' => $job->attempts()
  193. ]);
  194. // 3. 调用工资计算服务
  195. $service = new SalaryCalculationService();
  196. $result = $service->calculateSalary($data);
  197. // 4. 任务成功处理
  198. if ($result['success']) {
  199. $job->delete(); // 删除队列任务
  200. $this->updateTaskStatus($taskId, 'success', [
  201. 'end_time' => date('Y-m-d H:i:s'),
  202. 'result' => json_encode($result, JSON_UNESCAPED_UNICODE),
  203. 'update_time' => date('Y-m-d H:i:s')
  204. ]);
  205. Log::info('工资计算队列任务执行成功', [
  206. 'task_id' => (int)$taskId,
  207. 'date' => (string)($data['date'] ?? '')
  208. ]);
  209. return;
  210. }
  211. // 5. 任务失败处理(未达到最大重试次数则重试)
  212. throw new \Exception($result['message'] ?? '工资计算服务执行失败');
  213. } catch (\Exception $e) {
  214. Log::error('工资计算队列任务执行失败', [
  215. 'task_id' => (int)$taskId,
  216. 'error' => (string)$e->getMessage(),
  217. 'trace' => substr((string)$e->getTraceAsString(), 0, 2000), // 截断超长trace
  218. 'attempts' => (int)$job->attempts()
  219. ]);
  220. // 6. 达到最大重试次数则标记失败,否则重试
  221. if ($job->attempts() >= $maxAttempts) {
  222. $job->delete(); // 删除任务,停止重试
  223. $this->updateTaskStatus($taskId, 'failed', [
  224. 'end_time' => date('Y-m-d H:i:s'),
  225. 'error' => $e->getMessage(),
  226. 'retry_count' => $job->attempts(),
  227. 'update_time' => date('Y-m-d H:i:s')
  228. ]);
  229. } else {
  230. $job->release(5); // 5秒后重试
  231. }
  232. }
  233. }
  234. /**
  235. * 更新任务状态
  236. * @param int $taskId 任务ID
  237. * @param string $status 状态:processing/success/failed
  238. * @param array $data 附加更新字段
  239. */
  240. private function updateTaskStatus(int $taskId, string $status, array $data = [])
  241. {
  242. if ($taskId <= 0) return;
  243. $updateData = array_merge([
  244. 'status' => $status,
  245. 'update_time' => date('Y-m-d H:i:s')
  246. ], $data);
  247. Db::name('queue_tasks')->where('id', $taskId)->update($updateData);
  248. }
  249. }