import os import threading import queue import json import tkinter as tk from tkinter import ttk, filedialog, messagebox from datetime import datetime from PIL import Image class SequentialMirrorApp: def __init__(self, root): self.root = root self.root.title("顺序镜像扩展工具") self.root.geometry("650x500") self.setup_ui() # 初始化状态 self.running = False self.processed_files = set() self.current_log = {} self.queue = queue.Queue() self.lock = threading.Lock() # 加载历史记录 self.load_last_log() self.root.after(100, self.process_queue) def setup_ui(self): """初始化用户界面""" main_frame = ttk.Frame(self.root, padding=15) main_frame.pack(fill=tk.BOTH, expand=True) # 输入输出设置 self.create_io_section(main_frame) # 扩展参数设置 self.create_expansion_params(main_frame) # 进度显示 self.create_progress_section(main_frame) # 操作按钮 self.create_control_buttons(main_frame) def create_io_section(self, parent): """创建输入输出组件""" io_frame = ttk.LabelFrame(parent, text="文件设置") io_frame.pack(fill=tk.X, pady=5) # 输入文件夹 ttk.Label(io_frame, text="输入路径:").grid(row=0, column=0, sticky=tk.W) self.input_entry = ttk.Entry(io_frame, width=45) self.input_entry.grid(row=0, column=1, padx=5) ttk.Button(io_frame, text="浏览", command=self.browse_input).grid(row=0, column=2) # 输出文件夹 ttk.Label(io_frame, text="输出路径:").grid(row=1, column=0, sticky=tk.W) self.output_entry = ttk.Entry(io_frame, width=45) self.output_entry.grid(row=1, column=1, padx=5) ttk.Button(io_frame, text="浏览", command=self.browse_output).grid(row=1, column=2) def create_expansion_params(self, parent): """创建扩展参数设置""" param_frame = ttk.LabelFrame(parent, text="扩展参数(像素)") param_frame.pack(fill=tk.X, pady=5) # 扩展方向参数 directions = [ ("下边扩展:", "bottom", 10), ("右边扩展:", "right", 20), ("上边扩展:", "top", 15), ("左边扩展:", "left", 5) ] self.pad_entries = {} for idx, (label, direction, default) in enumerate(directions): row, col = divmod(idx, 2) frame = ttk.Frame(param_frame) frame.grid(row=row, column=col, padx=5, pady=5, sticky=tk.W) ttk.Label(frame, text=label).pack(side=tk.LEFT) entry = ttk.Entry(frame, width=8) entry.insert(0, str(default)) entry.pack(side=tk.LEFT) self.pad_entries[direction] = entry def create_progress_section(self, parent): """创建进度显示组件""" progress_frame = ttk.LabelFrame(parent, text="处理进度") progress_frame.pack(fill=tk.BOTH, expand=True, pady=5) # 进度条 self.progress = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, mode='determinate') self.progress.pack(fill=tk.X, padx=10, pady=5) # 统计信息 stats_frame = ttk.Frame(progress_frame) stats_frame.pack(pady=5) self.stats = { 'total': ttk.Label(stats_frame, text="总文件数: 0"), 'processed': ttk.Label(stats_frame, text="已完成: 0"), 'failed': ttk.Label(stats_frame, text="失败: 0"), 'remaining': ttk.Label(stats_frame, text="剩余: 0") } for lbl in self.stats.values(): lbl.pack(side=tk.LEFT, padx=15) def create_control_buttons(self, parent): """创建控制按钮""" btn_frame = ttk.Frame(parent) btn_frame.pack(pady=10) self.start_btn = ttk.Button(btn_frame, text="开始处理", command=self.start_processing) self.start_btn.pack(side=tk.LEFT, padx=15) ttk.Button(btn_frame, text="清除记录", command=self.clear_log).pack(side=tk.LEFT, padx=15) ttk.Button(btn_frame, text="退出", command=self.confirm_exit).pack(side=tk.RIGHT) def browse_input(self): """选择输入目录""" path = filedialog.askdirectory() if path: self.input_entry.delete(0, tk.END) self.input_entry.insert(0, path) def browse_output(self): """选择输出目录""" path = filedialog.askdirectory() if path: self.output_entry.delete(0, tk.END) self.output_entry.insert(0, path) def load_last_log(self): """加载历史日志""" log_path = os.path.join(os.path.dirname(__file__), "expand_log.json") if os.path.exists(log_path): try: with open(log_path, 'r') as f: self.current_log = json.load(f) self.processed_files = set(self.current_log.get('processed', [])) except Exception as e: messagebox.showwarning("日志加载错误", f"无法读取历史记录:\n{str(e)}") def start_processing(self): """启动处理流程""" if self.running: return # 验证输入参数 params = self.validate_inputs() if not params: return # 初始化状态 self.running = True self.start_btn.config(state=tk.DISABLED) self.current_log = { "start_time": datetime.now().isoformat(), "params": params, "processed": [], "failed": [] } # 启动处理线程 threading.Thread( target=self.process_images, args=(params['input'], params['output'], params['pads']), daemon=True ).start() def validate_inputs(self): """验证输入有效性""" try: # 获取扩展参数 pads = { 'bottom': int(self.pad_entries['bottom'].get()), 'right': int(self.pad_entries['right'].get()), 'top': int(self.pad_entries['top'].get()), 'left': int(self.pad_entries['left'].get()) } if any(v < 0 for v in pads.values()): raise ValueError("扩展值不能为负数") # 验证路径 input_dir = self.input_entry.get().strip() if not os.path.isdir(input_dir): raise FileNotFoundError("输入目录不存在") output_dir = self.output_entry.get().strip() if not output_dir: raise ValueError("输出目录不能为空") # 验证可写性 test_file = os.path.join(output_dir, ".write_test") try: with open(test_file, "w") as f: f.write("test") os.remove(test_file) except Exception as e: raise ValueError(f"输出目录不可写: {str(e)}") return { 'input': input_dir, 'output': output_dir, 'pads': pads } except Exception as e: messagebox.showerror("输入错误", str(e)) return None def process_images(self, input_dir, output_dir, pads): """核心处理逻辑""" try: os.makedirs(output_dir, exist_ok=True) valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp') # 获取待处理文件列表 all_files = [ f for f in os.listdir(input_dir) if os.path.splitext(f)[1].lower() in valid_ext and f not in self.processed_files ] total = len(all_files) if not total: self.queue.put(("complete", (0, 0))) return success = 0 failed = 0 for idx, filename in enumerate(all_files, 1): if not self.running: break input_path = os.path.join(input_dir, filename) output_path = os.path.join(output_dir, filename) try: with Image.open(input_path) as img: # 转换RGB模式并处理 current_img = img.convert('RGB') # 按顺序执行四个方向的扩展 current_img = self.sequential_expand(current_img, pads) # 保存结果 current_img.save(output_path, quality=95, optimize=True) # 更新处理记录 with self.lock: self.processed_files.add(filename) self.current_log['processed'].append(filename) success += 1 # 更新进度 self.queue.put(("progress", (idx, total, filename, success, failed))) except Exception as e: with self.lock: self.current_log['failed'].append({ 'file': filename, 'error': str(e), 'time': datetime.now().isoformat() }) failed += 1 self.queue.put(("error", (filename, str(e)))) # 定期保存日志 if idx % 5 == 0: self.save_log() self.queue.put(("complete", (success, failed))) self.save_log() except Exception as e: self.queue.put(("error", ("系统错误", str(e)))) def sequential_expand(self, original_img, pads): """执行顺序镜像扩展""" current_img = original_img directions = [ ('bottom', pads['bottom'], False), # 下边:垂直翻转 ('right', pads['right'], True), # 右边:水平翻转 ('top', pads['top'], False), # 上边:垂直翻转 ('left', pads['left'], True) # 左边:水平翻转 ] for direction, pad, flip_horizontal in directions: if pad <= 0: continue width, height = current_img.size pad = min(pad, (width if flip_horizontal else height)) # 获取镜像区域 crop_box = self.get_crop_box(direction, width, height, pad) mirrored = current_img.crop(crop_box).transpose( Image.FLIP_LEFT_RIGHT if flip_horizontal else Image.FLIP_TOP_BOTTOM ) # 创建新画布 new_size = ( width + (pad if direction in ['left', 'right'] else 0), height + (pad if direction in ['top', 'bottom'] else 0) ) new_img = Image.new('RGB', new_size) # 定位粘贴 if direction == 'bottom': new_img.paste(current_img, (0, 0)) new_img.paste(mirrored, (0, height)) elif direction == 'right': new_img.paste(current_img, (0, 0)) new_img.paste(mirrored, (width, 0)) elif direction == 'top': new_img.paste(mirrored, (0, 0)) new_img.paste(current_img, (0, pad)) else: # left new_img.paste(mirrored, (0, 0)) new_img.paste(current_img, (pad, 0)) current_img = new_img return current_img def get_crop_box(self, direction, width, height, pad): """获取裁剪区域坐标""" return { 'bottom': (0, height - pad, width, height), 'right': (width - pad, 0, width, height), 'top': (0, 0, width, pad), 'left': (0, 0, pad, height) }[direction] def process_queue(self): """处理消息队列""" try: while True: msg_type, data = self.queue.get_nowait() if msg_type == "progress": current, total, filename, success, failed = data self.progress["value"] = (current / total) * 100 self.update_stats(total, success, failed) self.root.title(f"正在处理: {filename}") elif msg_type == "complete": success, failed = data self.progress["value"] = 100 self.update_stats(success + failed, success, failed) messagebox.showinfo("完成", f"处理完成!\n成功: {success} 失败: {failed}") self.running = False self.start_btn.config(state=tk.NORMAL) self.root.title("顺序镜像扩展工具") elif msg_type == "error": filename, error = data self.stats['failed'].config( text=f"失败: {int(self.stats['failed'].cget('text').split(': ')[1]) + 1}" ) messagebox.showerror("处理错误", f"文件: {filename}\n错误: {error}") except queue.Empty: pass finally: self.root.after(100, self.process_queue) def update_stats(self, total, success, failed): """更新统计信息""" self.stats['total'].config(text=f"总文件数: {total}") self.stats['processed'].config(text=f"已完成: {success}") self.stats['failed'].config(text=f"失败: {failed}") self.stats['remaining'].config(text=f"剩余: {total - success - failed}") def save_log(self): """保存处理日志""" log_path = os.path.join(os.path.dirname(__file__), "expand_log.json") with open(log_path, 'w') as f: json.dump(self.current_log, f, indent=2) def clear_log(self): """清除处理记录""" if messagebox.askyesno("确认清除", "确定要清除所有处理记录吗?"): self.processed_files = set() self.current_log = {} self.save_log() self.update_stats(0, 0, 0) messagebox.showinfo("清除完成", "所有记录已重置") def confirm_exit(self): """确认退出程序""" if self.running: if messagebox.askyesno("退出确认", "处理正在进行中,确定要退出吗?"): self.running = False self.save_log() self.root.destroy() else: self.root.destroy() if __name__ == "__main__": root = tk.Tk() app = SequentialMirrorApp(root) root.mainloop()