活动介绍
file-type

Oracle SYS_CONNECT_BY_PATH函数详解与示例

下载需积分: 12 | 9KB | 更新于2024-08-05 | 128 浏览量 | 0 下载量 举报 收藏
download 立即下载
Oracle的SYS_CONNECT_BY_PATH函数是数据库系统中的一个非常实用工具,主要用于处理层级关系数据,尤其是在处理层级查询和行转列(pivot)操作时。该函数的主要功能是通过递归关系将数据按照指定的路径连接成字符串,这对于组织和展示层级分明的数据结构非常有用。 在SQL查询中,使用SYS_CONNECT_BY_PATH时,需要遵循特定的语法格式。其基本形式是:SYS_CONNECT_BY_PATH(column_name, path_delimiter),其中column_name是你想要连接的字段名,而path_delimiter则是用于分隔每个层级的字符,通常是逗号或冒号。这个函数通常配合CONNECT BY语句一起使用,尤其是START WITH和CONNECT BY PRIOR子句。 例如,你提供的部分代码片段展示了如何在一个Oracle 9i的环境中使用SYS_CONNECT_BY_PATH。首先,确保数据库连接设置正确,并且在WHERE子句中设置了适当的条件,如筛选出具有特定父ID(pbcode)的记录,同时排除了历史记录(ledate小于当前日期)。 在查询中,通过如下步骤执行: 1. 使用START WITH语句定义初始记录,这里是m.bcode = 00000004,这可能是某个顶级部门的代码。 2. 连接条件CONNECT BY PRIOR指定了当前记录的父记录,即m.pbcode等于上一行的bcode,这样就构建了一个递归关系。 3. 使用DISTINCT关键字确保结果中的路径字符串不重复,因为SYS_CONNECT_BY_PATH会返回所有可能的路径组合。 4. 在调用SYS_CONNECT_BY_PATH时,如果路径字符串中包含逗号,必须使用REPLACE函数替换为指定的分隔符,如这里的',',以确保结果的准确性。 通过这样的查询,你可以得到一个按层级关系组织的deptname列表,每个部门的完整路径由其所有上级部门的名字连接而成。这对于生成组织架构图、分析部门树状结构或进行报告等场景非常有帮助。 理解并熟练运用Oracle的SYS_CONNECT_BY_PATH函数对于处理复杂的层次结构数据至关重要,它能够帮助你有效地提取和展示数据间的父子关系,提高数据分析和报告的效率。

相关推荐

filetype

import sys import json import csv import os import time import re import requests import concurrent.futures from urllib.parse import urlparse, urljoin from requests.exceptions import RequestException from PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget, QLabel, QLineEdit, QPushButton, QTextEdit, QTableWidget, QTableWidgetItem, QTreeWidget, QTreeWidgetItem, QHeaderView, QFileDialog, QMessageBox, QSplitter, QSpinBox, QAction, QDialog, QFormLayout, QDialogButtonBox, QProgressBar, QGroupBox ) from PyQt5.QtCore import Qt, QThread, pyqtSignal from PyQt5.QtGui import QIcon # 忽略SSL证书验证的警告信息 requests.packages.urllib3.disable_warnings() class FingerprintManager: def __init__(self): self.fingerprints = [] self.default_file_path = os.path.join(os.path.expanduser("~"), "cms_fingerprints.json") if not self.load_from_default_file(): self.load_default_fingerprints() def load_default_fingerprints(self): # 优化默认指纹库,确保正则表达式正确 self.fingerprints = [ { "cms": "WordPress", "version": "", "confidence": 0, "http_headers": [ {"header": "X-Powered-By", "pattern": "PHP/.*", "score": 10, "type": "general"} ], "html_content": [ {"pattern": "<meta name=\"generator\" content=\"WordPress ([\\d.]+)\"", "score": 150, "type": "core", "version_group": 1}, {"pattern": "wp-content/themes/([^/]+)", "score": 80, "type": "specific"}, {"pattern": "wp-includes/js/wp-util.js", "score": 90, "type": "specific"} ], "url_paths": [ {"path": "/wp-admin", "score": 80, "type": "specific"}, {"path": "/wp-login.php", "score": 100, "type": "core"} ] }, { "cms": "示例站点", "version": "", "confidence": 0, "html_content": [ {"pattern": "恭喜, 站点创建成功!", "score": 120, "type": "core"}, {"pattern": "

这是默认index.html,本页面由系统自动生成

