活动介绍
file-type

JMeter中读取ForEach控制器JSON变量的技巧

下载需积分: 6 | 2KB | 更新于2024-11-15 | 180 浏览量 | 0 下载量 举报 收藏
download 立即下载
" 知识点: 1. JMeter基础:JMeter是一个开源的性能测试工具,主要用于对软件进行压力测试。它能够模拟多人同时访问或操作软件,验证软件的性能是否满足预期。JMeter具有图形界面,支持多协议,如HTTP、FTP、LDAP、Web Services等,并且可以扩展插件来支持其他测试。 2. ForEach控制器:ForEach控制器允许用户遍历集合中的每个元素,并对每个元素执行指定的逻辑。在JMeter中,ForEach控制器通常与JSON、CSV Data Set Config等数据源配合使用,以实现对测试数据的动态管理。 3. JSON变量:JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也易于机器解析和生成。在JMeter中,JSON变量可以作为数据源使用,存储测试过程中需要动态替换的参数值。 4. 读取JSON变量值:在JMeter的ForEach控制器中,可以使用JMeter的内置函数或脚本来读取JSON变量中的值。常用的方法有__V()函数、Beanshell脚本、JSR223脚本等,其中JSR223脚本提供了更多的灵活性和功能。 5. __V()函数:__V()函数是JMeter内置的一个变量提取函数,可以用来从变量或属性中读取值。当需要从JSON变量中读取值时,__V()函数可以结合JSON Path表达式来定位JSON中的具体数据。 6. Beanshell脚本:Beanshell是一种小型的、可嵌入的Java源解释器,它提供了一个脚本语言,可以用来编写简单的脚本并执行。在JMeter中,Beanshell脚本可用于读取和修改测试数据,以及执行更复杂的逻辑。 7. JSR223脚本:JSR223测试元素允许使用Groovy、JavaScript、Jython或BeanShell脚本来编写测试逻辑。其中,Groovy和JavaScript通常是首选,因为它们拥有更好的性能和更多的库支持。JSR223脚本用于读取JSON变量时,可以利用脚本语言提供的JSON解析库来实现。 8. JSON Path:JSON Path是类似XPath的一种表达式语言,用于从JSON文档中获取数据。它提供了一种简单的方法来导航、过滤和选择JSON结构中的元素。在JMeter中,结合__V()函数或其他脚本使用JSON Path表达式,可以精确地从JSON变量中提取所需的数据。 9. View Results Tree:View Results Tree是一个JMeter的监听器,它可以以树状结构的方式展示测试过程中的所有请求和响应数据。该监听器常用于调试测试脚本,以便查看和分析测试过程中每个请求的详细信息。在本技巧中,View Results Tree可以帮助用户观察ForEach控制器和JSON变量交互的实际效果。 通过以上知识点的学习,您将能够掌握如何在JMeter中使用ForEach控制器读取JSON变量的值,从而实现测试数据的动态管理。这对于提高测试的灵活性和准确性具有重要的意义。在实际操作过程中,根据不同的测试需求,您可以选择合适的脚本或函数来读取JSON数据,并利用View Results Tree监听器进行验证和调试。

相关推荐

filetype

