app.py 15 KB


  1. import os
  2. import threading
  3. import queue
  4. import json
  5. import tkinter as tk
  6. from tkinter import ttk, filedialog, messagebox
  7. from datetime import datetime
  8. from PIL import Image
  9. class SequentialMirrorApp:
  10. def __init__(self, root):
  11. self.root = root
  12. self.root.title("顺序镜像扩展工具")
  13. self.root.geometry("650x500")
  14. self.setup_ui()
  15. # 初始化状态
  16. self.running = False
  17. self.processed_files = set()
  18. self.current_log = {}
  19. self.queue = queue.Queue()
  20. self.lock = threading.Lock()
  21. # 加载历史记录
  22. self.load_last_log()
  23. self.root.after(100, self.process_queue)
  24. def setup_ui(self):
  25. """初始化用户界面"""
  26. main_frame = ttk.Frame(self.root, padding=15)
  27. main_frame.pack(fill=tk.BOTH, expand=True)
  28. # 输入输出设置
  29. self.create_io_section(main_frame)
  30. # 扩展参数设置
  31. self.create_expansion_params(main_frame)
  32. # 进度显示
  33. self.create_progress_section(main_frame)
  34. # 操作按钮
  35. self.create_control_buttons(main_frame)
  36. def create_io_section(self, parent):
  37. """创建输入输出组件"""
  38. io_frame = ttk.LabelFrame(parent, text="文件设置")
  39. io_frame.pack(fill=tk.X, pady=5)
  40. # 输入文件夹
  41. ttk.Label(io_frame, text="输入路径:").grid(row=0, column=0, sticky=tk.W)
  42. self.input_entry = ttk.Entry(io_frame, width=45)
  43. self.input_entry.grid(row=0, column=1, padx=5)
  44. ttk.Button(io_frame, text="浏览", command=self.browse_input).grid(row=0, column=2)
  45. # 输出文件夹
  46. ttk.Label(io_frame, text="输出路径:").grid(row=1, column=0, sticky=tk.W)
  47. self.output_entry = ttk.Entry(io_frame, width=45)
  48. self.output_entry.grid(row=1, column=1, padx=5)
  49. ttk.Button(io_frame, text="浏览", command=self.browse_output).grid(row=1, column=2)
  50. def create_expansion_params(self, parent):
  51. """创建扩展参数设置"""
  52. param_frame = ttk.LabelFrame(parent, text="扩展参数(像素)")
  53. param_frame.pack(fill=tk.X, pady=5)
  54. # 扩展方向参数
  55. directions = [
  56. ("下边扩展:", "bottom", 10),
  57. ("右边扩展:", "right", 20),
  58. ("上边扩展:", "top", 15),
  59. ("左边扩展:", "left", 5)
  60. ]
  61. self.pad_entries = {}
  62. for idx, (label, direction, default) in enumerate(directions):
  63. row, col = divmod(idx, 2)
  64. frame = ttk.Frame(param_frame)
  65. frame.grid(row=row, column=col, padx=5, pady=5, sticky=tk.W)
  66. ttk.Label(frame, text=label).pack(side=tk.LEFT)
  67. entry = ttk.Entry(frame, width=8)
  68. entry.insert(0, str(default))
  69. entry.pack(side=tk.LEFT)
  70. self.pad_entries[direction] = entry
  71. def create_progress_section(self, parent):
  72. """创建进度显示组件"""
  73. progress_frame = ttk.LabelFrame(parent, text="处理进度")
  74. progress_frame.pack(fill=tk.BOTH, expand=True, pady=5)
  75. # 进度条
  76. self.progress = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, mode='determinate')
  77. self.progress.pack(fill=tk.X, padx=10, pady=5)
  78. # 统计信息
  79. stats_frame = ttk.Frame(progress_frame)
  80. stats_frame.pack(pady=5)
  81. self.stats = {
  82. 'total': ttk.Label(stats_frame, text="总文件数: 0"),
  83. 'processed': ttk.Label(stats_frame, text="已完成: 0"),
  84. 'failed': ttk.Label(stats_frame, text="失败: 0"),
  85. 'remaining': ttk.Label(stats_frame, text="剩余: 0")
  86. }
  87. for lbl in self.stats.values():
  88. lbl.pack(side=tk.LEFT, padx=15)
  89. def create_control_buttons(self, parent):
  90. """创建控制按钮"""
  91. btn_frame = ttk.Frame(parent)
  92. btn_frame.pack(pady=10)
  93. self.start_btn = ttk.Button(btn_frame, text="开始处理", command=self.start_processing)
  94. self.start_btn.pack(side=tk.LEFT, padx=15)
  95. ttk.Button(btn_frame, text="清除记录", command=self.clear_log).pack(side=tk.LEFT, padx=15)
  96. ttk.Button(btn_frame, text="退出", command=self.confirm_exit).pack(side=tk.RIGHT)
  97. def browse_input(self):
  98. """选择输入目录"""
  99. path = filedialog.askdirectory()
  100. if path:
  101. self.input_entry.delete(0, tk.END)
  102. self.input_entry.insert(0, path)
  103. def browse_output(self):
  104. """选择输出目录"""
  105. path = filedialog.askdirectory()
  106. if path:
  107. self.output_entry.delete(0, tk.END)
  108. self.output_entry.insert(0, path)
  109. def load_last_log(self):
  110. """加载历史日志"""
  111. log_path = os.path.join(os.path.dirname(__file__), "expand_log.json")
  112. if os.path.exists(log_path):
  113. try:
  114. with open(log_path, 'r') as f:
  115. self.current_log = json.load(f)
  116. self.processed_files = set(self.current_log.get('processed', []))
  117. except Exception as e:
  118. messagebox.showwarning("日志加载错误", f"无法读取历史记录:\n{str(e)}")
  119. def start_processing(self):
  120. """启动处理流程"""
  121. if self.running:
  122. return
  123. # 验证输入参数
  124. params = self.validate_inputs()
  125. if not params:
  126. return
  127. # 初始化状态
  128. self.running = True
  129. self.start_btn.config(state=tk.DISABLED)
  130. self.current_log = {
  131. "start_time": datetime.now().isoformat(),
  132. "params": params,
  133. "processed": [],
  134. "failed": []
  135. }
  136. # 启动处理线程
  137. threading.Thread(
  138. target=self.process_images,
  139. args=(params['input'], params['output'], params['pads']),
  140. daemon=True
  141. ).start()
  142. def validate_inputs(self):
  143. """验证输入有效性"""
  144. try:
  145. # 获取扩展参数
  146. pads = {
  147. 'bottom': int(self.pad_entries['bottom'].get()),
  148. 'right': int(self.pad_entries['right'].get()),
  149. 'top': int(self.pad_entries['top'].get()),
  150. 'left': int(self.pad_entries['left'].get())
  151. }
  152. if any(v < 0 for v in pads.values()):
  153. raise ValueError("扩展值不能为负数")
  154. # 验证路径
  155. input_dir = self.input_entry.get().strip()
  156. if not os.path.isdir(input_dir):
  157. raise FileNotFoundError("输入目录不存在")
  158. output_dir = self.output_entry.get().strip()
  159. if not output_dir:
  160. raise ValueError("输出目录不能为空")
  161. # 验证可写性
  162. test_file = os.path.join(output_dir, ".write_test")
  163. try:
  164. with open(test_file, "w") as f:
  165. f.write("test")
  166. os.remove(test_file)
  167. except Exception as e:
  168. raise ValueError(f"输出目录不可写: {str(e)}")
  169. return {
  170. 'input': input_dir,
  171. 'output': output_dir,
  172. 'pads': pads
  173. }
  174. except Exception as e:
  175. messagebox.showerror("输入错误", str(e))
  176. return None
  177. def process_images(self, input_dir, output_dir, pads):
  178. """核心处理逻辑"""
  179. try:
  180. os.makedirs(output_dir, exist_ok=True)
  181. valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
  182. # 获取待处理文件列表
  183. all_files = [
  184. f for f in os.listdir(input_dir)
  185. if os.path.splitext(f)[1].lower() in valid_ext
  186. and f not in self.processed_files
  187. ]
  188. total = len(all_files)
  189. if not total:
  190. self.queue.put(("complete", (0, 0)))
  191. return
  192. success = 0
  193. failed = 0
  194. for idx, filename in enumerate(all_files, 1):
  195. if not self.running:
  196. break
  197. input_path = os.path.join(input_dir, filename)
  198. output_path = os.path.join(output_dir, filename)
  199. try:
  200. with Image.open(input_path) as img:
  201. # 转换RGB模式并处理
  202. current_img = img.convert('RGB')
  203. # 按顺序执行四个方向的扩展
  204. current_img = self.sequential_expand(current_img, pads)
  205. # 保存结果
  206. current_img.save(output_path, quality=95, optimize=True)
  207. # 更新处理记录
  208. with self.lock:
  209. self.processed_files.add(filename)
  210. self.current_log['processed'].append(filename)
  211. success += 1
  212. # 更新进度
  213. self.queue.put(("progress", (idx, total, filename, success, failed)))
  214. except Exception as e:
  215. with self.lock:
  216. self.current_log['failed'].append({
  217. 'file': filename,
  218. 'error': str(e),
  219. 'time': datetime.now().isoformat()
  220. })
  221. failed += 1
  222. self.queue.put(("error", (filename, str(e))))
  223. # 定期保存日志
  224. if idx % 5 == 0:
  225. self.save_log()
  226. self.queue.put(("complete", (success, failed)))
  227. self.save_log()
  228. except Exception as e:
  229. self.queue.put(("error", ("系统错误", str(e))))
  230. def sequential_expand(self, original_img, pads):
  231. """执行顺序镜像扩展"""
  232. current_img = original_img
  233. directions = [
  234. ('bottom', pads['bottom'], False), # 下边:垂直翻转
  235. ('right', pads['right'], True), # 右边:水平翻转
  236. ('top', pads['top'], False), # 上边:垂直翻转
  237. ('left', pads['left'], True) # 左边:水平翻转
  238. ]
  239. for direction, pad, flip_horizontal in directions:
  240. if pad <= 0:
  241. continue
  242. width, height = current_img.size
  243. pad = min(pad, (width if flip_horizontal else height))
  244. # 获取镜像区域
  245. crop_box = self.get_crop_box(direction, width, height, pad)
  246. mirrored = current_img.crop(crop_box).transpose(
  247. Image.FLIP_LEFT_RIGHT if flip_horizontal else Image.FLIP_TOP_BOTTOM
  248. )
  249. # 创建新画布
  250. new_size = (
  251. width + (pad if direction in ['left', 'right'] else 0),
  252. height + (pad if direction in ['top', 'bottom'] else 0)
  253. )
  254. new_img = Image.new('RGB', new_size)
  255. # 定位粘贴
  256. if direction == 'bottom':
  257. new_img.paste(current_img, (0, 0))
  258. new_img.paste(mirrored, (0, height))
  259. elif direction == 'right':
  260. new_img.paste(current_img, (0, 0))
  261. new_img.paste(mirrored, (width, 0))
  262. elif direction == 'top':
  263. new_img.paste(mirrored, (0, 0))
  264. new_img.paste(current_img, (0, pad))
  265. else: # left
  266. new_img.paste(mirrored, (0, 0))
  267. new_img.paste(current_img, (pad, 0))
  268. current_img = new_img
  269. return current_img
  270. def get_crop_box(self, direction, width, height, pad):
  271. """获取裁剪区域坐标"""
  272. return {
  273. 'bottom': (0, height - pad, width, height),
  274. 'right': (width - pad, 0, width, height),
  275. 'top': (0, 0, width, pad),
  276. 'left': (0, 0, pad, height)
  277. }[direction]
  278. def process_queue(self):
  279. """处理消息队列"""
  280. try:
  281. while True:
  282. msg_type, data = self.queue.get_nowait()
  283. if msg_type == "progress":
  284. current, total, filename, success, failed = data
  285. self.progress["value"] = (current / total) * 100
  286. self.update_stats(total, success, failed)
  287. self.root.title(f"正在处理: {filename}")
  288. elif msg_type == "complete":
  289. success, failed = data
  290. self.progress["value"] = 100
  291. self.update_stats(success + failed, success, failed)
  292. messagebox.showinfo("完成", f"处理完成!\n成功: {success} 失败: {failed}")
  293. self.running = False
  294. self.start_btn.config(state=tk.NORMAL)
  295. self.root.title("顺序镜像扩展工具")
  296. elif msg_type == "error":
  297. filename, error = data
  298. self.stats['failed'].config(
  299. text=f"失败: {int(self.stats['failed'].cget('text').split(': ')[1]) + 1}"
  300. )
  301. messagebox.showerror("处理错误", f"文件: {filename}\n错误: {error}")
  302. except queue.Empty:
  303. pass
  304. finally:
  305. self.root.after(100, self.process_queue)
  306. def update_stats(self, total, success, failed):
  307. """更新统计信息"""
  308. self.stats['total'].config(text=f"总文件数: {total}")
  309. self.stats['processed'].config(text=f"已完成: {success}")
  310. self.stats['failed'].config(text=f"失败: {failed}")
  311. self.stats['remaining'].config(text=f"剩余: {total - success - failed}")
  312. def save_log(self):
  313. """保存处理日志"""
  314. log_path = os.path.join(os.path.dirname(__file__), "expand_log.json")
  315. with open(log_path, 'w') as f:
  316. json.dump(self.current_log, f, indent=2)
  317. def clear_log(self):
  318. """清除处理记录"""
  319. if messagebox.askyesno("确认清除", "确定要清除所有处理记录吗?"):
  320. self.processed_files = set()
  321. self.current_log = {}
  322. self.save_log()
  323. self.update_stats(0, 0, 0)
  324. messagebox.showinfo("清除完成", "所有记录已重置")
  325. def confirm_exit(self):
  326. """确认退出程序"""
  327. if self.running:
  328. if messagebox.askyesno("退出确认", "处理正在进行中,确定要退出吗?"):
  329. self.running = False
  330. self.save_log()
  331. self.root.destroy()
  332. else:
  333. self.root.destroy()
  334. if __name__ == "__main__":
  335. root = tk.Tk()
  336. app = SequentialMirrorApp(root)
  337. root.mainloop()