|
|
@@ -0,0 +1,404 @@
|
|
|
+import os
|
|
|
+import threading
|
|
|
+import queue
|
|
|
+import json
|
|
|
+import tkinter as tk
|
|
|
+from tkinter import ttk, filedialog, messagebox
|
|
|
+from datetime import datetime
|
|
|
+from PIL import Image
|
|
|
+
|
|
|
+
|
|
|
+class SequentialMirrorApp:
|
|
|
+ def __init__(self, root):
|
|
|
+ self.root = root
|
|
|
+ self.root.title("顺序镜像扩展工具")
|
|
|
+ self.root.geometry("650x500")
|
|
|
+ self.setup_ui()
|
|
|
+
|
|
|
+ # 初始化状态
|
|
|
+ self.running = False
|
|
|
+ self.processed_files = set()
|
|
|
+ self.current_log = {}
|
|
|
+ self.queue = queue.Queue()
|
|
|
+ self.lock = threading.Lock()
|
|
|
+
|
|
|
+ # 加载历史记录
|
|
|
+ self.load_last_log()
|
|
|
+ self.root.after(100, self.process_queue)
|
|
|
+
|
|
|
+ def setup_ui(self):
|
|
|
+ """初始化用户界面"""
|
|
|
+ main_frame = ttk.Frame(self.root, padding=15)
|
|
|
+ main_frame.pack(fill=tk.BOTH, expand=True)
|
|
|
+
|
|
|
+ # 输入输出设置
|
|
|
+ self.create_io_section(main_frame)
|
|
|
+
|
|
|
+ # 扩展参数设置
|
|
|
+ self.create_expansion_params(main_frame)
|
|
|
+
|
|
|
+ # 进度显示
|
|
|
+ self.create_progress_section(main_frame)
|
|
|
+
|
|
|
+ # 操作按钮
|
|
|
+ self.create_control_buttons(main_frame)
|
|
|
+
|
|
|
+ def create_io_section(self, parent):
|
|
|
+ """创建输入输出组件"""
|
|
|
+ io_frame = ttk.LabelFrame(parent, text="文件设置")
|
|
|
+ io_frame.pack(fill=tk.X, pady=5)
|
|
|
+
|
|
|
+ # 输入文件夹
|
|
|
+ ttk.Label(io_frame, text="输入路径:").grid(row=0, column=0, sticky=tk.W)
|
|
|
+ self.input_entry = ttk.Entry(io_frame, width=45)
|
|
|
+ self.input_entry.grid(row=0, column=1, padx=5)
|
|
|
+ ttk.Button(io_frame, text="浏览", command=self.browse_input).grid(row=0, column=2)
|
|
|
+
|
|
|
+ # 输出文件夹
|
|
|
+ ttk.Label(io_frame, text="输出路径:").grid(row=1, column=0, sticky=tk.W)
|
|
|
+ self.output_entry = ttk.Entry(io_frame, width=45)
|
|
|
+ self.output_entry.grid(row=1, column=1, padx=5)
|
|
|
+ ttk.Button(io_frame, text="浏览", command=self.browse_output).grid(row=1, column=2)
|
|
|
+
|
|
|
+ def create_expansion_params(self, parent):
|
|
|
+ """创建扩展参数设置"""
|
|
|
+ param_frame = ttk.LabelFrame(parent, text="扩展参数(像素)")
|
|
|
+ param_frame.pack(fill=tk.X, pady=5)
|
|
|
+
|
|
|
+ # 扩展方向参数
|
|
|
+ directions = [
|
|
|
+ ("下边扩展:", "bottom", 10),
|
|
|
+ ("右边扩展:", "right", 20),
|
|
|
+ ("上边扩展:", "top", 15),
|
|
|
+ ("左边扩展:", "left", 5)
|
|
|
+ ]
|
|
|
+
|
|
|
+ self.pad_entries = {}
|
|
|
+ for idx, (label, direction, default) in enumerate(directions):
|
|
|
+ row, col = divmod(idx, 2)
|
|
|
+ frame = ttk.Frame(param_frame)
|
|
|
+ frame.grid(row=row, column=col, padx=5, pady=5, sticky=tk.W)
|
|
|
+
|
|
|
+ ttk.Label(frame, text=label).pack(side=tk.LEFT)
|
|
|
+ entry = ttk.Entry(frame, width=8)
|
|
|
+ entry.insert(0, str(default))
|
|
|
+ entry.pack(side=tk.LEFT)
|
|
|
+ self.pad_entries[direction] = entry
|
|
|
+
|
|
|
+ def create_progress_section(self, parent):
|
|
|
+ """创建进度显示组件"""
|
|
|
+ progress_frame = ttk.LabelFrame(parent, text="处理进度")
|
|
|
+ progress_frame.pack(fill=tk.BOTH, expand=True, pady=5)
|
|
|
+
|
|
|
+ # 进度条
|
|
|
+ self.progress = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, mode='determinate')
|
|
|
+ self.progress.pack(fill=tk.X, padx=10, pady=5)
|
|
|
+
|
|
|
+ # 统计信息
|
|
|
+ stats_frame = ttk.Frame(progress_frame)
|
|
|
+ stats_frame.pack(pady=5)
|
|
|
+
|
|
|
+ self.stats = {
|
|
|
+ 'total': ttk.Label(stats_frame, text="总文件数: 0"),
|
|
|
+ 'processed': ttk.Label(stats_frame, text="已完成: 0"),
|
|
|
+ 'failed': ttk.Label(stats_frame, text="失败: 0"),
|
|
|
+ 'remaining': ttk.Label(stats_frame, text="剩余: 0")
|
|
|
+ }
|
|
|
+ for lbl in self.stats.values():
|
|
|
+ lbl.pack(side=tk.LEFT, padx=15)
|
|
|
+
|
|
|
+ def create_control_buttons(self, parent):
|
|
|
+ """创建控制按钮"""
|
|
|
+ btn_frame = ttk.Frame(parent)
|
|
|
+ btn_frame.pack(pady=10)
|
|
|
+
|
|
|
+ self.start_btn = ttk.Button(btn_frame, text="开始处理", command=self.start_processing)
|
|
|
+ self.start_btn.pack(side=tk.LEFT, padx=15)
|
|
|
+
|
|
|
+ ttk.Button(btn_frame, text="清除记录", command=self.clear_log).pack(side=tk.LEFT, padx=15)
|
|
|
+ ttk.Button(btn_frame, text="退出", command=self.confirm_exit).pack(side=tk.RIGHT)
|
|
|
+
|
|
|
+ def browse_input(self):
|
|
|
+ """选择输入目录"""
|
|
|
+ path = filedialog.askdirectory()
|
|
|
+ if path:
|
|
|
+ self.input_entry.delete(0, tk.END)
|
|
|
+ self.input_entry.insert(0, path)
|
|
|
+
|
|
|
+ def browse_output(self):
|
|
|
+ """选择输出目录"""
|
|
|
+ path = filedialog.askdirectory()
|
|
|
+ if path:
|
|
|
+ self.output_entry.delete(0, tk.END)
|
|
|
+ self.output_entry.insert(0, path)
|
|
|
+
|
|
|
+ def load_last_log(self):
|
|
|
+ """加载历史日志"""
|
|
|
+ log_path = os.path.join(os.path.dirname(__file__), "expand_log.json")
|
|
|
+ if os.path.exists(log_path):
|
|
|
+ try:
|
|
|
+ with open(log_path, 'r') as f:
|
|
|
+ self.current_log = json.load(f)
|
|
|
+ self.processed_files = set(self.current_log.get('processed', []))
|
|
|
+ except Exception as e:
|
|
|
+ messagebox.showwarning("日志加载错误", f"无法读取历史记录:\n{str(e)}")
|
|
|
+
|
|
|
+ def start_processing(self):
|
|
|
+ """启动处理流程"""
|
|
|
+ if self.running:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 验证输入参数
|
|
|
+ params = self.validate_inputs()
|
|
|
+ if not params:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 初始化状态
|
|
|
+ self.running = True
|
|
|
+ self.start_btn.config(state=tk.DISABLED)
|
|
|
+ self.current_log = {
|
|
|
+ "start_time": datetime.now().isoformat(),
|
|
|
+ "params": params,
|
|
|
+ "processed": [],
|
|
|
+ "failed": []
|
|
|
+ }
|
|
|
+
|
|
|
+ # 启动处理线程
|
|
|
+ threading.Thread(
|
|
|
+ target=self.process_images,
|
|
|
+ args=(params['input'], params['output'], params['pads']),
|
|
|
+ daemon=True
|
|
|
+ ).start()
|
|
|
+
|
|
|
+ def validate_inputs(self):
|
|
|
+ """验证输入有效性"""
|
|
|
+ try:
|
|
|
+ # 获取扩展参数
|
|
|
+ pads = {
|
|
|
+ 'bottom': int(self.pad_entries['bottom'].get()),
|
|
|
+ 'right': int(self.pad_entries['right'].get()),
|
|
|
+ 'top': int(self.pad_entries['top'].get()),
|
|
|
+ 'left': int(self.pad_entries['left'].get())
|
|
|
+ }
|
|
|
+ if any(v < 0 for v in pads.values()):
|
|
|
+ raise ValueError("扩展值不能为负数")
|
|
|
+
|
|
|
+ # 验证路径
|
|
|
+ input_dir = self.input_entry.get().strip()
|
|
|
+ if not os.path.isdir(input_dir):
|
|
|
+ raise FileNotFoundError("输入目录不存在")
|
|
|
+
|
|
|
+ output_dir = self.output_entry.get().strip()
|
|
|
+ if not output_dir:
|
|
|
+ raise ValueError("输出目录不能为空")
|
|
|
+
|
|
|
+ # 验证可写性
|
|
|
+ test_file = os.path.join(output_dir, ".write_test")
|
|
|
+ try:
|
|
|
+ with open(test_file, "w") as f:
|
|
|
+ f.write("test")
|
|
|
+ os.remove(test_file)
|
|
|
+ except Exception as e:
|
|
|
+ raise ValueError(f"输出目录不可写: {str(e)}")
|
|
|
+
|
|
|
+ return {
|
|
|
+ 'input': input_dir,
|
|
|
+ 'output': output_dir,
|
|
|
+ 'pads': pads
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ messagebox.showerror("输入错误", str(e))
|
|
|
+ return None
|
|
|
+
|
|
|
+ def process_images(self, input_dir, output_dir, pads):
|
|
|
+ """核心处理逻辑"""
|
|
|
+ try:
|
|
|
+ os.makedirs(output_dir, exist_ok=True)
|
|
|
+ valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
|
|
|
+
|
|
|
+ # 获取待处理文件列表
|
|
|
+ all_files = [
|
|
|
+ f for f in os.listdir(input_dir)
|
|
|
+ if os.path.splitext(f)[1].lower() in valid_ext
|
|
|
+ and f not in self.processed_files
|
|
|
+ ]
|
|
|
+ total = len(all_files)
|
|
|
+
|
|
|
+ if not total:
|
|
|
+ self.queue.put(("complete", (0, 0)))
|
|
|
+ return
|
|
|
+
|
|
|
+ success = 0
|
|
|
+ failed = 0
|
|
|
+
|
|
|
+ for idx, filename in enumerate(all_files, 1):
|
|
|
+ if not self.running:
|
|
|
+ break
|
|
|
+
|
|
|
+ input_path = os.path.join(input_dir, filename)
|
|
|
+ output_path = os.path.join(output_dir, filename)
|
|
|
+
|
|
|
+ try:
|
|
|
+ with Image.open(input_path) as img:
|
|
|
+ # 转换RGB模式并处理
|
|
|
+ current_img = img.convert('RGB')
|
|
|
+
|
|
|
+ # 按顺序执行四个方向的扩展
|
|
|
+ current_img = self.sequential_expand(current_img, pads)
|
|
|
+
|
|
|
+ # 保存结果
|
|
|
+ current_img.save(output_path, quality=95, optimize=True)
|
|
|
+
|
|
|
+ # 更新处理记录
|
|
|
+ with self.lock:
|
|
|
+ self.processed_files.add(filename)
|
|
|
+ self.current_log['processed'].append(filename)
|
|
|
+ success += 1
|
|
|
+
|
|
|
+ # 更新进度
|
|
|
+ self.queue.put(("progress", (idx, total, filename, success, failed)))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ with self.lock:
|
|
|
+ self.current_log['failed'].append({
|
|
|
+ 'file': filename,
|
|
|
+ 'error': str(e),
|
|
|
+ 'time': datetime.now().isoformat()
|
|
|
+ })
|
|
|
+ failed += 1
|
|
|
+ self.queue.put(("error", (filename, str(e))))
|
|
|
+
|
|
|
+ # 定期保存日志
|
|
|
+ if idx % 5 == 0:
|
|
|
+ self.save_log()
|
|
|
+
|
|
|
+ self.queue.put(("complete", (success, failed)))
|
|
|
+ self.save_log()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.queue.put(("error", ("系统错误", str(e))))
|
|
|
+
|
|
|
+ def sequential_expand(self, original_img, pads):
|
|
|
+ """执行顺序镜像扩展"""
|
|
|
+ current_img = original_img
|
|
|
+ directions = [
|
|
|
+ ('bottom', pads['bottom'], False), # 下边:垂直翻转
|
|
|
+ ('right', pads['right'], True), # 右边:水平翻转
|
|
|
+ ('top', pads['top'], False), # 上边:垂直翻转
|
|
|
+ ('left', pads['left'], True) # 左边:水平翻转
|
|
|
+ ]
|
|
|
+
|
|
|
+ for direction, pad, flip_horizontal in directions:
|
|
|
+ if pad <= 0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ width, height = current_img.size
|
|
|
+ pad = min(pad, (width if flip_horizontal else height))
|
|
|
+
|
|
|
+ # 获取镜像区域
|
|
|
+ crop_box = self.get_crop_box(direction, width, height, pad)
|
|
|
+ mirrored = current_img.crop(crop_box).transpose(
|
|
|
+ Image.FLIP_LEFT_RIGHT if flip_horizontal else Image.FLIP_TOP_BOTTOM
|
|
|
+ )
|
|
|
+
|
|
|
+ # 创建新画布
|
|
|
+ new_size = (
|
|
|
+ width + (pad if direction in ['left', 'right'] else 0),
|
|
|
+ height + (pad if direction in ['top', 'bottom'] else 0)
|
|
|
+ )
|
|
|
+ new_img = Image.new('RGB', new_size)
|
|
|
+
|
|
|
+ # 定位粘贴
|
|
|
+ if direction == 'bottom':
|
|
|
+ new_img.paste(current_img, (0, 0))
|
|
|
+ new_img.paste(mirrored, (0, height))
|
|
|
+ elif direction == 'right':
|
|
|
+ new_img.paste(current_img, (0, 0))
|
|
|
+ new_img.paste(mirrored, (width, 0))
|
|
|
+ elif direction == 'top':
|
|
|
+ new_img.paste(mirrored, (0, 0))
|
|
|
+ new_img.paste(current_img, (0, pad))
|
|
|
+ else: # left
|
|
|
+ new_img.paste(mirrored, (0, 0))
|
|
|
+ new_img.paste(current_img, (pad, 0))
|
|
|
+
|
|
|
+ current_img = new_img
|
|
|
+
|
|
|
+ return current_img
|
|
|
+
|
|
|
+ def get_crop_box(self, direction, width, height, pad):
|
|
|
+ """获取裁剪区域坐标"""
|
|
|
+ return {
|
|
|
+ 'bottom': (0, height - pad, width, height),
|
|
|
+ 'right': (width - pad, 0, width, height),
|
|
|
+ 'top': (0, 0, width, pad),
|
|
|
+ 'left': (0, 0, pad, height)
|
|
|
+ }[direction]
|
|
|
+
|
|
|
+ def process_queue(self):
|
|
|
+ """处理消息队列"""
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ msg_type, data = self.queue.get_nowait()
|
|
|
+
|
|
|
+ if msg_type == "progress":
|
|
|
+ current, total, filename, success, failed = data
|
|
|
+ self.progress["value"] = (current / total) * 100
|
|
|
+ self.update_stats(total, success, failed)
|
|
|
+ self.root.title(f"正在处理: {filename}")
|
|
|
+ elif msg_type == "complete":
|
|
|
+ success, failed = data
|
|
|
+ self.progress["value"] = 100
|
|
|
+ self.update_stats(success + failed, success, failed)
|
|
|
+ messagebox.showinfo("完成", f"处理完成!\n成功: {success} 失败: {failed}")
|
|
|
+ self.running = False
|
|
|
+ self.start_btn.config(state=tk.NORMAL)
|
|
|
+ self.root.title("顺序镜像扩展工具")
|
|
|
+ elif msg_type == "error":
|
|
|
+ filename, error = data
|
|
|
+ self.stats['failed'].config(
|
|
|
+ text=f"失败: {int(self.stats['failed'].cget('text').split(': ')[1]) + 1}"
|
|
|
+ )
|
|
|
+ messagebox.showerror("处理错误", f"文件: {filename}\n错误: {error}")
|
|
|
+ except queue.Empty:
|
|
|
+ pass
|
|
|
+ finally:
|
|
|
+ self.root.after(100, self.process_queue)
|
|
|
+
|
|
|
+ def update_stats(self, total, success, failed):
|
|
|
+ """更新统计信息"""
|
|
|
+ self.stats['total'].config(text=f"总文件数: {total}")
|
|
|
+ self.stats['processed'].config(text=f"已完成: {success}")
|
|
|
+ self.stats['failed'].config(text=f"失败: {failed}")
|
|
|
+ self.stats['remaining'].config(text=f"剩余: {total - success - failed}")
|
|
|
+
|
|
|
+ def save_log(self):
|
|
|
+ """保存处理日志"""
|
|
|
+ log_path = os.path.join(os.path.dirname(__file__), "expand_log.json")
|
|
|
+ with open(log_path, 'w') as f:
|
|
|
+ json.dump(self.current_log, f, indent=2)
|
|
|
+
|
|
|
+ def clear_log(self):
|
|
|
+ """清除处理记录"""
|
|
|
+ if messagebox.askyesno("确认清除", "确定要清除所有处理记录吗?"):
|
|
|
+ self.processed_files = set()
|
|
|
+ self.current_log = {}
|
|
|
+ self.save_log()
|
|
|
+ self.update_stats(0, 0, 0)
|
|
|
+ messagebox.showinfo("清除完成", "所有记录已重置")
|
|
|
+
|
|
|
+ def confirm_exit(self):
|
|
|
+ """确认退出程序"""
|
|
|
+ if self.running:
|
|
|
+ if messagebox.askyesno("退出确认", "处理正在进行中,确定要退出吗?"):
|
|
|
+ self.running = False
|
|
|
+ self.save_log()
|
|
|
+ self.root.destroy()
|
|
|
+ else:
|
|
|
+ self.root.destroy()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ root = tk.Tk()
|
|
|
+ app = SequentialMirrorApp(root)
|
|
|
+ root.mainloop()
|