ERROR o.a.j.JMeter: Uncaught exception: java.lang.NoClassDefFoundError: Could not initialize class org.apache.jmeter.gui.util.MenuFactory at org.apache.jmeter.control.gui.TestPlanGui.createPopupMenu(TestPlanGui.java:92) ~[ApacheJMeter_core.jar:5.0 r1840935] at org.apache.jmeter.gui.tree.JMeterTreeNode.createPopupMenu(JMeterTreeNode.java:184) ~[ApacheJMeter_core.jar:5.0 r1840935] at org.apache.jmeter.gui.tree.JMeterTreeListener.displayPopUp(JMeterTreeListener.java:235) ~[ApacheJMeter_core.jar:5.0 r1840935] at org.apache.jmeter.gui.tree.JMeterTreeListener.mousePressed(JMeterTreeListener.java:181) ~[ApacheJMeter_core.jar:5.0 r1840935] at java.awt.AWTEventMulticaster.mousePressed(Unknown Source) ~[?:1.8.0_131] at java.awt.Component.processMouseEvent(Unknown Source) ~[?:1.8.0_131] at javax.swing.JComponent.processMouseEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.Component.processEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.Container.processEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.Component.dispatchEventImpl(Unknown Source) ~[?:1.8.0_131] at java.awt.Container.dispatchEventImpl(Unknown Source) ~[?:1.8.0_131] at java.awt.Component.dispatchEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.LightweightDispatcher.retargetMouseEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.LightweightDispatcher.processMouseEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.LightweightDispatcher.dispatchEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.Container.dispatchEventImpl(Unknown Source) ~[?:1.8.0_131] at java.awt.Window.dispatchEventImpl(Unknown Source) ~[?:1.8.0_131] at java.awt.Component.dispatchEvent(Unknown Source) ~[?:1.8.0_131] at java.awt.EventQueue.dispatchEventImpl(Unknown Source) ~[?:1.8.0_131] at java.awt.EventQueue.access$500(Unknown Source) ~[?:1.8.0_131] at java.awt.EventQueue$3.run(Unknown Source) ~[?:1.8.0_131] at java.awt.EventQueue$3.run(Unknown Source) ~[?:1.8.0_131] at java.security.AccessController.doPrivileged(Native Method) ~[

filetype

import os import pandas as pd import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox from tkinter.colorchooser import askcolor from difflib import SequenceMatcher import re import openpyxl import threading import numpy as np from openpyxl.utils import get_column_letter import xlrd import gc import hashlib import json import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed import unicodedata class EnhancedSignalComparator: def __init__(self, root): self.root = root self.root.title("增强版信号功能对比工具") self.root.geometry("1200x800") self.root.configure(bg="#f0f0f0") # 初始化变量 self.folder_path = tk.StringVar() self.search_text = tk.StringVar() self.files = [] self.results = {} # 存储信号对比结果 self.highlight_color = "#FFD700" # 默认高亮色 self.search_running = False self.stop_requested = False self.cache_dir = os.path.join(tempfile.gettempdir(), "excel_cache") self.file_cache = {} # 文件缓存 self.column_cache = {} # 列名缓存 self.max_workers = 4 # 最大并发线程数 # 创建缓存目录 os.makedirs(self.cache_dir, exist_ok=True) # 创建界面 self.create_widgets() def create_widgets(self): # 顶部控制面板 control_frame = ttk.Frame(self.root, padding=10) control_frame.pack(fill=tk.X) # 文件夹选择 ttk.Label(control_frame, text="选择文件夹:").grid(row=0, column=0, sticky=tk.W) folder_entry = ttk.Entry(control_frame, textvariable=self.folder_path, width=50) folder_entry.grid(row=0, column=1, padx=5, sticky=tk.EW) ttk.Button(control_frame, text="浏览...", command=self.browse_folder).grid(row=0, column=2) # 搜索输入 ttk.Label(control_frame, text="搜索信号:").grid(row=1, column=0, sticky=tk.W, pady=(10,0)) search_entry = ttk.Entry(control_frame, textvariable=self.search_text, width=50) search_entry.grid(row=1, column=1, padx=5, pady=(10,0), sticky=tk.EW) search_entry.bind("<Return>", lambda event: self.start_search_thread()) ttk.Button(control_frame, text="搜索", command=self.start_search_thread).grid(row=1, column=2, pady=(10,0)) ttk.Button(control_frame, text="停止", command=self.stop_search).grid(row=1, column=3, pady=(10,0), padx=5) # 高级选项 ttk.Label(control_frame, text="并发线程:").grid(row=2, column=0, sticky=tk.W, pady=(10,0)) self.thread_var = tk.StringVar(value="4") ttk.Combobox(control_frame, textvariable=self.thread_var, values=["1", "2", "4", "8"], width=5).grid(row=2, column=1, sticky=tk.W, padx=5, pady=(10,0)) # 文件过滤 ttk.Label(control_frame, text="文件过滤:").grid(row=2, column=2, sticky=tk.W, pady=(10,0)) self.filter_var = tk.StringVar(value="*.xlsx;*.xlsm;*.xls") ttk.Entry(control_frame, textvariable=self.filter_var, width=20).grid(row=2, column=3, sticky=tk.W, padx=5, pady=(10,0)) # 高亮颜色选择 ttk.Label(control_frame, text="高亮颜色:").grid(row=3, column=0, sticky=tk.W, pady=(10,0)) self.color_btn = tk.Button(control_frame, bg=self.highlight_color, width=3, command=self.choose_color) self.color_btn.grid(row=3, column=1, sticky=tk.W, padx=5, pady=(10,0)) # 进度条 self.progress = ttk.Progressbar(control_frame, orient="horizontal", length=200, mode="determinate") self.progress.grid(row=3, column=2, columnspan=2, sticky=tk.EW, padx=5, pady=(10,0)) # 结果标签 self.result_label = ttk.Label(control_frame, text="") self.result_label.grid(row=3, column=4, sticky=tk.W, padx=5, pady=(10,0)) # 对比面板 notebook = ttk.Notebook(self.root) notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 表格视图 self.table_frame = ttk.Frame(notebook) notebook.add(self.table_frame, text="表格视图") # 文本对比视图 self.text_frame = ttk.Frame(notebook) notebook.add(self.text_frame, text="行内容对比") # 状态栏 self.status_var = tk.StringVar() status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(side=tk.BOTTOM, fill=tk.X) # 初始化表格和文本区域 self.init_table_view() self.init_text_view() def init_table_view(self): """初始化表格视图""" # 创建树状表格 columns = ("信号", "文件", "行内容摘要") self.tree = ttk.Treeview(self.table_frame, columns=columns, show="headings") # 设置列标题 for col in columns: self.tree.heading(col, text=col) self.tree.column(col, width=200, anchor=tk.W) # 添加滚动条 scrollbar = ttk.Scrollbar(self.table_frame, orient=tk.VERTICAL, command=self.tree.yview) self.tree.configure(yscrollcommand=scrollbar.set) self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # 绑定选择事件 self.tree.bind("<<TreeviewSelect>>", self.on_table_select) def init_text_view(self): """初始化文本对比视图""" self.text_panes = {} self.text_frame.columnconfigure(0, weight=1) self.text_frame.rowconfigure(0, weight=1) # 创建对比容器 self.compare_container = ttk.Frame(self.text_frame) self.compare_container.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) # 添加差异高亮按钮 btn_frame = ttk.Frame(self.text_frame) btn_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=5) ttk.Button(btn_frame, text="高亮显示差异", command=self.highlight_differences).pack(side=tk.LEFT) ttk.Button(btn_frame, text="导出差异报告", command=self.export_report).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="清除缓存", command=self.clear_cache).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="手动指定列名", command=self.manual_column_select).pack(side=tk.LEFT, padx=5) def browse_folder(self): """选择文件夹""" folder = filedialog.askdirectory(title="选择包含Excel文件的文件夹") if folder: self.folder_path.set(folder) self.load_files() def load_files(self): """加载文件夹中的Excel文件(优化特殊字符处理)""" folder = self.folder_path.get() if not folder or not os.path.isdir(folder): return # 获取文件过滤模式 filter_patterns = self.filter_var.get().split(';') self.files = [] for file in os.listdir(folder): file_path = os.path.join(folder, file) # 跳过临时文件 if file.startswith('~$'): continue # 检查文件扩展名 file_lower = file.lower() matched = False for pattern in filter_patterns: # 移除通配符并转换为小写 ext = pattern.replace('*', '').lower() if file_lower.endswith(ext): matched = True break if matched: # 规范化文件名处理特殊字符 normalized_path = self.normalize_file_path(file_path) if normalized_path and os.path.isfile(normalized_path): self.files.append(normalized_path) self.status_var.set(f"找到 {len(self.files)} 个Excel文件") def normalize_file_path(self, path): """规范化文件路径,处理特殊字符""" try: # 尝试直接访问文件 if os.path.exists(path): return path # 尝试Unicode规范化 normalized = unicodedata.normalize('NFC', path) if os.path.exists(normalized): return normalized # 尝试不同编码方案 encodings = ['utf-8', 'shift_jis', 'euc-jp', 'cp932'] for encoding in encodings: try: decoded = path.encode('latin1').decode(encoding) if os.path.exists(decoded): return decoded except: continue # 最终尝试原始路径 return path except Exception as e: self.status_var.set(f"文件路径处理错误: {str(e)}") return path def get_file_hash(self, file_path): """计算文件哈希值用于缓存""" try: hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() except Exception as e: self.status_var.set(f"计算文件哈希失败: {str(e)}") return str(os.path.getmtime(file_path)) def get_cache_filename(self, file_path): """获取缓存文件名""" file_hash = self.get_file_hash(file_path) return os.path.join(self.cache_dir, f"{os.path.basename(file_path)}_{file_hash}.cache") def load_header_cache(self, file_path): """加载列名缓存""" cache_file = self.get_cache_filename(file_path) if os.path.exists(cache_file): try: with open(cache_file, "r", encoding='utf-8') as f: return json.load(f) except: return None return None def save_header_cache(self, file_path, header_info): """保存列名缓存""" cache_file = self.get_cache_filename(file_path) try: with open(cache_file, "w", encoding='utf-8') as f: json.dump(header_info, f) return True except: return False def find_header_row(self, file_path): """查找列名行(增强版)""" # 检查缓存 cache = self.load_header_cache(file_path) if cache: return cache.get("header_row"), cache.get("signal_col") # 没有缓存则重新查找 if file_path.lower().endswith((".xlsx", ".xlsm")): return self.find_header_row_openpyxl(file_path) elif file_path.lower().endswith(".xls"): return self.find_header_row_xlrd(file_path) return None, None def find_header_row_openpyxl(self, file_path): """使用openpyxl查找列名行(增强版)""" try: wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) ws = wb.active # 尝试多种列名匹配模式 patterns = [ r'データ名', # 半角片假名 r'データ名', # 全角片假名 r'信号名', # 中文 r'Signal Name', # 英文 r'Data Name', r'信号名称', r'データ名称' ] # 扩大搜索范围:前100行和前100列 for row_idx in range(1, 101): # 1-100行 # 扩大列搜索范围到100列 for col_idx in range(1, 101): # 1-100列 try: cell = ws.cell(row=row_idx, column=col_idx) cell_value = cell.value if not cell_value: continue # 尝试所有匹配模式 cell_str = str(cell_value) for pattern in patterns: if re.search(pattern, cell_str, re.IGNORECASE): # 找到列名行后,尝试确定信号列 signal_col = None # 在同行中查找信号列 for col_idx2 in range(1, 101): # 1-100列 try: cell2 = ws.cell(row=row_idx, column=col_idx2) cell2_value = cell2.value if not cell2_value: continue cell2_str = str(cell2_value) if re.search(pattern, cell2_str, re.IGNORECASE): signal_col = col_idx2 break except: continue # 保存缓存 if signal_col is not None: header_info = {"header_row": row_idx, "signal_col": signal_col} self.save_header_cache(file_path, header_info) wb.close() return row_idx, signal_col except: continue wb.close() except Exception as e: self.status_var.set(f"查找列名行出错: {str(e)}") return None, None def find_header_row_xlrd(self, file_path): """使用xlrd查找列名行(增强版)""" try: wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) # 尝试多种列名匹配模式 patterns = [ r'データ名', # 半角片假名 r'データ名', # 全角片假名 r'信号名', # 中文 r'Signal Name', # 英文 r'Data Name', r'信号名称', r'データ名称' ] # 扩大搜索范围:前100行和前100列 for row_idx in range(0, 100): # 0-99行 # 扩大列搜索范围到100列 for col_idx in range(0, 100): # 0-99列 try: cell_value = ws.cell_value(row_idx, col_idx) if not cell_value: continue # 尝试所有匹配模式 cell_str = str(cell_value) for pattern in patterns: if re.search(pattern, cell_str, re.IGNORECASE): # 找到列名行后,尝试确定信号列 signal_col = None # 在同行中查找信号列 for col_idx2 in range(0, 100): # 0-99列 try: cell2_value = ws.cell_value(row_idx, col_idx2) if not cell2_value: continue cell2_str = str(cell2_value) if re.search(pattern, cell2_str, re.IGNORECASE): signal_col = col_idx2 break except: continue # 保存缓存 if signal_col is not None: header_info = {"header_row": row_idx, "signal_col": signal_col} self.save_header_cache(file_path, header_info) return row_idx, signal_col except: continue except Exception as e: self.status_var.set(f"查找列名行出错: {str(e)}") return None, None def extract_row_content(self, ws, row_idx, header_row, max_cols=100): """高效提取行内容(最多到100列)""" content = [] # 扩展到100列 for col_idx in range(1, max_cols + 1): try: cell = ws.cell(row=row_idx, column=col_idx) if cell.value is not None and str(cell.value).strip() != '': # 使用列名缓存 col_key = f"{header_row}-{col_idx}" if col_key in self.column_cache: col_name = self.column_cache[col_key] else: col_name_cell = ws.cell(row=header_row, column=col_idx) col_name = col_name_cell.value if col_name_cell.value else f"列{get_column_letter(col_idx)}" self.column_cache[col_key] = col_name content.append(f"{col_name}: {str(cell.value).strip()}") except: continue return "\n".join(content) def start_search_thread(self): """启动搜索线程""" if self.search_running: return self.search_running = True self.stop_requested = False self.max_workers = int(self.thread_var.get()) threading.Thread(target=self.search_files, daemon=True).start() def stop_search(self): """停止搜索""" self.stop_requested = True self.status_var.set("正在停止搜索...") def search_files(self): """在文件中搜索内容(优化特殊文件处理)""" search_term = self.search_text.get().strip() if not search_term: self.status_var.set("请输入搜索内容") self.search_running = False return if not self.files: self.status_var.set("请先选择文件夹") self.search_running = False return # 重置结果和UI self.results = {} for item in self.tree.get_children(): self.tree.delete(item) total_files = len(self.files) processed_files = 0 found_signals = 0 # 使用线程池处理文件 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {} for file_path in self.files: if self.stop_requested: break future = executor.submit(self.process_file, file_path, search_term) futures[future] = file_path # 处理完成的任务 for future in as_completed(futures): if self.stop_requested: break file_path = futures[future] try: found = future.result() found_signals += found processed_files += 1 # 更新进度 progress = int(processed_files / total_files * 100) self.progress["value"] = progress self.status_var.set(f"已处理 {processed_files}/{total_files} 个文件") self.root.update_idletasks() except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") # 更新结果 if self.stop_requested: self.status_var.set(f"搜索已停止,已处理 {processed_files}/{total_files} 个文件") elif found_signals == 0: self.status_var.set(f"未找到包含 '{search_term}' 的信号") else: self.status_var.set(f"找到 {len(self.results)} 个匹配信号,共 {found_signals} 处匹配") self.update_text_view() self.progress["value"] = 0 self.search_running = False gc.collect() # 强制垃圾回收释放内存 def process_file(self, file_path, search_term): """处理单个文件(增强异常处理)""" found = 0 try: # 获取列名行和信号列 header_row, signal_col = self.find_header_row(file_path) # 如果自动查找失败,尝试手动模式 if header_row is None or signal_col is None: self.status_var.set(f"文件 {os.path.basename(file_path)} 未找到列名行,尝试手动查找...") header_row, signal_col = self.manual_find_header_row(file_path) if header_row is None or signal_col is None: self.status_var.set(f"文件 {os.path.basename(file_path)} 无法确定列名行,已跳过") return found # 根据文件类型处理 if file_path.lower().endswith((".xlsx", ".xlsm")): found = self.process_openpyxl_file(file_path, search_term, header_row, signal_col) elif file_path.lower().endswith(".xls"): found = self.process_xlrd_file(file_path, search_term, header_row, signal_col) except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") return found def manual_find_header_row(self, file_path): """手动查找列名行(当自动查找失败时使用)""" try: # 尝试打开文件 if file_path.lower().endswith((".xlsx", ".xlsm")): wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) ws = wb.active # 扫描整个工作表(最多1000行) for row_idx in range(1, 1001): for col_idx in range(1, 101): try: cell = ws.cell(row=row_idx, column=col_idx) if cell.value and "データ" in str(cell.value): # 找到可能的列名行 return row_idx, col_idx except: continue wb.close() elif file_path.lower().endswith(".xls"): wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) # 扫描整个工作表(最多1000行) for row_idx in range(0, 1000): for col_idx in range(0, 100): try: cell_value = ws.cell_value(row_idx, col_idx) if cell_value and "データ" in str(cell_value): # 找到可能的列名行 return row_idx, col_idx except: continue except: pass return None, None def get_file_cache_key(self, file_path, header_row, signal_col): """生成唯一的文件缓存键""" file_hash = self.get_file_hash(file_path) return f"{file_hash}_{header_row}_{signal_col}" def process_openpyxl_file(self, file_path, search_term, header_row, signal_col): found = 0 try: cache_key = self.get_file_cache_key(file_path, header_row, signal_col) if cache_key in self.file_cache: signal_data = self.file_cache[cache_key] else: wb = openpyxl.load_workbook(file_path, data_only=True, read_only=True) ws = wb.active min_row = header_row + 1 max_row = min(ws.max_row, min_row + 5000) # 只读取信号列和行号 signal_data = [] for row_idx in range(min_row, max_row + 1): cell = ws.cell(row=row_idx, column=signal_col) if cell.value: signal_data.append((row_idx, str(cell.value))) self.file_cache[cache_key] = signal_data wb.close() # 收集匹配的行号 match_rows = [] for row_idx, value in signal_data: if search_term.lower() in value.lower(): match_rows.append(row_idx) # 批量处理匹配行 if match_rows: wb = openpyxl.load_workbook(file_path, data_only=True, read_only=True) ws = wb.active for row_idx in match_rows: # 只提取必要的列:信号名和关键元数据 signal = ws.cell(row=row_idx, column=signal_col).value data_name = ws.cell(row=row_idx, column=signal_col + 1).value if signal_col + 1 <= 100 else "" data_type = ws.cell(row=row_idx, column=signal_col + 2).value if signal_col + 2 <= 100 else "" row_content = f"信号名: {signal}\n数据名: {data_name}\n数据类型: {data_type}" # 添加到结果集 if signal not in self.results: self.results[signal] = {} short_name = os.path.basename(file_path) self.results[signal][short_name] = row_content # 添加到表格 summary = row_content[:50] + "..." if len(row_content) > 50 else row_content self.tree.insert("", tk.END, values=(signal, short_name, summary)) found += 1 wb.close() except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") return found def process_xlrd_file(self, file_path, search_term, header_row, signal_col): """高效处理xls文件(增强异常处理)""" found = 0 try: # 使用缓存 file_key = f"{file_path}_{header_row}_{signal_col}" if file_key in self.file_cache: ws = self.file_cache[file_key] else: wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) self.file_cache[file_key] = ws # 优化搜索范围 min_row = header_row + 1 max_row = min(ws.nrows, min_row + 5000) # 限制搜索行数 # 批量读取信号列数据 signal_values = ws.col_values(signal_col, min_row, max_row) # 搜索匹配的信号 for idx, value in enumerate(signal_values): if self.stop_requested: break if not value: continue cell_str = str(value) if search_term.lower() in cell_str.lower(): signal = cell_str row_idx = min_row + idx # 提取整行内容 row_content = self.extract_xlrd_row_content(ws, row_idx, header_row) # 添加到结果集 if signal not in self.results: self.results[signal] = {} # 使用短文件名避免特殊字符问题 short_name = os.path.basename(file_path) self.results[signal][short_name] = row_content # 添加到表格(显示前50字符摘要) summary = row_content[:50] + "..." if len(row_content) > 50 else row_content self.tree.insert("", tk.END, values=(signal, short_name, summary)) found += 1 except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") return found def extract_xlrd_row_content(self, ws, row_idx, header_row): """为xls文件高效提取行内容""" content = [] try: row_values = ws.row_values(row_idx) except: return "" # 扩展到100列 for col_idx in range(min(len(row_values), 100)): try: cell_value = row_values[col_idx] if cell_value is not None and str(cell_value).strip() != '': # 使用列名缓存 col_key = f"{header_row}-{col_idx}" if col_key in self.column_cache: col_name = self.column_cache[col_key] else: try: col_name = ws.cell_value(header_row, col_idx) if not col_name: col_name = f"列{col_idx+1}" except: col_name = f"列{col_idx+1}" self.column_cache[col_key] = col_name content.append(f"{col_name}: {str(cell_value).strip()}") except: continue return "\n".join(content) def update_text_view(self): """更新文本对比视图""" # 清除现有文本区域 for widget in self.compare_container.winfo_children(): widget.destroy() if not self.results: return # 获取第一个信号作为默认显示 first_signal = next(iter(self.results.keys())) self.display_signal_comparison(first_signal) def on_table_select(self, event): """表格选择事件处理""" selected = self.tree.selection() if not selected: return item = self.tree.item(selected[0]) signal = item["values"][0] self.display_signal_comparison(signal) def display_signal_comparison(self, signal): """显示指定信号的对比""" # 清除现有文本区域 for widget in self.compare_container.winfo_children(): widget.destroy() if signal not in self.results: return signal_data = self.results[signal] files = list(signal_data.keys()) contents = list(signal_data.values()) # 创建列框架 for i, (file, content) in enumerate(zip(files, contents)): col_frame = ttk.Frame(self.compare_container) col_frame.grid(row=0, column=i, sticky="nsew", padx=5, pady=5) self.compare_container.columnconfigure(i, weight=1) # 文件名标签 file_label = ttk.Label(col_frame, text=file, font=("Arial", 10, "bold")) file_label.pack(fill=tk.X, pady=(0, 5)) # 文本区域 text_area = scrolledtext.ScrolledText(col_frame, wrap=tk.WORD, width=30, height=15) text_area.insert(tk.INSERT, content) text_area.configure(state="disabled") text_area.pack(fill=tk.BOTH, expand=True) # 保存引用 self.text_panes[file] = text_area def highlight_differences(self): """高亮显示文本差异""" if not self.text_panes: return # 获取所有行内容 all_contents = [] for text_area in self.text_panes.values(): text_area.configure(state="normal") text = text_area.get("1.0", tk.END).strip() text_area.configure(state="disabled") all_contents.append(text) # 如果所有内容相同,则不需要高亮 if len(set(all_contents)) == 1: self.status_var.set("所有文件行内容完全一致") return # 使用第一个文件作为基准 base_text = all_contents[0] # 对比并高亮差异 for i, (file, text_area) in enumerate(self.text_panes.items()): if i == 0: # 基准文件不需要处理 continue text_area.configure(state="normal") text_area.tag_configure("diff", background=self.highlight_color) # 清除之前的高亮 text_area.tag_remove("diff", "1.0", tk.END) # 获取当前文本 compare_text = text_area.get("1.0", tk.END).strip() # 使用序列匹配器查找差异 s = SequenceMatcher(None, base_text, compare_text) # 高亮差异部分 for tag in s.get_opcodes(): opcode = tag[0] start = tag[3] end = tag[4] if opcode != "equal": # 添加高亮标签 text_area.tag_add("diff", f"1.0+{start}c", f"1.0+{end}c") text_area.configure(state="disabled") self.status_var.set("差异已高亮显示") def choose_color(self): """选择高亮颜色""" color = askcolor(title="选择高亮颜色", initialcolor=self.highlight_color) if color[1]: self.highlight_color = color[1] self.color_btn.configure(bg=self.highlight_color) def export_report(self): """导出差异报告""" if not self.results: messagebox.showwarning("警告", "没有可导出的结果") return try: # 创建报告数据结构 report_data = [] for signal, files_data in self.results.items(): for file, content in files_data.items(): report_data.append({ "信号": signal, "文件": file, "行内容": content }) # 转换为DataFrame df = pd.DataFrame(report_data) # 保存到Excel save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx")], title="保存差异报告" ) if save_path: df.to_excel(save_path, index=False) self.status_var.set(f"报告已保存到: {save_path}") except Exception as e: messagebox.showerror("错误", f"导出报告失败: {str(e)}") def clear_cache(self): """清除缓存""" try: for file in os.listdir(self.cache_dir): if file.endswith(".cache"): os.remove(os.path.join(self.cache_dir, file)) self.file_cache = {} self.column_cache = {} self.status_var.set("缓存已清除") except Exception as e: self.status_var.set(f"清除缓存失败: {str(e)}") def manual_column_select(self): """手动指定列名位置""" if not self.files: messagebox.showinfo("提示", "请先选择文件夹") return # 创建手动选择窗口 manual_window = tk.Toplevel(self.root) manual_window.title("手动指定列名位置") manual_window.geometry("400x300") # 文件选择 ttk.Label(manual_window, text="选择文件:").pack(pady=(10, 5)) file_var = tk.StringVar() file_combo = ttk.Combobox(manual_window, textvariable=file_var, values=[os.path.basename(f) for f in self.files]) file_combo.pack(fill=tk.X, padx=20, pady=5) file_combo.current(0) # 行号输入 ttk.Label(manual_window, text="列名行号:").pack(pady=(10, 5)) row_var = tk.StringVar(value="1") row_entry = ttk.Entry(manual_window, textvariable=row_var) row_entry.pack(fill=tk.X, padx=20, pady=5) # 列号输入 ttk.Label(manual_window, text="信号列号:").pack(pady=(10, 5)) col_var = tk.StringVar(value="1") col_entry = ttk.Entry(manual_window, textvariable=col_var) col_entry.pack(fill=tk.X, padx=20, pady=5) # 确认按钮 def confirm_selection(): try: file_idx = file_combo.current() file_path = self.files[file_idx] header_row = int(row_var.get()) signal_col = int(col_var.get()) # 保存到缓存 header_info = {"header_row": header_row, "signal_col": signal_col} self.save_header_cache(file_path, header_info) messagebox.showinfo("成功", f"已为 {os.path.basename(file_path)} 设置列名位置:行{header_row} 列{signal_col}") manual_window.destroy() except Exception as e: messagebox.showerror("错误", f"无效输入: {str(e)}") ttk.Button(manual_window, text="确认", command=confirm_selection).pack(pady=20) if __name__ == "__main__": root = tk.Tk() app = EnhancedSignalComparator(root) root.mainloop() 1、现在更改之后,读取的速度更慢了,在大文件的搜索中,一直在已处理中,速度太慢了,甚至没有改之前快,请搜索网络上类似的处理,优化处理流程