", "score": 100, "type": "core"} ], "url_paths": [] }, { "cms": "Nginx", "version": "", "confidence": 0, "http_headers": [ {"header": "Server", "pattern": "nginx/([\\d.]+)", "score": 90, "type": "core", "version_group": 1} ], "html_content": [ {"pattern": "If you see this page, the nginx web server is successfully installed", "score": 120, "type": "core"} ] }, { "cms": "Drupal", "version": "", "html_content": [ {"pattern": "<meta name=\"generator\" content=\"Drupal ([\\d.]+)\"", "score": 150, "type": "core", "version_group": 1}, {"pattern": "sites/default/files", "score": 70, "type": "specific"} ], "url_paths": [ {"path": "/sites/all", "score": 80, "type": "specific"} ] }, { "cms": "ThinkPHP", "version": "", "html_content": [ {"pattern": "think\\\\Exception", "score": 100, "type": "core"}, {"pattern": "app\\\\controller", "score": 80, "type": "specific"} ] }, { "cms": "Yii", "version": "", "html_content": [ {"pattern": "yii\\\\base\\\\Exception", "score": 100, "type": "core"}, {"pattern": "yii\\\\web\\\\HttpException", "score": 90, "type": "specific"} ] }, { "cms": "Phalcon", "version": "", "html_content": [ {"pattern": "Phalcon\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "FuelPHP", "version": "", "html_content": [ {"pattern": "Fuel\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "Habari", "version": "", "html_content": [ {"pattern": "Habari\\\\Core\\\\Exception", "score": 100, "type": "core"} ] }, { "cms": "帝国CMS", "version": "", "html_content": [ {"pattern": "ecmsinfo\\(", "score": 100, "type": "core"} ] } ] self.save_to_default_file() def load_from_default_file(self): try: if os.path.exists(self.default_file_path): with open(self.default_file_path, 'r', encoding='utf-8') as f: loaded_data = json.load(f) valid_fingerprints = [] for fp in loaded_data: if self._is_valid_fingerprint(fp): cleaned_fp = self._clean_fingerprint(fp) valid_fingerprints.append(cleaned_fp) else: print(f"跳过无效指纹: {fp}") self.fingerprints = valid_fingerprints return True return False except Exception as e: print(f"从默认文件加载指纹失败: {e}") return False def _clean_fingerprint(self, fp): """清理指纹中的正则表达式,修复常见错误""" for header in fp.get('http_headers', []): if 'pattern' in header: header['pattern'] = self._fix_regex_pattern(header['pattern']) for html in fp.get('html_content', []): if 'pattern' in html: html['pattern'] = self._fix_regex_pattern(html['pattern']) for url in fp.get('url_paths', []): if 'pattern' in url: url['pattern'] = self._fix_regex_pattern(url['pattern']) return fp def _fix_regex_pattern(self, pattern): """修复常见的正则表达式错误""" if not pattern: return "" # 修复未转义的反斜杠 fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) # 修复未闭合的括号 open_count = fixed.count('(') close_count = fixed.count(')') if open_count > close_count: fixed += ')' * (open_count - close_count) # 修复不完整的字符类 if '[' in fixed and ']' not in fixed: fixed += ']' return fixed def _is_valid_fingerprint(self, fp): required_fields = ["cms"] for field in required_fields: if field not in fp: return False if not fp["cms"].strip(): return False for key in ["http_headers", "html_content", "url_paths"]: if key not in fp: fp[key] = [] return True def save_to_default_file(self): try: dir_path = os.path.dirname(self.default_file_path) if not os.path.exists(dir_path): os.makedirs(dir_path) with open(self.default_file_path, 'w', encoding='utf-8') as f: json.dump(self.fingerprints, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"保存指纹到默认文件失败: {e}") return False def add_fingerprint(self, fingerprint): if self._is_valid_fingerprint(fingerprint): cleaned = self._clean_fingerprint(fingerprint) self.fingerprints.append(cleaned) self.save_to_default_file() return True print(f"无法添加无效指纹: {fingerprint}") return False def remove_fingerprint(self, index): if 0 <= index < len(self.fingerprints): self.fingerprints.pop(index) self.save_to_default_file() def update_fingerprint(self, index, fingerprint): if 0 <= index < len(self.fingerprints) and self._is_valid_fingerprint(fingerprint): cleaned = self._clean_fingerprint(fingerprint) self.fingerprints[index] = cleaned self.save_to_default_file() return True return False def clear_fingerprints(self): self.fingerprints = [] self.save_to_default_file() return True def restore_default_fingerprints(self): self.load_default_fingerprints() return True def get_fingerprints(self): return self.fingerprints def export_fingerprints(self, filename): try: with open(filename, 'w', encoding='utf-8') as f: json.dump(self.fingerprints, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"导出失败: {e}") return False def import_fingerprints(self, filename): try: with open(filename, 'r', encoding='utf-8') as f: imported_data = json.load(f) valid_fingerprints = [] for fp in imported_data: if self._is_valid_fingerprint(fp): cleaned = self._clean_fingerprint(fp) valid_fingerprints.append(cleaned) else: print(f"导入时跳过无效指纹: {fp}") if valid_fingerprints: self.fingerprints = valid_fingerprints self.save_to_default_file() return True print("导入的指纹全部无效") return False except Exception as e: print(f"导入失败: {e}") return False class DetectionWorker(QThread): progress_signal = pyqtSignal(int, int, str) result_signal = pyqtSignal(dict) log_signal = pyqtSignal(str) finished_signal = pyqtSignal() def __init__(self, urls, fingerprints, max_threads=10, retry_count=2): super().__init__() self.urls = urls self.fingerprints = fingerprints self.max_threads = max_threads self.running = True self.retry_count = retry_count self.timeout = 15 # 超时时间(秒) # 缓存响应以提高性能 self.response_cache = {} def run(self): self.log_signal.emit("开始检测...") total = len(self.urls) for i, url in enumerate(self.urls): if not self.running: break self.progress_signal.emit(i+1, total, url) result = self.detect_cms(url) self.result_signal.emit(result) self.log_signal.emit("检测完成!") self.finished_signal.emit() def stop(self): self.running = False def preprocess_html(self, html): """优化HTML预处理:保留标签结构,不过度压缩""" processed = re.sub(r'\n\s+', '\n', html) processed = re.sub(r'>\s+<', '><', processed) return processed.strip() def escape_special_chars(self, pattern): """安全转义正则特殊字符""" if not pattern: return "" safe_pattern = re.sub(r'\\(?![\\.*+?^${}()|[\]sSdDwWtnbfvr])', r'\\\\', pattern) return safe_pattern def validate_regex(self, pattern): """验证正则表达式是否有效""" if not pattern: return True, pattern try: re.compile(pattern) return True, pattern except re.error as e: fixed = pattern if "bad escape" in str(e): fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) elif "unterminated subpattern" in str(e): open_count = pattern.count('(') close_count = pattern.count(')') if open_count > close_count: fixed = pattern + ')' * (open_count - close_count) try: re.compile(fixed) self.log_signal.emit(f"自动修复正则表达式: {pattern} -> {fixed}") return True, fixed except re.error: return False, pattern def extract_version(self, content, pattern, group_idx): """从匹配结果中提取版本号""" if not pattern or group_idx is None: return "" try: match = re.search(pattern, content, re.IGNORECASE) if match and len(match.groups()) >= group_idx: return match.group(group_idx).strip() except re.error as e: self.log_signal.emit(f"版本提取正则错误 {pattern}: {str(e)}") return "" def fetch_url_content(self, url): """带重试机制的URL内容获取""" # 检查缓存 if url in self.response_cache: return self.response_cache[url] headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9' } for attempt in range(self.retry_count + 1): try: response = requests.get( url, headers=headers, allow_redirects=True, verify=False, timeout=self.timeout ) response.encoding = response.apparent_encoding # 缓存响应 self.response_cache[url] = response return response except RequestException as e: self.log_signal.emit(f"请求尝试 {attempt+1} 失败: {str(e)}") if attempt >= self.retry_count: return None time.sleep(1) return None def build_full_url(self, base_url, path): """构建完整的URL""" if not path.startswith('/'): path = '/' + path parsed = urlparse(base_url) return f"{parsed.scheme}://{parsed.netloc}{path}" def check_url_path(self, base_url, path, pattern, item_score, weight): """检查URL路径特征 - 主动访问并验证""" full_url = self.build_full_url(base_url, path) feature_desc = f"URL路径: {full_url}" # 尝试获取响应 response = self.fetch_url_content(full_url) if response and response.status_code == 200: # 如果有正则模式,检查内容 if pattern: is_valid, fixed_pattern = self.validate_regex(pattern) if is_valid: try: if re.search(fixed_pattern, response.text, re.IGNORECASE): return True, feature_desc, item_score * weight except re.error as e: self.log_signal.emit(f"URL路径正则错误: {str(e)}") # 如果没有正则模式,只要状态200就算匹配 else: return True, feature_desc, item_score * weight return False, feature_desc, 0 def detect_cms(self, url): original_url = url if not url.startswith(('http://', 'https://')): urls_to_try = [f'http://{url}', f'https://{url}'] else: urls_to_try = [url] response = None for test_url in urls_to_try: response = self.fetch_url_content(test_url) if response: url = test_url break if not response: return { "url": original_url, "status": -1, "results": [{"cms": "无法访问", "version": "", "confidence": 0, "judgment_basis": ["无法建立连接"]}], "primary": {"cms": "无法访问", "version": "", "confidence": 0} } status_code = response.status_code headers = response.headers html_content = response.text final_url = response.url processed_html = self.preprocess_html(html_content) self.log_signal.emit(f"获取内容: {final_url} (状态码: {status_code})") cms_matches = [] min_score_threshold = 50 for cms in self.fingerprints: total_score = 0 version = "" # 记录详细的判断依据 judgment_basis = [] matched_features = [] unmatched_features = [] # 1. 匹配HTTP头特征 for header_item in cms.get('http_headers', []): header_name = header_item.get('header', '').lower() pattern = header_item.get('pattern', '') item_score = header_item.get('score', 0) feature_type = header_item.get('type', 'general') if not header_name or not pattern: continue is_valid, fixed_pattern = self.validate_regex(pattern) if not is_valid: self.log_signal.emit(f"跳过无效HTTP头正则: {pattern}") continue weight = 2 if feature_type == 'core' else 1 adjusted_score = item_score * weight feature_desc = f"HTTP头[{header_name}]匹配模式[{fixed_pattern}]" if header_name in headers: header_value = str(headers[header_name]) try: if re.search(fixed_pattern, header_value, re.IGNORECASE): total_score += adjusted_score matched_features.append(f"{feature_desc} (+{adjusted_score})") judgment_basis.append(f"✓ {feature_desc},匹配成功,加{adjusted_score}分") if 'version_group' in header_item: version = self.extract_version( header_value, fixed_pattern, header_item['version_group'] ) or version else: unmatched_features.append(f"{feature_desc} (未匹配)") judgment_basis.append(f"✗ {feature_desc},未匹配") except re.error as e: self.log_signal.emit(f"HTTP头正则执行错误 {fixed_pattern}: {str(e)}") else: unmatched_features.append(f"{feature_desc} (Header不存在)") judgment_basis.append(f"✗ {feature_desc},Header不存在") # 2. 匹配HTML内容特征 for html_item in cms.get('html_content', []): pattern = html_item.get('pattern', '').strip() item_score = html_item.get('score', 0) feature_type = html_item.get('type', 'general') if not pattern: continue is_valid, fixed_pattern = self.validate_regex(pattern) if not is_valid: self.log_signal.emit(f"跳过无效HTML正则: {pattern}") continue weight = 2.5 if feature_type == 'core' else (1.5 if feature_type == 'specific' else 1) adjusted_score = int(item_score * weight) feature_desc = f"HTML内容匹配模式[{fixed_pattern[:50]}{'...' if len(fixed_pattern)>50 else ''}]" try: if '<' in fixed_pattern and '>' in fixed_pattern: escaped_pattern = self.escape_special_chars(fixed_pattern) flexible_pattern = re.sub(r'\s+', r'\\s+', escaped_pattern) match_found = re.search(flexible_pattern, processed_html, re.IGNORECASE | re.DOTALL) else: match_found = re.search(fixed_pattern, processed_html, re.IGNORECASE | re.DOTALL) if match_found: total_score += adjusted_score matched_features.append(f"{feature_desc} (+{adjusted_score})") judgment_basis.append(f"✓ {feature_desc},匹配成功,加{adjusted_score}分") if 'version_group' in html_item: version = self.extract_version( processed_html, fixed_pattern, html_item['version_group'] ) or version else: unmatched_features.append(f"{feature_desc} (未匹配)") judgment_basis.append(f"✗ {feature_desc},未匹配") except re.error as e: self.log_signal.emit(f"HTML正则执行错误 {fixed_pattern}: {str(e)}") # 3. 匹配URL路径特征 - 使用线程池并发处理 url_path_tasks = [] with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, self.max_threads)) as executor: for url_item in cms.get('url_paths', []): path = url_item.get('path', '') pattern = url_item.get('pattern', '') item_score = url_item.get('score', 0) feature_type = url_item.get('type', 'general') if not path: continue weight = 2 if feature_type == 'core' else 1 adjusted_score = item_score * weight # 提交任务到线程池 task = executor.submit( self.check_url_path, final_url, path, pattern, item_score, weight ) url_path_tasks.append((task, adjusted_score, path)) # 处理URL路径特征结果 for task, adjusted_score, path in url_path_tasks: try: matched, desc, score = task.result() if matched: total_score += score matched_features.append(f"{desc} (+{score})") judgment_basis.append(f"✓ {desc},访问成功,加{score}分") else: unmatched_features.append(f"{desc} (访问失败或未匹配)") judgment_basis.append(f"✗ {desc},访问失败或未匹配") except Exception as e: self.log_signal.emit(f"URL路径检查出错: {str(e)}") # 计算置信度 max_possible = sum( (h.get('score', 0) * (2 if h.get('type') == 'core' else 1)) for h in cms.get('http_headers', []) ) + sum( (h.get('score', 0) * (2.5 if h.get('type') == 'core' else 1)) for h in cms.get('html_content', []) ) + sum( (u.get('score', 0) * (2 if u.get('type') == 'core' else 1)) for u in cms.get('url_paths', []) ) confidence = min(100, int((total_score / max_possible) * 100)) if max_possible > 0 else 0 # 汇总判断依据 if matched_features: judgment_basis.insert(0, f"匹配到{len(matched_features)}个特征,总分{total_score}") else: judgment_basis.insert(0, f"未匹配到任何特征,总分0") if total_score >= min_score_threshold: cms_matches.append({ "cms": cms['cms'], "version": version or cms.get('version', ''), "score": total_score, "confidence": confidence, "judgment_basis": judgment_basis, # 存储详细判断依据 "features": matched_features }) cms_matches.sort(key=lambda x: (-x['confidence'], -x['score'])) filtered_results = [] if cms_matches: max_score = cms_matches[0]['score'] for match in cms_matches: if match['score'] >= max_score * 0.8 or match['confidence'] >= 70: filtered_results.append(match) # 如果没有匹配到任何结果,添加一个默认结果并说明原因 if not filtered_results: filtered_results.append({ "cms": "未知", "version": "", "confidence": 0, "judgment_basis": ["未匹配到任何已知CMS的特征", "请检查指纹库是否完整或添加新指纹"] }) primary_result = filtered_results[0] if filtered_results else { "cms": "未知", "version": "", "confidence": 0 } return { "url": final_url, "status": status_code, "results": filtered_results, "primary": primary_result } class AddFingerprintDialog(QDialog): def __init__(self, parent=None, fingerprint=None): super().__init__(parent) self.fingerprint = fingerprint self.setWindowTitle("编辑指纹" if fingerprint else "添加指纹") self.setGeometry(300, 300, 600, 500) self.init_ui() def init_ui(self): layout = QVBoxLayout() form_layout = QFormLayout() self.cms_input = QLineEdit() self.version_input = QLineEdit() form_layout.addRow("CMS名称*:", self.cms_input) form_layout.addRow("默认版本:", self.version_input) regex_help = QLabel("正则表达式提示: 反斜杠需要输入两次(\\\\),特殊字符(如. * + ?)需要转义") regex_help.setStyleSheet("color: #2980b9; font-size: 12px;") form_layout.addRow(regex_help) type_note = QLabel("特征类型说明: core(核心特征,权重高) > specific(特定特征) > general(通用特征)") type_note.setStyleSheet("color: #666; font-size: 12px;") form_layout.addRow(type_note) layout.addLayout(form_layout) # HTTP头特征表格 http_group = QWidget() http_layout = QVBoxLayout(http_group) http_layout.addWidget(QLabel("HTTP头特征:")) self.http_table = QTableWidget(0, 4) self.http_table.setHorizontalHeaderLabels(["Header", "Pattern", "Score", "Type(core/general)"]) self.http_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) http_btn_layout = QHBoxLayout() add_http_btn = QPushButton("添加") add_http_btn.clicked.connect(lambda: self.add_row(self.http_table, ["", "", "50", "general"])) remove_http_btn = QPushButton("移除") remove_http_btn.clicked.connect(lambda: self.remove_row(self.http_table)) http_btn_layout.addWidget(add_http_btn) http_btn_layout.addWidget(remove_http_btn) http_layout.addWidget(self.http_table) http_layout.addLayout(http_btn_layout) layout.addWidget(http_group) # HTML内容特征表格 html_group = QWidget() html_layout = QVBoxLayout(html_group) html_layout.addWidget(QLabel("HTML内容特征:")) self.html_table = QTableWidget(0, 4) self.html_table.setHorizontalHeaderLabels(["Pattern", "Score", "Type(core/specific)", "版本提取组(可选)"]) self.html_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) html_btn_layout = QHBoxLayout() add_html_btn = QPushButton("添加") add_html_btn.clicked.connect(lambda: self.add_row(self.html_table, ["", "80", "specific", ""])) remove_html_btn = QPushButton("移除") remove_html_btn.clicked.connect(lambda: self.remove_row(self.html_table)) html_btn_layout.addWidget(add_html_btn) html_btn_layout.addWidget(remove_html_btn) html_layout.addWidget(self.html_table) html_layout.addLayout(html_btn_layout) layout.addWidget(html_group) # URL路径特征表格 url_group = QWidget() url_layout = QVBoxLayout(url_group) url_layout.addWidget(QLabel("URL路径特征 (将主动访问这些路径):")) self.url_table = QTableWidget(0, 4) self.url_table.setHorizontalHeaderLabels(["Path", "Pattern(可选)", "Score", "Type(core/specific)"]) self.url_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) url_btn_layout = QHBoxLayout() add_url_btn = QPushButton("添加") add_url_btn.clicked.connect(lambda: self.add_row(self.url_table, ["", "", "60", "specific"])) remove_url_btn = QPushButton("移除") remove_url_btn.clicked.connect(lambda: self.remove_row(self.url_table)) url_btn_layout.addWidget(add_url_btn) url_btn_layout.addWidget(remove_url_btn) url_layout.addWidget(self.url_table) url_layout.addLayout(url_btn_layout) layout.addWidget(url_group) # 测试正则按钮 test_btn = QPushButton("测试选中的正则表达式") test_btn.clicked.connect(self.test_selected_regex) layout.addWidget(test_btn) # 确认按钮 btn_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) btn_box.accepted.connect(self.accept) btn_box.rejected.connect(self.reject) layout.addWidget(btn_box) self.setLayout(layout) self.load_fingerprint_data() def test_selected_regex(self): current_table = None pattern = "" if self.http_table.currentRow() >= 0: current_table = self.http_table item = self.http_table.item(self.http_table.currentRow(), 1) if item: pattern = item.text() elif self.html_table.currentRow() >= 0: current_table = self.html_table item = self.html_table.item(self.html_table.currentRow(), 0) if item: pattern = item.text() if not pattern: QMessageBox.information(self, "测试结果", "请选择一个正则表达式进行测试") return try: re.compile(pattern) QMessageBox.information(self, "测试结果", f"正则表达式有效:\n{pattern}") except re.error as e: fixed = pattern if "bad escape" in str(e): fixed = re.sub(r'(?<!\\)\\(?!["\\/])', r'\\\\', pattern) elif "unterminated subpattern" in str(e): open_count = pattern.count('(') close_count = pattern.count(')') if open_count > close_count: fixed = pattern + ')' * (open_count - close_count) try: re.compile(fixed) QMessageBox.information( self, "修复成功", f"原表达式无效: {str(e)}\n修复后表达式: {fixed}" ) if current_table == self.http_table: self.http_table.item(self.http_table.currentRow(), 1).setText(fixed) else: self.html_table.item(self.html_table.currentRow(), 0).setText(fixed) except re.error as e2: QMessageBox.warning( self, "测试失败", f"正则表达式无效: {str(e2)}\n表达式: {pattern}" ) def add_row(self, table, default_values): row = table.rowCount() table.insertRow(row) for col, val in enumerate(default_values): table.setItem(row, col, QTableWidgetItem(val)) def remove_row(self, table): row = table.currentRow() if row >= 0: table.removeRow(row) def load_fingerprint_data(self): if not self.fingerprint: return self.cms_input.setText(self.fingerprint.get("cms", "")) self.version_input.setText(self.fingerprint.get("version", "")) for header in self.fingerprint.get("http_headers", []): self.add_row(self.http_table, [ header.get("header", ""), header.get("pattern", ""), str(header.get("score", 50)), header.get("type", "general") ]) for html in self.fingerprint.get("html_content", []): self.add_row(self.html_table, [ html.get("pattern", ""), str(html.get("score", 80)), html.get("type", "specific"), str(html.get("version_group", "")) if "version_group" in html else "" ]) for path in self.fingerprint.get("url_paths", []): self.add_row(self.url_table, [ path.get("path", ""), path.get("pattern", ""), str(path.get("score", 60)), path.get("type", "specific") ]) def validate_regex(self, pattern): try: if pattern: re.compile(pattern) return True except re.error as e: QMessageBox.warning(self, "正则错误", f"模式 '{pattern}' 无效: {str(e)}\n请使用测试按钮修复") return False def get_fingerprint(self): cms_name = self.cms_input.text().strip() if not cms_name: QMessageBox.warning(self, "输入错误", "CMS名称不能为空") return None for row in range(self.html_table.rowCount()): pattern_item = self.html_table.item(row, 0) if pattern_item and not self.validate_regex(pattern_item.text().strip()): return None fingerprint = { "cms": cms_name, "version": self.version_input.text().strip(), "confidence": 0, "http_headers": [], "html_content": [], "url_paths": [] } for row in range(self.http_table.rowCount()): header = self.http_table.item(row, 0).text().strip() if self.http_table.item(row, 0) else "" pattern = self.http_table.item(row, 1).text().strip() if self.http_table.item(row, 1) else "" score = int(self.http_table.item(row, 2).text() or 50) f_type = self.http_table.item(row, 3).text().strip() or "general" if header and pattern: fingerprint["http_headers"].append({ "header": header, "pattern": pattern, "score": score, "type": f_type }) for row in range(self.html_table.rowCount()): pattern = self.html_table.item(row, 0).text().strip() if self.html_table.item(row, 0) else "" score = int(self.html_table.item(row, 1).text() or 80) f_type = self.html_table.item(row, 2).text().strip() or "specific" version_group = self.html_table.item(row, 3).text().strip() if pattern: item = { "pattern": pattern, "score": score, "type": f_type } if version_group and version_group.isdigit(): item["version_group"] = int(version_group) fingerprint["html_content"].append(item) for row in range(self.url_table.rowCount()): path = self.url_table.item(row, 0).text().strip() if self.url_table.item(row, 0) else "" pattern = self.url_table.item(row, 1).text().strip() if self.url_table.item(row, 1) else "" score = int(self.url_table.item(row, 2).text() or 60) f_type = self.url_table.item(row, 3).text().strip() or "specific" if path: fingerprint["url_paths"].append({ "path": path, "pattern": pattern, "score": score, "type": f_type }) return fingerprint class JudgmentBasisDialog(QDialog): """判断依据展示对话框""" def __init__(self, parent=None, result=None): super().__init__(parent) self.result = result self.setWindowTitle(f"识别依据 - {result['url']}") self.setGeometry(400, 200, 800, 600) self.init_ui() def init_ui(self): layout = QVBoxLayout() # 基本信息 basic_info = QLabel(f"""

URL: {self.result['url']}

状态码: {self.result['status']}

""") layout.addWidget(basic_info) # 识别结果 results_group = QGroupBox("识别结果汇总") results_layout = QVBoxLayout() for i, res in enumerate(self.result['results']): is_primary = (i == 0) # 第一个结果是主要结果 result_label = QLabel(f"""

{'★ ' if is_primary else ''}{res['cms']} v{res['version']} 置信度: {res['confidence']}%

""") results_layout.addWidget(result_label) results_group.setLayout(results_layout) layout.addWidget(results_group) # 详细判断依据 basis_group = QTabWidget() for res in self.result['results']: text_edit = QTextEdit() text_edit.setReadOnly(True) # 显示所有判断依据 text_edit.setText("\n".join(res['judgment_basis'])) basis_group.addTab(text_edit, f"{res['cms']} (置信度{res['confidence']}%)") layout.addWidget(basis_group) # 关闭按钮 btn_box = QDialogButtonBox(QDialogButtonBox.Ok) btn_box.accepted.connect(self.accept) layout.addWidget(btn_box) self.setLayout(layout) class CMSDetectorApp(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("多CMS识别工具 (带判断依据)") self.setGeometry(100, 100, 1200, 800) self.fingerprint_manager = FingerprintManager() self.results = [] self.create_menu() self.init_ui() self.apply_styles() def create_menu(self): menubar = self.menuBar() file_menu = menubar.addMenu("文件") import_action = QAction("导入网站列表", self) import_action.triggered.connect(self.import_urls) file_menu.addAction(import_action) export_action = QAction("导出结果", self) export_action.triggered.connect(self.export_results) file_menu.addAction(export_action) file_menu.addSeparator() exit_action = QAction("退出", self) exit_action.setShortcut("Ctrl+Q") exit_action.triggered.connect(self.close) file_menu.addAction(exit_action) fingerprint_menu = menubar.addMenu("指纹库") add_fingerprint_action = QAction("添加指纹", self) add_fingerprint_action.triggered.connect(self.add_fingerprint) fingerprint_menu.addAction(add_fingerprint_action) import_fingerprint_action = QAction("导入指纹库", self) import_fingerprint_action.triggered.connect(self.import_fingerprints) fingerprint_menu.addAction(import_fingerprint_action) export_fingerprint_action = QAction("导出指纹库", self) export_fingerprint_action.triggered.connect(self.export_fingerprints) fingerprint_menu.addAction(export_fingerprint_action) clear_fingerprint_action = QAction("清空指纹库", self) clear_fingerprint_action.triggered.connect(self.clear_fingerprints) fingerprint_menu.addAction(clear_fingerprint_action) restore_default_action = QAction("恢复默认指纹库", self) restore_default_action.triggered.connect(self.restore_default_fingerprints) fingerprint_menu.addAction(restore_default_action) help_menu = menubar.addMenu("帮助") about_action = QAction("关于", self) about_action.triggered.connect(self.show_about) help_menu.addAction(about_action) def init_ui(self): main_widget = QWidget() main_layout = QVBoxLayout() self.tabs = QTabWidget() self.detection_tab = self.create_detection_tab() self.fingerprint_tab = self.create_fingerprint_tab() self.tabs.addTab(self.detection_tab, "网站检测") self.tabs.addTab(self.fingerprint_tab, "指纹库管理") main_layout.addWidget(self.tabs) main_widget.setLayout(main_layout) self.setCentralWidget(main_widget) self.status_bar = self.statusBar() self.status_label = QLabel("就绪") self.status_bar.addWidget(self.status_label) self.detection_thread = None def apply_styles(self): self.setStyleSheet(""" QMainWindow { background-color: #f0f0f0; } QTabWidget::pane { border: 1px solid #cccccc; background: white; } QTableWidget { background-color: white; alternate-background-color: #f8f8f8; gridline-color: #e0e0e0; } QHeaderView::section { background-color: #e0e0e0; padding: 4px; border: 1px solid #d0d0d0; } QPushButton { background-color: #4a86e8; color: white; border: none; padding: 5px 10px; border-radius: 4px; } QPushButton:hover { background-color: #3a76d8; } QPushButton:pressed { background-color: #2a66c8; } QPushButton:disabled { background-color: #a0a0a0; } QPushButton#clearBtn { background-color: #e74c3c; } QPushButton#clearBtn:hover { background-color: #c0392b; } QPushButton#restoreBtn { background-color: #27ae60; } QPushButton#restoreBtn:hover { background-color: #219653; } """) def create_detection_tab(self): tab = QWidget() layout = QVBoxLayout() # URL输入区域 control_layout = QHBoxLayout() self.url_input = QLineEdit() self.url_input.setPlaceholderText("输入网站URL (例如: example.com 或 http://example.com)") add_url_btn = QPushButton("添加URL") add_url_btn.clicked.connect(self.add_single_url) import_btn = QPushButton("导入URL列表") import_btn.clicked.connect(self.import_urls) clear_btn = QPushButton("清空列表") clear_btn.clicked.connect(self.clear_urls) control_layout.addWidget(self.url_input, 4) control_layout.addWidget(add_url_btn, 1) control_layout.addWidget(import_btn, 1) control_layout.addWidget(clear_btn, 1) layout.addLayout(control_layout) # URL列表区域 url_list_layout = QVBoxLayout() url_list_layout.addWidget(QLabel("待检测网站列表:")) self.url_list = QTextEdit() self.url_list.setPlaceholderText("每行一个URL") self.url_list.setMinimumHeight(80) url_list_layout.addWidget(self.url_list) layout.addLayout(url_list_layout) # 检测控制区域 detection_control_layout = QHBoxLayout() self.thread_spin = QSpinBox() self.thread_spin.setRange(1, 20) self.thread_spin.setValue(5) self.thread_spin.setPrefix("线程数: ") self.retry_spin = QSpinBox() self.retry_spin.setRange(0, 3) self.retry_spin.setValue(1) self.retry_spin.setPrefix("重试次数: ") self.timeout_spin = QSpinBox() self.timeout_spin.setRange(5, 60) self.timeout_spin.setValue(15) self.timeout_spin.setPrefix("超时时间(秒): ") self.detect_btn = QPushButton("开始检测") self.detect_btn.clicked.connect(self.start_detection) self.stop_btn = QPushButton("停止检测") self.stop_btn.clicked.connect(self.stop_detection) self.stop_btn.setEnabled(False) detection_control_layout.addWidget(self.thread_spin) detection_control_layout.addWidget(self.retry_spin) detection_control_layout.addWidget(self.timeout_spin) detection_control_layout.addStretch() detection_control_layout.addWidget(self.detect_btn) detection_control_layout.addWidget(self.stop_btn) layout.addLayout(detection_control_layout) # 进度条 self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(True) layout.addWidget(self.progress_bar) # 结果展示区域 splitter = QSplitter(Qt.Vertical) self.result_table = QTableWidget(0, 6) # 增加一列显示操作 self.result_table.setHorizontalHeaderLabels(["URL", "状态", "CMS类型", "版本", "置信度(%)", "操作"]) self.result_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch) self.result_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch) self.result_table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeToContents) self.result_table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents) self.result_table.setAlternatingRowColors(True) self.log_area = QTextEdit() self.log_area.setReadOnly(True) self.log_area.setMinimumHeight(150) splitter.addWidget(self.result_table) splitter.addWidget(self.log_area) splitter.setSizes([400, 150]) layout.addWidget(splitter, 1) tab.setLayout(layout) return tab def create_fingerprint_tab(self): tab = QWidget() layout = QVBoxLayout() btn_layout = QHBoxLayout() add_btn = QPushButton("添加指纹") add_btn.clicked.connect(self.add_fingerprint) edit_btn = QPushButton("编辑指纹") edit_btn.clicked.connect(self.edit_fingerprint) remove_btn = QPushButton("删除指纹") remove_btn.clicked.connect(self.remove_fingerprint) clear_btn = QPushButton("清空指纹库") clear_btn.setObjectName("clearBtn") clear_btn.clicked.connect(self.clear_fingerprints) restore_btn = QPushButton("恢复默认") restore_btn.setObjectName("restoreBtn") restore_btn.clicked.connect(self.restore_default_fingerprints) import_btn = QPushButton("导入指纹库") import_btn.clicked.connect(self.import_fingerprints) export_btn = QPushButton("导出指纹库") export_btn.clicked.connect(self.export_fingerprints) btn_layout.addWidget(add_btn) btn_layout.addWidget(edit_btn) btn_layout.addWidget(remove_btn) btn_layout.addWidget(clear_btn) btn_layout.addWidget(restore_btn) btn_layout.addStretch() btn_layout.addWidget(import_btn) btn_layout.addWidget(export_btn) layout.addLayout(btn_layout) self.fingerprint_tree = QTreeWidget() self.fingerprint_tree.setHeaderLabels(["CMS名称", "版本", "核心特征数", "总特征数"]) self.fingerprint_tree.setColumnWidth(0, 200) self.fingerprint_tree.setSortingEnabled(True) self.populate_fingerprint_tree() layout.addWidget(self.fingerprint_tree, 1) tab.setLayout(layout) return tab def populate_fingerprint_tree(self): self.fingerprint_tree.clear() fingerprints = self.fingerprint_manager.get_fingerprints() for i, fp in enumerate(fingerprints): try: cms_name = fp["cms"] version = fp.get("version", "") core_features = 0 total_features = 0 for h in fp.get("http_headers", []): total_features += 1 if h.get("type") == "core": core_features += 1 for h in fp.get("html_content", []): total_features += 1 if h.get("type") == "core": core_features += 1 for u in fp.get("url_paths", []): total_features += 1 if u.get("type") == "core": core_features += 1 item = QTreeWidgetItem([ cms_name, version, str(core_features), str(total_features) ]) item.setData(0, Qt.UserRole, i) self.fingerprint_tree.addTopLevelItem(item) except Exception as e: self.log(f"处理指纹时出错: {e},已跳过") def add_single_url(self): url = self.url_input.text().strip() if url: current_text = self.url_list.toPlainText() new_text = current_text + (("\n" + url) if current_text else url) self.url_list.setPlainText(new_text) self.url_input.clear() def import_urls(self): file_path, _ = QFileDialog.getOpenFileName( self, "导入URL列表", "", "文本文件 (*.txt);;所有文件 (*)" ) if file_path: try: with open(file_path, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] self.url_list.setPlainText("\n".join(urls)) self.log(f"成功导入 {len(urls)} 个URL") except Exception as e: QMessageBox.critical(self, "导入错误", f"导入失败: {str(e)}") def clear_urls(self): self.url_list.clear() def start_detection(self): urls_text = self.url_list.toPlainText().strip() if not urls_text: QMessageBox.warning(self, "警告", "请先添加要检测的URL") return urls = [url.strip() for url in urls_text.splitlines() if url.strip()] if not urls: QMessageBox.warning(self, "警告", "没有有效的URL") return self.result_table.setRowCount(0) self.results = [] max_threads = self.thread_spin.value() retry_count = self.retry_spin.value() timeout = self.timeout_spin.value() self.detection_thread = DetectionWorker( urls, self.fingerprint_manager.get_fingerprints(), max_threads, retry_count ) self.detection_thread.timeout = timeout self.detection_thread.progress_signal.connect(self.update_progress) self.detection_thread.result_signal.connect(self.add_result) self.detection_thread.log_signal.connect(self.log) self.detection_thread.finished_signal.connect(self.detection_finished) self.detect_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.progress_bar.setRange(0, len(urls)) self.progress_bar.setValue(0) self.detection_thread.start() def stop_detection(self): if self.detection_thread and self.detection_thread.isRunning(): self.detection_thread.stop() self.log("检测已停止") self.detection_finished() def detection_finished(self): self.detect_btn.setEnabled(True) self.stop_btn.setEnabled(False) self.status_label.setText("检测完成") def update_progress(self, current, total, url): self.progress_bar.setMaximum(total) self.progress_bar.setValue(current) self.status_label.setText(f"正在检测: {url} ({current}/{total})") def show_judgment_basis(self, result): """显示判断依据对话框""" dialog = JudgmentBasisDialog(self, result) dialog.exec_() def add_result(self, result): self.results.append(result) row = self.result_table.rowCount() self.result_table.insertRow(row) # URL url_item = QTableWidgetItem(result["url"]) url_item.setFlags(url_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 0, url_item) # 状态码 status = result["status"] status_item = QTableWidgetItem(str(status)) status_item.setFlags(status_item.flags() ^ Qt.ItemIsEditable) if status == 200: status_item.setForeground(Qt.darkGreen) elif 400 <= status < 500: status_item.setForeground(Qt.darkRed) elif status >= 500: status_item.setForeground(Qt.darkMagenta) self.result_table.setItem(row, 1, status_item) # CMS类型(主结果) primary = result["primary"] cms_item = QTableWidgetItem(primary["cms"]) cms_item.setFlags(cms_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 2, cms_item) # 版本 version_item = QTableWidgetItem(primary["version"]) version_item.setFlags(version_item.flags() ^ Qt.ItemIsEditable) self.result_table.setItem(row, 3, version_item) # 置信度 confidence = primary["confidence"] confidence_item = QTableWidgetItem(f"{confidence}%") confidence_item.setFlags(confidence_item.flags() ^ Qt.ItemIsEditable) if confidence >= 90: confidence_item.setForeground(Qt.darkGreen) elif confidence >= 70: confidence_item.setForeground(Qt.darkBlue) elif confidence >= 50: confidence_item.setForeground(Qt.darkOrange) else: confidence_item.setForeground(Qt.darkGray) self.result_table.setItem(row, 4, confidence_item) # 查看依据按钮 view_btn = QPushButton("查看依据") # 使用lambda表达式传递当前result view_btn.clicked.connect(lambda checked, res=result: self.show_judgment_basis(res)) self.result_table.setCellWidget(row, 5, view_btn) def add_fingerprint(self): dialog = AddFingerprintDialog(self) if dialog.exec_() == QDialog.Accepted: fingerprint = dialog.get_fingerprint() if fingerprint and self.fingerprint_manager.add_fingerprint(fingerprint): self.populate_fingerprint_tree() self.log(f"已添加指纹: {fingerprint['cms']}") def edit_fingerprint(self): selected_items = self.fingerprint_tree.selectedItems() if not selected_items: QMessageBox.warning(self, "警告", "请选择一个指纹进行编辑") return item = selected_items[0] index = item.data(0, Qt.UserRole) fingerprints = self.fingerprint_manager.get_fingerprints() if index is None or not (0 <= index < len(fingerprints)): QMessageBox.warning(self, "错误", "无效的指纹索引") return fingerprint = fingerprints[index] dialog = AddFingerprintDialog(self, fingerprint) if dialog.exec_() == QDialog.Accepted: updated = dialog.get_fingerprint() if updated and self.fingerprint_manager.update_fingerprint(index, updated): self.populate_fingerprint_tree() self.log(f"已更新指纹: {updated['cms']}") def remove_fingerprint(self): selected_items = self.fingerprint_tree.selectedItems() if not selected_items: QMessageBox.warning(self, "警告", "请选择要删除的指纹") return item = selected_items[0] cms_name = item.text(0) index = item.data(0, Qt.UserRole) reply = QMessageBox.question( self, "确认删除", f"确定要删除 '{cms_name}' 的指纹吗?", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.remove_fingerprint(index) self.populate_fingerprint_tree() self.log(f"已删除指纹: {cms_name}") def clear_fingerprints(self): if not self.fingerprint_manager.get_fingerprints(): QMessageBox.information(self, "提示", "指纹库已为空") return reply = QMessageBox.question( self, "确认清空", "确定要清空所有指纹吗?此操作不可恢复!", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.clear_fingerprints() self.populate_fingerprint_tree() self.log("已清空所有指纹") def restore_default_fingerprints(self): reply = QMessageBox.question( self, "确认恢复", "确定要恢复默认指纹库吗?当前指纹将被替换!", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.fingerprint_manager.restore_default_fingerprints() self.populate_fingerprint_tree() self.log("已恢复默认指纹库") def import_fingerprints(self): file_path, _ = QFileDialog.getOpenFileName( self, "导入指纹库", "", "JSON文件 (*.json);;所有文件 (*)" ) if file_path and self.fingerprint_manager.import_fingerprints(file_path): self.populate_fingerprint_tree() self.log(f"成功导入指纹库: {file_path}") def export_fingerprints(self): file_path, _ = QFileDialog.getSaveFileName( self, "导出指纹库", "cms_fingerprints.json", "JSON文件 (*.json)" ) if file_path and self.fingerprint_manager.export_fingerprints(file_path): self.log(f"成功导出指纹库: {file_path}") def export_results(self): if not self.results: QMessageBox.warning(self, "警告", "没有结果可导出") return file_path, _ = QFileDialog.getSaveFileName( self, "导出结果", "", "CSV文件 (*.csv);;JSON文件 (*.json)" ) if not file_path: return try: if file_path.endswith(".csv"): with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(["URL", "状态", "CMS类型", "版本", "置信度(%)"]) for result in self.results: primary = result["primary"] writer.writerow([ result["url"], result["status"], primary["cms"], primary["version"], primary["confidence"] ]) elif file_path.endswith(".json"): # 导出完整结果,包括判断依据 with open(file_path, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=4, ensure_ascii=False) self.log(f"结果已导出到: {file_path}") except Exception as e: QMessageBox.critical(self, "导出错误", f"导出失败: {str(e)}") def log(self, message): timestamp = time.strftime("%H:%M:%S") self.log_area.append(f"[{timestamp}] {message}") def show_about(self): about_text = """

多CMS识别工具 (带判断依据)

版本: 2.3.0

功能特点:

  • 显示详细的识别判断依据
  • URL路径特征主动访问验证
  • 并发检测提高效率
  • 核心特征加权识别,准确率高
  • 支持正则表达式测试和验证
  • 可自定义超时时间和重试次数

使用说明: 点击结果中的"查看依据"按钮可查看详细的识别依据

""" QMessageBox.about(self, "关于", about_text) def closeEvent(self, event): if self.detection_thread and self.detection_thread.isRunning(): reply = QMessageBox.question( self, "检测中", "检测仍在进行中,确定要退出吗?", QMessageBox.Yes | QMessageBox.No ) if reply == QMessageBox.Yes: self.detection_thread.stop() event.accept() else: event.ignore() else: event.accept() if __name__ == "__main__": if hasattr(Qt, 'AA_EnableHighDpiScaling'): QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) if hasattr(Qt, 'AA_UseHighDpiPixmaps'): QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) app = QApplication(sys.argv) app.setStyle("Fusion") window = CMSDetectorApp() window.show() sys.exit(app.exec_()) 修改代码提高验证效率 其他无需修改 完整输出
filetype

import sys import pandas as pd import numpy as np import matplotlib.pyplot as plt from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QTableWidget, QTableWidgetItem, QFileDialog, QTabWidget, QLabel, QLineEdit, QComboBox, QMessageBox, QHeaderView, QDialog, QGroupBox, QTextEdit, QScrollArea, QSplitter, QFormLayout) from PyQt5.QtCore import Qt from sklearn.linear_model import LinearRegression from datetime import datetime import openpyxl from openpyxl.utils.dataframe import dataframe_to_rows import re import os class MaterialSystem(QMainWindow): def init(self): super().init() self.setWindowTitle(“辅料管理系统”) self.setGeometry(100, 100, 1200, 800) # 初始化变量 self.file_path = None self.df = None self.original_df = None self.production_data = {} self.historical_data = [] # 存储历史数据用于预测模型 # 创建主部件和布局 main_widget = QWidget() main_layout = QVBoxLayout() # 顶部按钮区域 top_layout = QHBoxLayout() self.load_button = QPushButton("加载Excel文件") self.load_button.clicked.connect(self.load_file) self.save_button = QPushButton("保存修改") self.save_button.clicked.connect(self.save_changes) self.save_button.setEnabled(False) self.predict_button = QPushButton("预测功能") self.predict_button.clicked.connect(self.show_prediction_dialog) self.predict_button.setEnabled(False) top_layout.addWidget(self.load_button) top_layout.addWidget(self.save_button) top_layout.addWidget(self.predict_button) top_layout.addStretch() # 标签显示文件路径 self.file_label = QLabel("未加载文件") top_layout.addWidget(self.file_label) main_layout.addLayout(top_layout) # 创建标签页 self.tabs = QTabWidget() self.table_tab = QWidget() self.production_tab = QWidget() self.history_tab = QWidget() # 新增历史数据标签页 # 表格标签页布局 table_layout = QVBoxLayout(self.table_tab) self.table_widget = QTableWidget() self.table_widget.setEditTriggers(QTableWidget.DoubleClicked) table_layout.addWidget(self.table_widget) # 产量数据标签页布局 production_layout = QVBoxLayout(self.production_tab) production_form_layout = QHBoxLayout() production_form_layout.addWidget(QLabel("阴极锌产量:")) self.zn_input = QLineEdit() production_form_layout.addWidget(self.zn_input) production_form_layout.addWidget(QLabel("电锌产量:")) self.dx_input = QLineEdit() production_form_layout.addWidget(self.dx_input) production_form_layout.addWidget(QLabel("渣处理量:")) self.zl_input = QLineEdit() production_form_layout.addWidget(self.zl_input) production_form_layout.addWidget(QLabel("硫酸产量:")) self.ls_input = QLineEdit() production_form_layout.addWidget(self.ls_input) production_form_layout.addWidget(QLabel("小窑焙砂处理量:")) self.by_input = QLineEdit() production_form_layout.addWidget(self.by_input) self.save_production_button = QPushButton("保存产量数据") self.save_production_button.clicked.connect(self.save_production_data) production_form_layout.addWidget(self.save_production_button) production_layout.addLayout(production_form_layout) production_layout.addStretch() # 历史数据标签页布局 history_layout = QVBoxLayout(self.history_tab) # 历史数据导入区域 history_group = QGroupBox("历史数据管理") history_group_layout = QVBoxLayout() self.import_button = QPushButton("导入历史数据") self.import_button.clicked.connect(self.import_historical_data) history_group_layout.addWidget(self.import_button) self.history_list = QTextEdit() self.history_list.setReadOnly(True) history_group_layout.addWidget(self.history_list) history_group.setLayout(history_group_layout) history_layout.addWidget(history_group) # 公式编辑区域 formula_group = QGroupBox("计算公式管理") formula_layout = QVBoxLayout() self.formula_edit = QTextEdit() self.formula_edit.setPlaceholderText("在此编辑计算公式...\n示例: 实际单耗 = 本月消耗 / 本月产量 * 1000") formula_layout.addWidget(self.formula_edit) self.save_formula_button = QPushButton("保存公式") self.save_formula_button.clicked.connect(self.save_formulas) formula_layout.addWidget(self.save_formula_button) formula_group.setLayout(formula_layout) history_layout.addWidget(formula_group) # 添加标签页 self.tabs.addTab(self.table_tab, "辅料数据") self.tabs.addTab(self.production_tab, "产量数据") self.tabs.addTab(self.history_tab, "历史数据与公式") # 新增标签页 main_layout.addWidget(self.tabs) main_widget.setLayout(main_layout) self.setCentralWidget(main_widget) # 初始化产量数据 self.init_production_data() # 初始化公式 self.formulas = { "实际单耗": "本月消耗 / 本月产量 * 1000", "定额单耗": "上月实际单耗 * 0.98", "定额消耗": "定额单耗 * 下月计划产量 / 1000" } self.update_formula_display() def update_formula_display(self): """更新公式显示""" text = "" for name, formula in self.formulas.items(): text += f"{name}: {formula}\n" self.formula_edit.setText(text) def save_formulas(self): """保存公式""" text = self.formula_edit.toPlainText() lines = text.split('\n') new_formulas = {} for line in lines: if ':' in line: parts = line.split(':', 1) name = parts[0].strip() formula = parts[1].strip() if name and formula: new_formulas[name] = formula if new_formulas: self.formulas = new_formulas QMessageBox.information(self, "成功", "公式已保存!") else: QMessageBox.warning(self, "警告", "未检测到有效的公式!") def import_historical_data(self): """导入历史数据""" file_paths, _ = QFileDialog.getOpenFileNames( self, "选择历史数据文件", "", "Excel Files (*.xlsx *.xls)" ) if not file_paths: return success_count = 0 for file_path in file_paths: try: # 读取历史数据 wb = openpyxl.load_workbook(file_path) sheet = wb.active # 提取数据 data = [] for row in sheet.iter_rows(values_only=True): data.append(row) # 提取产量数据 production_info = {} for row in data: if any(keyword in str(row[0]) for keyword in ["阴极锌", "电锌", "渣处理", "硫酸", "焙砂"]): parts = str(row[0]).split(';') for part in parts: if ':' in part: key_value = part.split(':', 1) key = key_value[0].strip() value = key_value[1].strip() # 提取数值部分 numbers = re.findall(r"[-+]?\d*\.\d+|\d+", value) if numbers: production_info[key] = float(numbers[0]) # 提取辅料消耗数据 material_consumption = {} for i, row in enumerate(data): if i >= 4: # 跳过标题行 if len(row) > 1 and row[1] and str(row[1]).strip() != "": material_name = str(row[1]).strip() # 确保有足够的列 if len(row) > 12 and row[12] is not None: try: consumption = float(row[12]) material_consumption[material_name] = consumption except (ValueError, TypeError): pass # 添加到历史数据 if production_info and material_consumption: history_entry = { "file": os.path.basename(file_path), "date": datetime.now().strftime("%Y-%m"), "production": production_info, "materials": material_consumption } self.historical_data.append(history_entry) success_count += 1 except Exception as e: QMessageBox.warning(self, "导入错误", f"文件 {os.path.basename(file_path)} 导入失败: {str(e)}") # 更新历史数据显示 self.update_history_display() QMessageBox.information(self, "导入完成", f"成功导入 {success_count}/{len(file_paths)} 个历史数据文件!") def update_history_display(self): """更新历史数据显示""" text = "已导入的历史数据:\n\n" for i, entry in enumerate(self.historical_data, 1): text += f"{i}. {entry['file']} ({entry['date']})\n" text += f" 产量: {entry['production']}\n" text += f" 辅料数量: {len(entry['materials'])}\n\n" self.history_list.setText(text) def init_production_data(self): """初始化产量数据""" self.production_data = { "阴极锌": 9293.226, "电锌": 8635.059, "渣处理量": 6514.5, "硫酸产量": 13143.5, "小窑焙砂处理量": 1095.9 } self.zn_input.setText(str(self.production_data["阴极锌"])) self.dx_input.setText(str(self.production_data["电锌"])) self.zl_input.setText(str(self.production_data["渣处理量"])) self.ls_input.setText(str(self.production_data["硫酸产量"])) self.by_input.setText(str(self.production_data["小窑焙砂处理量"])) def load_file(self): """加载Excel文件""" file_path, _ = QFileDialog.getOpenFileName( self, "打开Excel文件", "", "Excel Files (*.xlsx *.xls)" ) if file_path: self.file_path = file_path self.file_label.setText(f"已加载: {file_path}") try: # 使用openpyxl加载工作簿 wb = openpyxl.load_workbook(file_path) sheet = wb.active # 将工作表转换为DataFrame data = sheet.values headers = next(data) self.df = pd.DataFrame(data, columns=headers) self.original_df = self.df.copy() # 显示数据 self.display_table() # 启用按钮 self.save_button.setEnabled(True) self.predict_button.setEnabled(True) # 提取产量数据 self.extract_production_data() except Exception as e: QMessageBox.critical(self, "错误", f"加载文件失败: {str(e)}") # 重置状态 self.file_path = None self.df = None self.save_button.setEnabled(False) self.predict_button.setEnabled(False) self.file_label.setText("未加载文件") def extract_production_data(self): """从表格中提取产量数据 - 改进版本""" # 查找包含产量数据的行 for _, row in self.df.iterrows(): row_content = str(row[0]) # 检查是否包含产量关键词 if any(keyword in row_content for keyword in ["阴极锌", "电锌", "渣处理", "硫酸", "焙砂"]): # 使用更稳健的解析方法 parts = row_content.split(';') for part in parts: # 使用正则表达式提取数值 numbers = re.findall(r"[-+]?\d*\.\d+|\d+", part) if numbers: value = float(numbers[0]) if "阴极锌" in part: self.production_data["阴极锌"] = value elif "电锌" in part: self.production_data["电锌"] = value elif "渣处理" in part: self.production_data["渣处理量"] = value elif "硫酸" in part: self.production_data["硫酸产量"] = value elif "焙砂" in part: self.production_data["小窑焙砂处理量"] = value # 更新输入框 self.zn_input.setText(str(self.production_data["阴极锌"])) self.dx_input.setText(str(self.production_data["电锌"])) self.zl_input.setText(str(self.production_data["渣处理量"])) self.ls_input.setText(str(self.production_data["硫酸产量"])) self.by_input.setText(str(self.production_data["小窑焙砂处理量"])) break def save_production_data(self): """保存产量数据""" try: self.production_data["阴极锌"] = float(self.zn_input.text()) self.production_data["电锌"] = float(self.dx_input.text()) self.production_data["渣处理量"] = float(self.zl_input.text()) self.production_data["硫酸产量"] = float(self.ls_input.text()) self.production_data["小窑焙砂处理量"] = float(self.by_input.text()) QMessageBox.information(self, "成功", "产量数据已保存!") except ValueError: QMessageBox.warning(self, "错误", "请输入有效的数字!") def display_table(self): """在表格中显示数据 - 改进版本""" if self.df is not None: self.table_widget.clear() # 设置行列数 n_rows, n_cols = self.df.shape self.table_widget.setRowCount(n_rows) self.table_widget.setColumnCount(n_cols) # 设置表头 headers = self.df.columns.tolist() self.table_widget.setHorizontalHeaderLabels(headers) # 填充数据 for i in range(n_rows): for j in range(n_cols): # 安全获取单元格值 cell_value = self.df.iloc[i, j] if pd.isna(cell_value): cell_value = "" item = QTableWidgetItem(str(cell_value)) # 设置不可编辑的单元格 - 改进索引计算 # 仅当列数足够时才应用不可编辑 if j < len(headers): col_name = headers[j] # 对特定列设置为不可编辑 if any(keyword in col_name for keyword in ["单耗", "定额", "消耗量", "结果"]): item.setFlags(item.flags() & ~Qt.ItemIsEditable) item.setBackground(Qt.lightGray) self.table_widget.setItem(i, j, item) # 调整列宽 self.table_widget.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) def save_changes(self): """保存修改到Excel文件 - 改进版本""" if self.df is None or self.file_path is None: return try: # 从表格中获取修改后的数据 for i in range(self.table_widget.rowCount()): for j in range(self.table_widget.columnCount()): if self.table_widget.item(i, j) is not None: new_value = self.table_widget.item(i, j).text() # 仅当列存在时才更新 if j < self.df.shape[1]: self.df.iloc[i, j] = new_value # 使用openpyxl保存,保留公式 wb = openpyxl.load_workbook(self.file_path) sheet = wb.active # 清除现有内容 for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=1, max_col=sheet.max_column): for cell in row: cell.value = None # 写入新数据 for r_idx, row in enumerate(dataframe_to_rows(self.df, index=False, header=True), 1): for c_idx, value in enumerate(row, 1): if c_idx <= sheet.max_column: # 确保列索引有效 sheet.cell(row=r_idx, column=c_idx, value=value) # 保存工作簿 wb.save(self.file_path) QMessageBox.information(self, "成功", "修改已保存到Excel文件!") except Exception as e: QMessageBox.critical(self, "错误", f"保存文件失败: {str(e)}") def show_prediction_dialog(self): """显示预测对话框""" if self.df is None: return dialog = PredictionDialog(self.df, self.production_data, self.historical_data, self.formulas, self) dialog.exec_() class PredictionDialog(QDialog): def init(self, df, production_data, historical_data, formulas, parent=None): super().init(parent) self.setWindowTitle(“辅料消耗预测”) self.setGeometry(200, 200, 1000, 800) self.df = df self.production_data = production_data self.historical_data = historical_data self.formulas = formulas self.material_models = {} # 存储每个辅料的预测模型 layout = QVBoxLayout() # 预测设置区域 settings_layout = QHBoxLayout() settings_layout.addWidget(QLabel("预测月份:")) self.month_combo = QComboBox() months = [f"{i}月" for i in range(1, 13)] current_month = datetime.now().month self.month_combo.addItems(months) self.month_combo.setCurrentIndex(current_month % 12) settings_layout.addWidget(self.month_combo) settings_layout.addWidget(QLabel("预测方法:")) self.method_combo = QComboBox() methods = ["基于产量比例", "线性回归", "移动平均"] if historical_data: methods.insert(0, "历史数据模型") self.method_combo.addItems(methods) settings_layout.addWidget(self.method_combo) self.predict_button = QPushButton("执行预测") self.predict_button.clicked.connect(self.run_prediction) settings_layout.addWidget(self.predict_button) layout.addLayout(settings_layout) # 产量输入区域 production_layout = QFormLayout() production_layout.addRow(QLabel("阴极锌产量:"), self.zn_pred_input := QLineEdit(str(production_data["阴极锌"]))) production_layout.addRow(QLabel("电锌产量:"), self.dx_pred_input := QLineEdit(str(production_data["电锌"]))) production_layout.addRow(QLabel("渣处理量:"), self.zl_pred_input := QLineEdit(str(production_data["渣处理量"]))) production_layout.addRow(QLabel("硫酸产量:"), self.ls_pred_input := QLineEdit(str(production_data["硫酸产量"]))) production_layout.addRow(QLabel("小窑焙砂处理量:"), self.by_pred_input := QLineEdit(str(production_data["小窑焙砂处理量"]))) layout.addLayout(production_layout) # 创建分割器用于结果和公式显示 splitter = QSplitter(Qt.Vertical) # 预测结果表格 result_widget = QWidget() result_layout = QVBoxLayout(result_widget) result_layout.addWidget(QLabel("预测结果:")) self.result_table = QTableWidget() self.result_table.setColumnCount(5) self.result_table.setHorizontalHeaderLabels(["车间", "辅料名称", "单位", "预测消耗量", "预测方法"]) self.result_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) result_layout.addWidget(self.result_table) # 公式显示区域 formula_widget = QWidget() formula_layout = QVBoxLayout(formula_widget) formula_layout.addWidget(QLabel("当前使用的公式:")) self.formula_display = QTextEdit() self.formula_display.setReadOnly(True) self.update_formula_display() formula_layout.addWidget(self.formula_display) splitter.addWidget(result_widget) splitter.addWidget(formula_widget) splitter.setSizes([500, 200]) layout.addWidget(splitter) # 图表按钮 self.chart_button = QPushButton("生成预测图表") self.chart_button.clicked.connect(self.generate_chart) layout.addWidget(self.chart_button) self.setLayout(layout) # 初始化时执行预测 self.run_prediction() def update_formula_display(self): """更新公式显示""" text = "" for name, formula in self.formulas.items(): text += f"{name}: {formula}\n" self.formula_display.setText(text) def run_prediction(self): """执行预测计算 - 增强版本""" # 清空表格 self.result_table.setRowCount(0) # 获取预测设置 prediction_method = self.method_combo.currentText() # 获取预测产量 try: zn_production = float(self.zn_pred_input.text()) dx_production = float(self.dx_pred_input.text()) zl_production = float(self.zl_pred_input.text()) ls_production = float(self.ls_pred_input.text()) by_production = float(self.by_pred_input.text()) except ValueError: QMessageBox.warning(self, "错误", "请输入有效的产量数据!") return # 筛选有效的辅料数据行 valid_rows = [] for idx, row in self.df.iterrows(): if idx >= 4: # 跳过标题行 workshop = row[0] material = row[1] unit = row[2] if len(row) > 2 else "" # 检查是否有消耗量数据 if len(row) > 12 and pd.notna(row[12]) and str(row[12]).strip() != "": try: consumption = float(row[12]) valid_rows.append((workshop, material, unit, consumption)) except (ValueError, TypeError): continue # 训练模型(如果需要) if prediction_method == "历史数据模型" and self.historical_data: self.train_models() # 执行预测 predictions = [] for workshop, material, unit, consumption in valid_rows: method_used = prediction_method predicted = None # 根据预测方法选择预测逻辑 if prediction_method == "历史数据模型" and material in self.material_models: # 使用历史数据训练的模型 try: # 根据车间确定使用哪个产量数据 if "浸出" in workshop: input_data = [[zn_production]] elif "熔铸" in workshop: input_data = [[dx_production]] elif "渣处理" in workshop: input_data = [[zl_production]] elif "硫酸" in workshop: input_data = [[ls_production]] elif "焙砂" in workshop: input_data = [[by_production]] else: input_data = [[(zn_production + dx_production + zl_production) / 3]] predicted = self.material_models[material].predict(input_data)[0] except: predicted = consumption * 1.05 # 模型失败时使用默认方法 elif prediction_method == "线性回归": # 使用线性回归模型(简化) predicted = consumption * 1.05 elif prediction_method == "移动平均": # 移动平均法 predicted = consumption * 1.03 else: # 基于产量比例 # 根据产量变化调整 if "浸出" in workshop: ratio = zn_production / self.production_data["阴极锌"] predicted = consumption * ratio elif "熔铸" in workshop: ratio = dx_production / self.production_data["电锌"] predicted = consumption * ratio elif "渣处理" in workshop: ratio = zl_production / self.production_data["渣处理量"] predicted = consumption * ratio elif "硫酸" in workshop: ratio = ls_production / self.production_data["硫酸产量"] predicted = consumption * ratio elif "焙砂" in workshop: ratio = by_production / self.production_data["小窑焙砂处理量"] predicted = consumption * ratio else: predicted = consumption * 1.02 # 默认增长2% method_used = "默认增长" # 应用自定义公式(如果有) if "定额消耗" in self.formulas: try: # 在实际应用中应使用更安全的公式解析 predicted = eval(self.formulas["定额消耗"], { "定额单耗": consumption / (self.production_data["阴极锌"] / 1000), "下月计划产量": zn_production }) except: pass if predicted is not None: predictions.append((workshop, material, unit, predicted, method_used)) # 显示预测结果 self.result_table.setRowCount(len(predictions)) for i, (workshop, material, unit, predicted, method) in enumerate(predictions): self.result_table.setItem(i, 0, QTableWidgetItem(str(workshop))) self.result_table.setItem(i, 1, QTableWidgetItem(material)) self.result_table.setItem(i, 2, QTableWidgetItem(unit)) self.result_table.setItem(i, 3, QTableWidgetItem(f"{predicted:.2f}")) self.result_table.setItem(i, 4, QTableWidgetItem(method)) def train_models(self): """使用历史数据训练预测模型""" if not self.historical_data: return self.material_models = {} # 收集每个辅料的历史数据 material_data = {} for entry in self.historical_data: for material, consumption in entry["materials"].items(): if material not in material_data: material_data[material] = [] # 根据辅料类型确定使用哪个产量数据 production_key = self.detect_production_key(material) if production_key in entry["production"]: production_value = entry["production"][production_key] material_data[material].append((production_value, consumption)) # 为每个辅料训练模型 for material, data in material_data.items(): if len(data) > 2: # 至少需要3个数据点 try: X = np.array([d[0] for d in data]).reshape(-1, 1) y = np.array([d[1] for d in data]) model = LinearRegression() model.fit(X, y) self.material_models[material] = model except: # 模型训练失败,跳过该辅料 pass def detect_production_key(self, material): """根据辅料名称确定关联的产量类型""" material_lower = material.lower() if "浸出" in material_lower or "阴极锌" in material_lower: return "阴极锌" elif "熔铸" in material_lower or "电锌" in material_lower: return "电锌" elif "渣处理" in material_lower or "渣量" in material_lower: return "渣处理量" elif "硫酸" in material_lower: return "硫酸产量" elif "焙砂" in material_lower: return "小窑焙砂处理量" else: return "阴极锌" # 默认 def generate_chart(self): """生成预测图表 - 增强版本""" if self.result_table.rowCount() == 0: QMessageBox.warning(self, "警告", "没有可用的预测数据!") return # 获取预测结果 data = [] for i in range(self.result_table.rowCount()): workshop = self.result_table.item(i, 0).text() material = self.result_table.item(i, 1).text() consumption = float(self.result_table.item(i, 3).text()) method = self.result_table.item(i, 4).text() data.append((f"{workshop}-{material}", consumption, method)) # 按消耗量排序 data = sorted(data, key=lambda x: x[1], reverse=True)[:15] # 创建图表 plt.figure(figsize=(12, 8)) names = [item[0] for item in data] values = [item[1] for item in data] methods = [item[2] for item in data] # 为不同方法分配颜色 color_map = { "历史数据模型": "green", "线性回归": "blue", "移动平均": "orange", "基于产量比例": "purple", "默认增长": "red" } colors = [color_map.get(method, "gray") for method in methods] plt.barh(names, values, color=colors) plt.xlabel('预测消耗量') plt.title('辅料消耗预测 (前15项)') # 添加图例 from matplotlib.patches import Patch legend_elements = [Patch(facecolor=color, label=method) for method, color in color_map.items()] plt.legend(handles=legend_elements, loc='lower right') plt.tight_layout() plt.show() if name == “main”: app = QApplication(sys.argv) window = MaterialSystem() window.show() sys.exit(app.exec_())帮我检查该代码中有什么问题

filetype

import sys import json import sqlite3 import requests from PyQt5.QtWidgets import (QApplication, QMainWindow, QSplitter, QListWidget, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QLabel, QMessageBox, QStatusBar) from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineProfile from PyQt5.QtCore import QUrl, Qt, QTimer from PyQt5.QtNetwork import QNetworkCookie class AccountManager: """账号管理类,处理账号的存储和加载""" def __init__(self, db_name='bilibili_accounts.db'): self.conn = sqlite3.connect(db_name) self.cursor = self.conn.cursor() self._create_table() def _create_table(self): self.cursor.execute(''' CREATE TABLE IF NOT EXISTS accounts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, cookies TEXT NOT NULL ) ''') self.conn.commit() def add_account(self, name, cookies): """添加新账号""" try: self.cursor.execute( "INSERT INTO accounts (name, cookies) VALUES (?, ?)", (name, json.dumps(cookies)) ) self.conn.commit() return True except sqlite3.IntegrityError: return False def get_accounts(self): """获取所有账号""" self.cursor.execute("SELECT id, name, cookies FROM accounts") return [ {'id': row[0], 'name': row[1], 'cookies': json.loads(row[2])} for row in self.cursor.fetchall() ] def delete_account(self, account_id): """删除账号""" self.cursor.execute("DELETE FROM accounts WHERE id=?", (account_id,)) self.conn.commit() def __del__(self): self.conn.close() class BilibiliBrowser(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("B站多账号浏览器") self.resize(1200, 800) # 初始化账号管理器 self.account_manager = AccountManager() # 创建UI self.init_ui() # 加载账号列表 self.load_accounts() # 设置状态栏 self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("就绪", 3000) def init_ui(self): """初始化用户界面""" # 主分割布局 main_splitter = QSplitter(Qt.Horizontal) # 左侧账号管理区域 account_widget = QWidget() account_layout = QVBoxLayout() # 账号列表 account_layout.addWidget(QLabel("账号列表")) self.account_list = QListWidget() self.account_list.itemClicked.connect(self.switch_account) account_layout.addWidget(self.account_list) # 添加账号区域 add_account_layout = QHBoxLayout() self.account_name_input = QLineEdit() self.account_name_input.setPlaceholderText("输入账号名称") add_account_layout.addWidget(self.account_name_input) add_btn = QPushButton("添加账号") add_btn.clicked.connect(self.add_account) add_account_layout.addWidget(add_btn) account_layout.addLayout(add_account_layout) # 删除账号按钮 del_btn = QPushButton("删除选中账号") del_btn.clicked.connect(self.delete_account) account_layout.addWidget(del_btn) # 检测账号状态按钮 check_btn = QPushButton("检测账号状态") check_btn.clicked.connect(self.check_account_status) account_layout.addWidget(check_btn) account_widget.setLayout(account_layout) # 右侧浏览器区域 browser_widget = QWidget() browser_layout = QVBoxLayout() # 导航栏 nav_layout = QHBoxLayout() self.url_bar = QLineEdit() self.url_bar.setPlaceholderText("输入网址或搜索内容") self.url_bar.returnPressed.connect(self.navigate_to_url) nav_layout.addWidget(self.url_bar) home_btn = QPushButton("首页") home_btn.clicked.connect(self.go_home) nav_layout.addWidget(home_btn) refresh_btn = QPushButton("刷新") refresh_btn.clicked.connect(self.refresh_page) nav_layout.addWidget(refresh_btn) browser_layout.addLayout(nav_layout) # 浏览器视图 self.browser = QWebEngineView() self.browser.load(QUrl("https://www.bilibili.com")) self.browser.urlChanged.connect(self.update_url_bar) browser_layout.addWidget(self.browser) browser_widget.setLayout(browser_layout) # 添加组件到主分割器 main_splitter.addWidget(account_widget) main_splitter.addWidget(browser_widget) main_splitter.setSizes([300, 900]) self.setCentralWidget(main_splitter) def load_accounts(self): """加载所有账号到列表""" self.account_list.clear() accounts = self.account_manager.get_accounts() for account in accounts: self.account_list.addItem(f"{account['name']} (ID:{account['id']})") def add_account(self): """添加新账号""" account_name = self.account_name_input.text().strip() if not account_name: QMessageBox.warning(self, "输入错误", "请输入账号名称") return # 获取当前浏览器的Cookie cookie_store = self.browser.page().profile().cookieStore() cookies = [] # 使用闭包捕获cookies列表 def capture_cookies(cookie): print(cookie) cookies.append({ 'name': cookie.name().data().decode(), 'value': cookie.value().data().decode(), 'domain': cookie.domain(), 'path': cookie.path(), 'expirationDate': cookie.expirationDate().toSecsSinceEpoch() if cookie.expirationDate().isValid() else None }) # 连接信号捕获Cookie cookie_store.cookieAdded.connect(capture_cookies) # 设置一个定时器来等待Cookie收集完成 QTimer.singleShot(1000, lambda: self.finalize_account_creation(account_name, cookies, cookie_store)) # 提示用户登录 self.browser.load(QUrl("https://passport.bilibili.com/login")) self.status_bar.showMessage("请在浏览器中登录B站账号...", 5000) def finalize_account_creation(self, account_name, cookies, cookie_store): """完成账号创建过程""" # 断开信号连接 cookie_store.cookieAdded.disconnect() print(cookies) # 过滤B站相关Cookie bili_cookies = [c for c in cookies if 'bilibili' in c['domain']] print(bili_cookies) if not bili_cookies: QMessageBox.warning(self, "添加失败", "未检测到有效的B站Cookie") return # 保存账号 if self.account_manager.add_account(account_name, bili_cookies): self.load_accounts() self.account_name_input.clear() self.status_bar.showMessage(f"账号 '{account_name}' 添加成功", 3000) else: QMessageBox.warning(self, "添加失败", "账号名称已存在") def delete_account(self): """删除选中账号""" current_item = self.account_list.currentItem() if not current_item: return account_id = int(current_item.text().split("ID:")[1].strip(")")) self.account_manager.delete_account(account_id) self.load_accounts() self.status_bar.showMessage("账号已删除", 3000) def switch_account(self, item): """切换到选中的账号""" account_id = int(item.text().split("ID:")[1].strip(")")) accounts = self.account_manager.get_accounts() account = next((a for a in accounts if a['id'] == account_id), None) if account: self.apply_cookies(account['cookies']) self.browser.load(QUrl("https://www.bilibili.com")) self.status_bar.showMessage(f"已切换到账号: {account['name']}", 3000) def apply_cookies(self, cookies): """应用Cookie到浏览器""" profile = QWebEngineProfile.defaultProfile() cookie_store = profile.cookieStore() cookie_store.deleteAllCookies() # 清除现有Cookie for cookie_data in cookies: cookie = QNetworkCookie() cookie.setName(cookie_data['name'].encode()) cookie.setValue(cookie_data['value'].encode()) cookie.setDomain(cookie_data['domain']) cookie.setPath(cookie_data['path']) if cookie_data['expirationDate']: cookie.setExpirationDate(cookie_data['expirationDate']) cookie_store.setCookie(cookie) def check_account_status(self): """检查选中账号的状态""" current_item = self.account_list.currentItem() if not current_item: return account_id = int(current_item.text().split("ID:")[1].strip(")")) accounts = self.account_manager.get_accounts() account = next((a for a in accounts if a['id'] == account_id), None) if account: # 使用requests验证Cookie有效性 session = requests.Session() for cookie in account['cookies']: session.cookies.set( cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'] ) try: response = session.get("https://api.bilibili.com/x/web-interface/nav") data = response.json() if data['code'] == 0: uname = data['data']['uname'] self.status_bar.showMessage(f"账号有效: {uname}", 5000) else: self.status_bar.showMessage("账号已失效,请重新登录", 5000) except Exception as e: self.status_bar.showMessage(f"检测失败: {str(e)}", 5000) def navigate_to_url(self): """导航到输入的URL""" url = self.url_bar.text().strip() if not url.startswith(('http://', 'https://')): url = 'https://' + url self.browser.load(QUrl(url)) def update_url_bar(self, q): """更新URL地址栏""" self.url_bar.setText(q.toString()) def go_home(self): """返回B站首页""" self.browser.load(QUrl("https://www.bilibili.com")) def refresh_page(self): """刷新当前页面""" self.browser.reload() if __name__ == "__main__": app = QApplication(sys.argv) browser = BilibiliBrowser() browser.show() sys.exit(app.exec_()) 获取不到cookie请帮我检查修改并且完整返回