| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- import os
- import threading
- import queue
- import json
- import tkinter as tk
- from tkinter import ttk, filedialog, messagebox
- from datetime import datetime
- from PIL import Image
class SequentialMirrorApp:
    """Tkinter GUI that batch-extends images by mirroring their edges.

    The user picks an input and an output directory plus a pad width (in
    pixels) for each side.  A daemon worker thread processes every image in
    the input directory and reports back to the GUI thread through a
    ``queue.Queue``; the GUI polls that queue every 100 ms.  A JSON log stored
    next to this module remembers which file names have already been
    processed so they are skipped on later runs.

    NOTE: processed files are tracked by bare file name only, so a file with
    the same name in a different input directory is also treated as done.
    """

    LOG_FILENAME = "expand_log.json"
    VALID_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')

    def __init__(self, root):
        """Build the window on *root*, restore the log and start queue polling."""
        self.root = root
        self.root.title("顺序镜像扩展工具")
        self.root.geometry("650x500")
        self.setup_ui()

        # State shared between the GUI thread and the worker thread.
        self.running = False
        self.processed_files = set()
        self.current_log = {}
        self.queue = queue.Queue()
        # Guards processed_files / current_log and serializes log writes.
        self.lock = threading.Lock()

        # Restore the history of the previous session, then start polling.
        self.load_last_log()
        self.root.after(100, self.process_queue)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Create the main frame and all widget sections inside it."""
        main_frame = ttk.Frame(self.root, padding=15)
        main_frame.pack(fill=tk.BOTH, expand=True)

        self.create_io_section(main_frame)         # input / output paths
        self.create_expansion_params(main_frame)   # per-side pad widths
        self.create_progress_section(main_frame)   # progress bar + counters
        self.create_control_buttons(main_frame)    # start / clear / exit

    def create_io_section(self, parent):
        """Create the input and output directory rows (label, entry, browse)."""
        io_frame = ttk.LabelFrame(parent, text="文件设置")
        io_frame.pack(fill=tk.X, pady=5)

        # Input directory row.
        ttk.Label(io_frame, text="输入路径:").grid(row=0, column=0, sticky=tk.W)
        self.input_entry = ttk.Entry(io_frame, width=45)
        self.input_entry.grid(row=0, column=1, padx=5)
        ttk.Button(io_frame, text="浏览", command=self.browse_input).grid(row=0, column=2)

        # Output directory row.
        ttk.Label(io_frame, text="输出路径:").grid(row=1, column=0, sticky=tk.W)
        self.output_entry = ttk.Entry(io_frame, width=45)
        self.output_entry.grid(row=1, column=1, padx=5)
        ttk.Button(io_frame, text="浏览", command=self.browse_output).grid(row=1, column=2)

    def create_expansion_params(self, parent):
        """Create one pad-width entry per side and store them in ``pad_entries``."""
        param_frame = ttk.LabelFrame(parent, text="扩展参数(像素)")
        param_frame.pack(fill=tk.X, pady=5)

        # (label text, key in pad_entries, default pad width)
        directions = [
            ("下边扩展:", "bottom", 10),
            ("右边扩展:", "right", 20),
            ("上边扩展:", "top", 15),
            ("左边扩展:", "left", 5),
        ]
        self.pad_entries = {}
        for idx, (label, direction, default) in enumerate(directions):
            row, col = divmod(idx, 2)  # lay the four entries out as a 2x2 grid
            frame = ttk.Frame(param_frame)
            frame.grid(row=row, column=col, padx=5, pady=5, sticky=tk.W)
            ttk.Label(frame, text=label).pack(side=tk.LEFT)
            entry = ttk.Entry(frame, width=8)
            entry.insert(0, str(default))
            entry.pack(side=tk.LEFT)
            self.pad_entries[direction] = entry

    def create_progress_section(self, parent):
        """Create the progress bar and the four counter labels in ``stats``."""
        progress_frame = ttk.LabelFrame(parent, text="处理进度")
        progress_frame.pack(fill=tk.BOTH, expand=True, pady=5)

        self.progress = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, mode='determinate')
        self.progress.pack(fill=tk.X, padx=10, pady=5)

        stats_frame = ttk.Frame(progress_frame)
        stats_frame.pack(pady=5)
        self.stats = {
            'total': ttk.Label(stats_frame, text="总文件数: 0"),
            'processed': ttk.Label(stats_frame, text="已完成: 0"),
            'failed': ttk.Label(stats_frame, text="失败: 0"),
            'remaining': ttk.Label(stats_frame, text="剩余: 0"),
        }
        for lbl in self.stats.values():
            lbl.pack(side=tk.LEFT, padx=15)

    def create_control_buttons(self, parent):
        """Create the start, clear-log and exit buttons."""
        btn_frame = ttk.Frame(parent)
        btn_frame.pack(pady=10)
        # Kept as an attribute because it is disabled while a run is active.
        self.start_btn = ttk.Button(btn_frame, text="开始处理", command=self.start_processing)
        self.start_btn.pack(side=tk.LEFT, padx=15)
        ttk.Button(btn_frame, text="清除记录", command=self.clear_log).pack(side=tk.LEFT, padx=15)
        ttk.Button(btn_frame, text="退出", command=self.confirm_exit).pack(side=tk.RIGHT)

    def browse_input(self):
        """Ask for a directory and, if one was chosen, put it in the input entry."""
        path = filedialog.askdirectory()
        if path:
            self.input_entry.delete(0, tk.END)
            self.input_entry.insert(0, path)

    def browse_output(self):
        """Ask for a directory and, if one was chosen, put it in the output entry."""
        path = filedialog.askdirectory()
        if path:
            self.output_entry.delete(0, tk.END)
            self.output_entry.insert(0, path)

    # ------------------------------------------------------------------
    # Log persistence
    # ------------------------------------------------------------------
    def _log_path(self):
        """Return the path of the JSON log file stored next to this module."""
        return os.path.join(os.path.dirname(__file__), self.LOG_FILENAME)

    def load_last_log(self):
        """Restore ``current_log`` and ``processed_files`` from the log file.

        A missing log file is not an error.  An unreadable or malformed log
        shows a warning dialog and leaves the in-memory state untouched.
        """
        try:
            with open(self._log_path(), 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("日志格式无效")
            # set() raises TypeError when an entry is unhashable.
            processed = set(loaded.get('processed', []))
        except FileNotFoundError:
            return
        except (OSError, ValueError, TypeError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            messagebox.showwarning("日志加载错误", f"无法读取历史记录:\n{str(e)}")
            return
        # Only commit once the whole payload has been validated.
        self.current_log = loaded
        self.processed_files = processed

    def save_log(self):
        """Write ``current_log`` to the log file as indented JSON.

        The lock is held for the whole write so the worker cannot mutate the
        log mid-serialization and two threads cannot write the file at once.
        Callers must therefore NOT hold ``self.lock`` (it is not reentrant).

        Raises:
            OSError: if the log file cannot be written.
        """
        with self.lock:
            with open(self._log_path(), 'w', encoding='utf-8') as f:
                json.dump(self.current_log, f, indent=2)

    def clear_log(self):
        """After confirmation, forget all processed files and reset the log."""
        if not messagebox.askyesno("确认清除", "确定要清除所有处理记录吗?"):
            return
        with self.lock:
            self.processed_files = set()
            self.current_log = {}
        self.save_log()
        self.update_stats(0, 0, 0)
        messagebox.showinfo("清除完成", "所有记录已重置")

    # ------------------------------------------------------------------
    # Run control
    # ------------------------------------------------------------------
    def start_processing(self):
        """Validate the form and launch the worker thread.

        Does nothing when a run is already active or validation fails.
        """
        if self.running:
            return
        params = self.validate_inputs()
        if not params:
            return

        self.running = True
        self.start_btn.config(state=tk.DISABLED)
        self.progress["value"] = 0  # do not show the previous run's 100 %
        with self.lock:
            self.current_log = {
                "start_time": datetime.now().isoformat(),
                "params": params,
                # Carry earlier sessions forward: the saved "processed" list
                # is what load_last_log() uses to rebuild processed_files.
                "processed": sorted(self.processed_files, key=str),
                "failed": [],
            }

        threading.Thread(
            target=self.process_images,
            args=(params['input'], params['output'], params['pads']),
            daemon=True,
        ).start()

    def validate_inputs(self):
        """Read and validate the form.

        Returns:
            ``{'input': str, 'output': str, 'pads': {side: int}}`` on success,
            or ``None`` after showing an error dialog when a pad is not a
            non-negative integer, the input directory does not exist, or the
            output directory is empty, cannot be created or is not writable.
        """
        try:
            pads = {
                side: int(self.pad_entries[side].get())
                for side in ('bottom', 'right', 'top', 'left')
            }
            if any(v < 0 for v in pads.values()):
                raise ValueError("扩展值不能为负数")

            input_dir = self.input_entry.get().strip()
            if not os.path.isdir(input_dir):
                raise FileNotFoundError("输入目录不存在")

            output_dir = self.output_entry.get().strip()
            if not output_dir:
                raise ValueError("输出目录不能为空")

            # Probe writability with a throw-away file.  The directory is
            # created first; otherwise every new output path would be
            # rejected as "not writable".
            test_file = os.path.join(output_dir, ".write_test")
            try:
                os.makedirs(output_dir, exist_ok=True)
                with open(test_file, "w") as f:
                    f.write("test")
                os.remove(test_file)
            except (OSError, ValueError) as e:
                raise ValueError(f"输出目录不可写: {str(e)}") from e
        except (ValueError, OSError) as e:
            messagebox.showerror("输入错误", str(e))
            return None

        return {
            'input': input_dir,
            'output': output_dir,
            'pads': pads,
        }

    def confirm_exit(self):
        """Close the window, asking for confirmation while a run is active."""
        if not self.running:
            self.root.destroy()
            return
        if not messagebox.askyesno("退出确认", "处理正在进行中,确定要退出吗?"):
            return
        self.running = False  # tells the worker loop to stop
        try:
            self.save_log()
        finally:
            # A failing log write must not trap the user in the application.
            self.root.destroy()

    # ------------------------------------------------------------------
    # Worker thread
    # ------------------------------------------------------------------
    def process_images(self, input_dir, output_dir, pads):
        """Process every pending image; runs on the worker thread.

        Posts these messages on ``self.queue``:

        * ``("progress", (index, total, filename, success, failed))`` after
          every file, whether it succeeded or not;
        * ``("error", (filename, message))`` for each file that failed, or
          ``("error", ("系统错误", message))`` for a failure outside a file;
        * exactly one ``("complete", (success, failed))`` at the end, also
          after a system-level failure, so the GUI always leaves the running
          state.

        Args:
            input_dir: directory scanned (non-recursively) for images.
            output_dir: directory results are written to, under the same
                file names; created if missing.
            pads: mapping with ``bottom``/``right``/``top``/``left`` widths.
        """
        success = 0
        failed = 0
        try:
            os.makedirs(output_dir, exist_ok=True)
            with self.lock:
                already_done = set(self.processed_files)
            pending = sorted(
                name for name in os.listdir(input_dir)
                if os.path.splitext(name)[1].lower() in self.VALID_EXTENSIONS
                and name not in already_done
                and os.path.isfile(os.path.join(input_dir, name))
            )
            total = len(pending)
            if not total:
                self.queue.put(("complete", (0, 0)))
                return

            for idx, filename in enumerate(pending, 1):
                if not self.running:
                    break
                input_path = os.path.join(input_dir, filename)
                output_path = os.path.join(output_dir, filename)
                error_text = None
                try:
                    with Image.open(input_path) as img:
                        # Work on an RGB copy so the source file can be closed.
                        rgb = img.convert('RGB')
                    expanded = self.sequential_expand(rgb, pads)
                    expanded.save(output_path, quality=95, optimize=True)
                except Exception as e:
                    # Per-file boundary: one bad image must not stop the
                    # batch, and image decoding can raise many exception types.
                    failed += 1
                    error_text = str(e)
                    with self.lock:
                        # setdefault: clear_log() may have emptied the log.
                        self.current_log.setdefault('failed', []).append({
                            'file': filename,
                            'error': error_text,
                            'time': datetime.now().isoformat(),
                        })
                else:
                    success += 1
                    with self.lock:
                        self.processed_files.add(filename)
                        self.current_log.setdefault('processed', []).append(filename)

                # Progress first so the counters are current when the error
                # dialog (which blocks the GUI thread) is shown.
                self.queue.put(("progress", (idx, total, filename, success, failed)))
                if error_text is not None:
                    self.queue.put(("error", (filename, error_text)))

                # Persist periodically so little is lost on a crash.
                if idx % 5 == 0:
                    self.save_log()

            self.save_log()
            self.queue.put(("complete", (success, failed)))
        except Exception as e:
            self.queue.put(("error", ("系统错误", str(e))))
            # Without this the GUI would stay in the running state forever.
            self.queue.put(("complete", (success, failed)))

    def sequential_expand(self, original_img, pads):
        """Return *original_img* extended by mirrored strips on each side.

        The sides are handled in the fixed order bottom, right, top, left.
        Each step works on the result of the previous one, so later strips
        also mirror the area added earlier.  A pad of 0 or less skips that
        side, and a pad larger than the current image dimension is clamped
        to that dimension.

        Args:
            original_img: source image; the result is always created in
                ``'RGB'`` mode.
            pads: mapping with ``bottom``/``right``/``top``/``left`` widths.
        """
        current_img = original_img
        for direction in ('bottom', 'right', 'top', 'left'):
            pad = pads[direction]
            if pad <= 0:
                continue

            width, height = current_img.size
            horizontal = direction in ('left', 'right')
            # A strip cannot be wider/taller than the image it is cut from.
            pad = min(pad, width if horizontal else height)

            strip = current_img.crop(self.get_crop_box(direction, width, height, pad))
            mirrored = strip.transpose(
                Image.FLIP_LEFT_RIGHT if horizontal else Image.FLIP_TOP_BOTTOM
            )

            if horizontal:
                new_size = (width + pad, height)
            else:
                new_size = (width, height + pad)

            # Where the existing image and the mirrored strip go on the canvas.
            if direction == 'bottom':
                image_at, strip_at = (0, 0), (0, height)
            elif direction == 'right':
                image_at, strip_at = (0, 0), (width, 0)
            elif direction == 'top':
                image_at, strip_at = (0, pad), (0, 0)
            else:  # left
                image_at, strip_at = (pad, 0), (0, 0)

            canvas = Image.new('RGB', new_size)
            canvas.paste(current_img, image_at)
            canvas.paste(mirrored, strip_at)
            current_img = canvas
        return current_img

    def get_crop_box(self, direction, width, height, pad):
        """Return the ``(left, upper, right, lower)`` box of the edge strip.

        The strip is the *pad*-pixel band of a ``width`` x ``height`` image
        that touches the given side.

        Raises:
            KeyError: if *direction* is not bottom, right, top or left.
        """
        return {
            'bottom': (0, height - pad, width, height),
            'right': (width - pad, 0, width, height),
            'top': (0, 0, width, pad),
            'left': (0, 0, pad, height),
        }[direction]

    # ------------------------------------------------------------------
    # GUI-thread message handling
    # ------------------------------------------------------------------
    def process_queue(self):
        """Apply all pending worker messages, then reschedule in 100 ms."""
        try:
            while True:
                msg_type, data = self.queue.get_nowait()
                if msg_type == "progress":
                    current, total, filename, success, failed = data
                    self.progress["value"] = (current / total) * 100
                    self.update_stats(total, success, failed)
                    self.root.title(f"正在处理: {filename}")
                elif msg_type == "complete":
                    success, failed = data
                    self.progress["value"] = 100
                    self.update_stats(success + failed, success, failed)
                    messagebox.showinfo("完成", f"处理完成!\n成功: {success} 失败: {failed}")
                    self.running = False
                    self.start_btn.config(state=tk.NORMAL)
                    self.root.title("顺序镜像扩展工具")
                elif msg_type == "error":
                    # Counters are driven by the progress/complete messages;
                    # this handler only reports the failure.
                    filename, error = data
                    messagebox.showerror("处理错误", f"文件: {filename}\n错误: {error}")
        except queue.Empty:
            pass  # queue drained
        finally:
            self.root.after(100, self.process_queue)

    def update_stats(self, total, success, failed):
        """Refresh the four counter labels; remaining = total - success - failed."""
        self.stats['total'].config(text=f"总文件数: {total}")
        self.stats['processed'].config(text=f"已完成: {success}")
        self.stats['failed'].config(text=f"失败: {failed}")
        self.stats['remaining'].config(text=f"剩余: {total - success - failed}")
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it and hand control to the Tk event loop until the window closes.
    window = tk.Tk()
    application = SequentialMirrorApp(window)
    window.mainloop()
|