请解释这段代码def load_dataset(datafiles): def read(data_path): with open(data_path, 'r', encoding='utf-8') as fp: next(fp) for line in fp.readlines(): words, labels = line.strip('\n').split('\t') words = words.split('\002') labels = labels.split('\002') yield words, labels if isinstance(datafiles, str): return MapDataset(list(read(datafiles))) elif isinstance(datafiles, list) or isinstance(datafiles, tuple): return [MapDataset(list(read(datafile))) for datafile in datafiles]

时间: 2023-12-03 14:02:12 浏览: 175
这段代码定义了一个名为`load_dataset`的函数,它接受一个数据文件路径(字符串)或多个数据文件路径(列表或元组)作为输入。该函数使用嵌套的生成器函数`read`来读取每个数据文件,再把读取结果转换为`MapDataset`对象。 在`read`函数中,它使用`with`语句以UTF-8编码打开数据文件,先用`next(fp)`跳过第一行(通常是表头),然后逐行读取其余内容。每一行都包含一个句子及其对应的标签序列,两者之间使用`\t`分隔;句子内部的各个词以及标签序列内部的各个标签则使用`\002`分隔。在读取每一行后,它先去掉行尾的换行符,按`\t`拆分成`words`和`labels`两部分,然后分别按`\002`切分,将它们转换为列表。最后,它使用`yield`语句将词列表和标签列表作为一对返回。 在`load_dataset`函数中,如果输入是一个字符串,则假定它是单个数据文件的路径,并将其传递给`read`函数。否则,如果输入是一个列表或元组,则假定它包含多个数据文件的路径,并为每个数据文件调用`read`函数。在每种情况下,都先用`list(...)`把`read`生成的全部样本收集起来,再用`MapDataset`将数据转换为可映射的对象,以便于后续操作。最后,该函数返回一个`MapDataset`对象或一个`MapDataset`对象的列表;如果输入既不是字符串,也不是列表或元组,则不会进入任何分支,函数隐式返回`None`。
相关问题

# main_window.py(主窗口逻辑) import numpy as np from PySide6.QtWidgets import QMainWindow, QFileDialog, QGraphicsScene, QGraphicsView, QMessageBox, QGraphicsPathItem from PySide6.QtGui import QPainterPath, QPen, QBrush, QAction, QTransform, QImage, QPixmap, QColor from PySide6.QtCore import Qt, QRectF, QPointF from osgeo import ogr, gdal from PySide6.QtWidgets import QInputDialog # 新增输入对话框 # 新增自定义图形项类(用于存储属性) class FeatureItem(QGraphicsPathItem): def __init__(self, path, attributes): super().__init__(path) self.attributes = attributes # 存储属性字典 class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("GIS软件") self.setGeometry(100, 100, 800, 600) ogr.UseExceptions() self.init_ui() self.scene = QGraphicsScene(self) self.graphicsView.setScene(self.scene) # 新增:存储所有几何边界 self.total_bounds = QRectF() def init_ui(self): self.toolBar = self.addToolBar("工具") self.actionOpen_Vector_Data = QAction("打开矢量数据", self) self.toolBar.addAction(self.actionOpen_Vector_Data) # 新增栅格动作 self.actionOpen_Raster_Data = QAction("打开栅格数据", self) self.toolBar.addAction(self.actionOpen_Raster_Data) # 添加到工具栏 # 新增缓冲区分析按钮 self.actionBuffer_Analysis = QAction("缓冲区分析", self) self.toolBar.addAction(self.actionBuffer_Analysis) self.graphicsView = QGraphicsView() self.setCentralWidget(self.graphicsView) # 新增属性查询按钮 self.actionQuery_Attribute = QAction("属性查询", self) self.toolBar.addAction(self.actionQuery_Attribute) self.actionOpen_Vector_Data.triggered.connect(self.open_vector_data) self.actionOpen_Raster_Data.triggered.connect(self.open_raster_data) # 新增连接 self.actionBuffer_Analysis.triggered.connect(self.buffer_analysis) self.actionQuery_Attribute.triggered.connect(self.enable_query_mode) # 新增鼠标点击事件 self.graphicsView.setMouseTracking(True) self.is_query_mode = False # 新增波段组合按钮 self.actionBand_Combination = QAction("波段组合", self) self.toolBar.addAction(self.actionBand_Combination) self.actionBand_Combination.triggered.connect(self.open_band_combination) def open_vector_data(self): 
file_path, _ = QFileDialog.getOpenFileName( self, "打开矢量文件", "", "Shapefile (*.shp);;GeoJSON (*.geojson);;All Files (*)" ) if file_path: self.load_vector_data(file_path) # 新增:自动缩放视图 self.auto_zoom() def load_vector_data(self, file_path): self.scene.clear() self.total_bounds = QRectF() # 重置边界 try: data_source = ogr.Open(file_path, 0) layer = data_source.GetLayer(0) for feature in layer: geom = feature.GetGeometryRef() path = self.geometry_to_qpainterpath(geom) # 更新总边界 if path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(path.boundingRect()) pen = QPen(Qt.blue, 1) brush = QBrush(Qt.cyan) self.scene.addPath(path, pen, brush) data_source = None except Exception as e: print(f"加载失败: {str(e)}") self.current_vector_path = file_path # 新增这一行 data_source = None def geometry_to_qpainterpath(self, geom): path = QPainterPath() if geom.GetGeometryType() == ogr.wkbPolygon: for ring in range(geom.GetGeometryCount()): linear_ring = geom.GetGeometryRef(ring) points = linear_ring.GetPoints() if points: path.moveTo(points[0][0], points[0][1]) for p in points[1:]: path.lineTo(p[0], p[1]) path.closeSubpath() elif geom.GetGeometryType() == ogr.wkbLineString: points = geom.GetPoints() if points: path.moveTo(points[0][0], points[0][1]) for p in points[1:]: path.lineTo(p[0], p[1]) elif geom.GetGeometryType() == ogr.wkbPoint: x, y = geom.GetX(), geom.GetY() path.addEllipse(x - 2, y - 2, 4, 4) return path def auto_zoom(self): """自动缩放视图到数据范围并放大2倍""" if not self.total_bounds.isValid(): return # 设置场景边界 self.scene.setSceneRect(self.total_bounds) # 获取视图可视区域 view_rect = self.graphicsView.viewport().rect() # 计算缩放比例(自动适应 + 2倍放大) transform = QTransform() transform.scale(2, 2) # 先放大2倍 # 应用缩放并居中 self.graphicsView.setTransform(transform) self.graphicsView.fitInView(self.total_bounds, Qt.KeepAspectRatio) # 新增缓冲区分析方法 def buffer_analysis(self): """执行缓冲区分析""" if not hasattr(self, 'current_vector_path'): QMessageBox.warning(self, "警告", "请先打开矢量数据文件!") return # 获取缓冲距离 distance, ok = 
QInputDialog.getDouble( self, "缓冲区分析", "输入缓冲距离(单位与数据坐标系一致):", 0.0, 0 ) if not ok: return try: # 重新打开数据源获取几何 data_source = ogr.Open(self.current_vector_path, 0) layer = data_source.GetLayer(0) # 创建缓冲区路径 buffer_path = QPainterPath() pen = QPen(Qt.red, 2, Qt.DashLine) brush = QBrush(QColor(255, 0, 0, 50)) # 半透明红色填充 for feature in layer: geom = feature.GetGeometryRef() buffer_geom = geom.Buffer(distance) path = self.geometry_to_qpainterpath(buffer_geom) buffer_path.addPath(path) # 添加到场景 self.scene.addPath(buffer_path, pen, brush) # 更新视图边界 if buffer_path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(buffer_path.boundingRect()) self.auto_zoom() data_source = None except Exception as e: QMessageBox.critical(self, "错误", f"缓冲区分析失败: {str(e)}") def load_vector_data(self, file_path): self.scene.clear() self.total_bounds = QRectF() try: data_source = ogr.Open(file_path, 0) layer = data_source.GetLayer(0) # 获取字段定义 layer_defn = layer.GetLayerDefn() field_names = [layer_defn.GetFieldDefn(i).GetName() for i in range(layer_defn.GetFieldCount())] for feature in layer: geom = feature.GetGeometryRef() path = self.geometry_to_qpainterpath(geom) # 创建属性字典 attributes = { "FID": feature.GetFID(), **{name: feature.GetField(name) for name in field_names} } # 使用自定义图形项 item = FeatureItem(path, attributes) item.setPen(QPen(Qt.blue, 1)) item.setBrush(QBrush(Qt.cyan)) self.scene.addItem(item) if path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(path.boundingRect()) data_source = None except Exception as e: print(f"加载失败: {str(e)}") self.current_vector_path = file_path data_source = None # 新增属性查询方法 def enable_query_mode(self): """启用属性查询模式""" self.is_query_mode = not self.is_query_mode self.actionQuery_Attribute.setText("退出查询" if self.is_query_mode else "属性查询") self.graphicsView.setCursor(Qt.CrossCursor if self.is_query_mode else Qt.ArrowCursor) # 新增鼠标事件处理 def mousePressEvent(self, event): if self.is_query_mode and event.button() == Qt.LeftButton: 
scene_pos = self.graphicsView.mapToScene(event.pos()) items = self.scene.items(scene_pos, Qt.IntersectsItemShape, Qt.DescendingOrder) for item in items: if isinstance(item, FeatureItem): # 构建属性信息字符串 info = "\n".join([f"{k}: {v}" for k, v in item.attributes.items()]) QMessageBox.information(self, "要素属性", info) return super().mousePressEvent(event) def open_raster_data(self): """打开栅格数据文件""" file_path, _ = QFileDialog.getOpenFileName( self, "打开栅格文件", "", "GeoTIFF (*.tif);;JPEG (*.jpg *.jpeg);;PNG (*.png);;All Files (*)" ) if file_path: try: self.load_raster_data(file_path) self.auto_zoom() except Exception as e: QMessageBox.critical(self, "错误", f"加载栅格失败: {str(e)}") def load_raster_data(self, file_path): """加载栅格数据到视图""" # 打开栅格文件(需要用户修改路径的部分) dataset = gdal.Open(file_path) # 相对路径示例:"./data/raster.tif" # 读取第一个波段 band = dataset.GetRasterBand(1) width = dataset.RasterXSize height = dataset.RasterYSize # 转换为numpy数组 data = band.ReadAsArray() # 创建QImage(注意数据类型转换) if data.dtype == np.uint8: format = QImage.Format.Format_Grayscale8 else: format = QImage.Format.Format_ARGB32 q_img = QImage(data.tobytes(), width, height, format) # 创建像素图项 pixmap = QPixmap.fromImage(q_img) raster_item = self.scene.addPixmap(pixmap) # 处理地理坐标(如果存在) geotransform = dataset.GetGeoTransform() if geotransform: # 计算四个角的坐标 x_origin = geotransform[0] y_origin = geotransform[3] pixel_width = geotransform[1] pixel_height = geotransform[5] # 更新场景边界 x_min = x_origin x_max = x_origin + pixel_width * width y_min = y_origin + pixel_height * height y_max = y_origin self.total_bounds = QRectF( QPointF(x_min, y_min), QPointF(x_max, y_max) ) dataset = None # 关闭数据集 def open_band_combination(self): if not hasattr(self, 'current_raster_path'): QMessageBox.warning(self, "警告", "请先打开栅格数据文件!") return # 复用open_raster_data的逻辑 self.open_raster_data() def open_raster_data(self): file_path, _ = QFileDialog.getOpenFileName( self, "打开栅格文件", "", "GeoTIFF (*.tif);;JPEG (*.jpg *.jpeg);;PNG (*.png);;All Files (*)" ) if file_path: try: 
dataset = gdal.Open(file_path) num_bands = dataset.RasterCount # 获取用户输入的波段组合 red_band, ok1 = QInputDialog.getInt( self, "波段选择", f"红通道波段号 (1-{num_bands}):", 1, 1, num_bands ) green_band, ok2 = QInputDialog.getInt( self, "波段选择", f"绿通道波段号 (1-{num_bands}):", min(2, num_bands), 1, num_bands ) blue_band, ok3 = QInputDialog.getInt( self, "波段选择", f"蓝通道波段号 (1-{num_bands}):", min(3, num_bands), 1, num_bands ) if not (ok1 and ok2 and ok3): return self.load_raster_data(file_path, red_band, green_band, blue_band) self.auto_zoom() self.current_raster_path = file_path # 新增存储当前路径 except Exception as e: QMessageBox.critical(self, "错误", f"加载栅格失败: {str(e)}") def load_raster_data(self, file_path, red_band=1, green_band=2, blue_band=3): """加载栅格数据到视图(支持波段组合)""" dataset = gdal.Open(file_path) width = dataset.RasterXSize height = dataset.RasterYSize # 读取三个波段数据 def read_band(band_num): band = dataset.GetRasterBand(band_num) data = band.ReadAsArray() # 自动拉伸到0-255范围 data_min = data.min() data_max = data.max() return np.clip(((data - data_min) / (data_max - data_min) * 255), 0, 255).astype(np.uint8) # 合并波段 rgb_array = np.dstack([ read_band(red_band), read_band(green_band), read_band(blue_band) ]) # 创建QImage q_img = QImage( rgb_array.data, width, height, 3 * width, # 每像素3字节(RGB) QImage.Format.Format_RGB888 ) # 创建像素图项 pixmap = QPixmap.fromImage(q_img) self.scene.addPixmap(pixmap) # 处理地理坐标(保持原有逻辑) geotransform = dataset.GetGeoTransform() if geotransform: x_origin = geotransform[0] y_origin = geotransform[3] pixel_width = geotransform[1] pixel_height = geotransform[5] x_min = x_origin x_max = x_origin + pixel_width * width y_min = y_origin + pixel_height * height # 计算下边界 y_max = y_origin # 上边界 # 确保坐标顺序正确 if x_min > x_max: x_min, x_max = x_max, x_min if y_min > y_max: y_min, y_max = y_max, y_min self.total_bounds = QRectF(QPointF(x_min, y_min), QPointF(x_max, y_max)) dataset = None这是我的完整代码,里面以及实现了打开矢量数据和打开栅格数据的功能,请你在打开栅格数据的基础上增加一个显示裁剪的功能,这个功能是单独的哈,直接在我这个代码的基础上增加代码就行,原有的代码不要改,然后把增加裁剪这一功能的完整代码发给我就行。

<think>好的,我现在需要帮助用户在现有的GIS软件代码中添加一个栅格裁剪功能。用户希望这个功能是独立的,不要修改原有代码,只添加新的部分。首先,我需要理解用户现有的代码结构,找到合适的位置插入新功能。 首先,用户的主窗口类MainWindow中已经有打开栅格数据的功能,相关的函数是open_raster_data和load_raster_data。裁剪功能需要一个新的按钮,可能放在工具栏里,所以需要修改init_ui方法,添加一个QAction,比如“栅格裁剪”,并连接到新的处理函数。 接下来,处理函数需要执行以下步骤: 1. 检查是否已经打开了栅格数据,如果没有,弹出警告。 2. 打开文件对话框让用户选择矢量裁剪区域文件(比如Shapefile)。 3. 使用GDAL读取栅格数据和矢量数据,提取矢量的几何范围。 4. 根据矢量的范围对栅格进行裁剪,可能需要用到GDAL的Warp功能。 5. 将裁剪后的栅格显示在场景中,并调整视图。 需要注意的是,用户现有的代码中已经处理了栅格的加载和显示,所以裁剪后的结果应该复用现有的load_raster_data方法,或者类似的逻辑,确保显示正确。同时,要确保不修改原有代码,只添加新的部分。 在代码结构上,需要添加新的方法,比如clip_raster,处理裁剪逻辑。还需要在init_ui中添加新的按钮和动作。另外,可能需要导入一些新的模块,比如os模块来处理文件路径,但用户现有的代码中可能已经导入了必要的库,需要检查。 另外,错误处理也很重要,比如处理文件打开失败、GDAL操作异常等,需要添加try-except块,并弹出错误提示。 最后,测试新功能是否独立,不会影响原有功能,比如打开栅格和矢量数据、波段组合等。确保裁剪后的栅格正确显示,并且视图自动调整。 总结步骤: 1. 在init_ui中添加“栅格裁剪”按钮。 2. 编写clip_raster方法,处理文件选择、数据读取、裁剪操作。 3. 使用GDAL的Warp功能进行裁剪,可能需要设置目标范围。 4. 调用现有的load_raster_data显示结果,或者新建一个方法来显示裁剪后的栅格。 5. 添加必要的错误处理和用户提示。 现在需要将这些步骤转化为具体的代码,插入到用户现有的代码中,确保不修改原有部分,只添加新代码。 </think> 以下是在您现有代码基础上新增的栅格裁剪功能完整代码(新增部分用注释标出): ```python # 新增栅格裁剪功能(在init_ui方法中添加) def init_ui(self): # ... [原有其他按钮代码] ... 
# 新增栅格裁剪按钮(在init_ui方法末尾添加) self.actionClip_Raster = QAction("栅格裁剪", self) self.toolBar.addAction(self.actionClip_Raster) self.actionClip_Raster.triggered.connect(self.clip_raster) # 新增连接 # 新增栅格裁剪方法(在类末尾添加) def clip_raster(self): """执行栅格裁剪功能""" if not hasattr(self, 'current_raster_path'): QMessageBox.warning(self, "警告", "请先打开栅格数据文件!") return # 选择裁剪矢量文件 vector_path, _ = QFileDialog.getOpenFileName( self, "选择裁剪区域文件", "", "Shapefile (*.shp);;GeoJSON (*.geojson);;All Files (*)" ) if not vector_path: return try: # 获取原始栅格信息 src_ds = gdal.Open(self.current_raster_path) geotransform = src_ds.GetGeoTransform() proj = src_ds.GetProjection() # 获取矢量范围 vector_ds = ogr.Open(vector_path) layer = vector_ds.GetLayer() feature = layer.GetNextFeature() geom = feature.GetGeometryRef() x_min, x_max, y_min, y_max = geom.GetEnvelope() # 创建临时裁剪结果文件 import os output_path = os.path.splitext(self.current_raster_path)[0] + "_clipped.tif" # 执行裁剪操作 options = gdal.WarpOptions( format='GTiff', outputBounds=[x_min, y_min, x_max, y_max], dstSRS=proj ) gdal.Warp(output_path, src_ds, options=options) # 加载裁剪结果 self.load_raster_data(output_path) self.auto_zoom() # 清理资源 src_ds = None vector_ds = None except Exception as e: QMessageBox.critical(self, "错误", f"栅格裁剪失败: {str(e)}") ``` 需要新增的代码位置: 1. 在`init_ui`方法末尾添加新按钮(约第50行) 2. 在类定义末尾添加`clip_raster`方法(约第300行)

现在我已经完成了这些功能,最后我需要一个保存的功能,这个是我的完整的代码直接在我这个代码的基础上增加代码就行,原有的代码不要改,然后把保存这一功能的完整代码发给我就行。就是要保存之前改了的某些数据。# main_window.py(主窗口逻辑) import numpy as np from PySide6.QtWidgets import QMainWindow, QFileDialog, QGraphicsScene, QGraphicsView, QMessageBox, QGraphicsPathItem from PySide6.QtGui import QPainterPath, QPen, QBrush, QAction, QTransform, QImage, QPixmap, QColor from PySide6.QtCore import Qt, QRectF, QPointF from osgeo import ogr, gdal from PySide6.QtWidgets import QInputDialog # 新增输入对话框 # 新增自定义图形项类(用于存储属性) class FeatureItem(QGraphicsPathItem): def __init__(self, path, attributes): super().__init__(path) self.attributes = attributes # 存储属性字典 class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("GIS软件") self.setGeometry(100, 100, 800, 600) ogr.UseExceptions() self.init_ui() self.scene = QGraphicsScene(self) self.graphicsView.setScene(self.scene) # 新增:存储所有几何边界 self.total_bounds = QRectF() def init_ui(self): self.toolBar = self.addToolBar("工具") self.actionOpen_Vector_Data = QAction("打开矢量数据", self) self.toolBar.addAction(self.actionOpen_Vector_Data) # 新增栅格动作 self.actionOpen_Raster_Data = QAction("打开栅格数据", self) self.toolBar.addAction(self.actionOpen_Raster_Data) # 添加到工具栏 # 新增缓冲区分析按钮 self.actionBuffer_Analysis = QAction("缓冲区分析", self) self.toolBar.addAction(self.actionBuffer_Analysis) self.graphicsView = QGraphicsView() self.setCentralWidget(self.graphicsView) # 新增属性查询按钮 self.actionQuery_Attribute = QAction("属性查询", self) self.toolBar.addAction(self.actionQuery_Attribute) self.actionOpen_Vector_Data.triggered.connect(self.open_vector_data) self.actionOpen_Raster_Data.triggered.connect(self.open_raster_data) # 新增连接 self.actionBuffer_Analysis.triggered.connect(self.buffer_analysis) self.actionQuery_Attribute.triggered.connect(self.enable_query_mode) # 新增鼠标点击事件 self.graphicsView.setMouseTracking(True) self.is_query_mode = False # 新增波段组合按钮 self.actionBand_Combination = QAction("波段组合", self) self.toolBar.addAction(self.actionBand_Combination) 
self.actionBand_Combination.triggered.connect(self.open_band_combination) # 新增栅格裁剪按钮(在init_ui方法末尾添加) self.actionClip_Raster = QAction("栅格裁剪", self) self.toolBar.addAction(self.actionClip_Raster) self.actionClip_Raster.triggered.connect(self.clip_raster) # 新增连接 self.actionBand_Calculation = QAction("波段运算", self) self.toolBar.addAction(self.actionBand_Calculation) self.actionBand_Calculation.triggered.connect(self.band_calculation) # 新增质心绘制按钮(放在init_ui方法中) self.actionDraw_Centroids = QAction("绘制质心", self) self.toolBar.addAction(self.actionDraw_Centroids) self.actionDraw_Centroids.triggered.connect(self.draw_centroids) self.centroid_items = [] # 新增:存储质心图形项 # 新增空间查询按钮(放在init_ui方法中) self.actionSpatial_Query = QAction("空间查询", self) self.toolBar.addAction(self.actionSpatial_Query) self.actionSpatial_Query.triggered.connect(self.enable_spatial_query_mode) self.is_spatial_query_mode = False self.spatial_query_results = [] # 存储查询结果 def open_vector_data(self): file_path, _ = QFileDialog.getOpenFileName( self, "打开矢量文件", "", "Shapefile (*.shp);;GeoJSON (*.geojson);;All Files (*)" ) if file_path: self.load_vector_data(file_path) # 新增:自动缩放视图 self.auto_zoom() def load_vector_data(self, file_path): self.scene.clear() self.total_bounds = QRectF() # 重置边界 try: data_source = ogr.Open(file_path, 0) layer = data_source.GetLayer(0) for feature in layer: geom = feature.GetGeometryRef() path = self.geometry_to_qpainterpath(geom) # 更新总边界 if path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(path.boundingRect()) pen = QPen(Qt.blue, 1) brush = QBrush(Qt.cyan) self.scene.addPath(path, pen, brush) data_source = None except Exception as e: print(f"加载失败: {str(e)}") self.current_vector_path = file_path # 新增这一行 data_source = None def geometry_to_qpainterpath(self, geom): path = QPainterPath() if geom.GetGeometryType() == ogr.wkbPolygon: for ring in range(geom.GetGeometryCount()): linear_ring = geom.GetGeometryRef(ring) points = linear_ring.GetPoints() if points: 
path.moveTo(points[0][0], points[0][1]) for p in points[1:]: path.lineTo(p[0], p[1]) path.closeSubpath() elif geom.GetGeometryType() == ogr.wkbLineString: points = geom.GetPoints() if points: path.moveTo(points[0][0], points[0][1]) for p in points[1:]: path.lineTo(p[0], p[1]) elif geom.GetGeometryType() == ogr.wkbPoint: x, y = geom.GetX(), geom.GetY() path.addEllipse(x - 2, y - 2, 4, 4) return path def auto_zoom(self): """自动缩放视图到数据范围并放大2倍""" if not self.total_bounds.isValid(): return # 设置场景边界 self.scene.setSceneRect(self.total_bounds) # 获取视图可视区域 view_rect = self.graphicsView.viewport().rect() # 计算缩放比例(自动适应 + 2倍放大) transform = QTransform() transform.scale(2, 2) # 先放大2倍 # 应用缩放并居中 self.graphicsView.setTransform(transform) self.graphicsView.fitInView(self.total_bounds, Qt.KeepAspectRatio) # 新增缓冲区分析方法 def buffer_analysis(self): """执行缓冲区分析""" if not hasattr(self, 'current_vector_path'): QMessageBox.warning(self, "警告", "请先打开矢量数据文件!") return # 获取缓冲距离 distance, ok = QInputDialog.getDouble( self, "缓冲区分析", "输入缓冲距离(单位与数据坐标系一致):", 0.0, 0 ) if not ok: return try: # 重新打开数据源获取几何 data_source = ogr.Open(self.current_vector_path, 0) layer = data_source.GetLayer(0) # 创建缓冲区路径 buffer_path = QPainterPath() pen = QPen(Qt.red, 2, Qt.DashLine) brush = QBrush(QColor(255, 0, 0, 50)) # 半透明红色填充 for feature in layer: geom = feature.GetGeometryRef() buffer_geom = geom.Buffer(distance) path = self.geometry_to_qpainterpath(buffer_geom) buffer_path.addPath(path) # 添加到场景 self.scene.addPath(buffer_path, pen, brush) # 更新视图边界 if buffer_path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(buffer_path.boundingRect()) self.auto_zoom() data_source = None except Exception as e: QMessageBox.critical(self, "错误", f"缓冲区分析失败: {str(e)}") def load_vector_data(self, file_path): self.scene.clear() self.total_bounds = QRectF() try: data_source = ogr.Open(file_path, 0) layer = data_source.GetLayer(0) # 获取字段定义 layer_defn = layer.GetLayerDefn() field_names = [layer_defn.GetFieldDefn(i).GetName() 
for i in range(layer_defn.GetFieldCount())] for feature in layer: geom = feature.GetGeometryRef() path = self.geometry_to_qpainterpath(geom) # 创建属性字典 attributes = { "FID": feature.GetFID(), **{name: feature.GetField(name) for name in field_names} } # 使用自定义图形项 item = FeatureItem(path, attributes) item.setPen(QPen(Qt.blue, 1)) item.setBrush(QBrush(Qt.cyan)) self.scene.addItem(item) if path.boundingRect().isValid(): self.total_bounds = self.total_bounds.united(path.boundingRect()) data_source = None except Exception as e: print(f"加载失败: {str(e)}") self.current_vector_path = file_path data_source = None # 新增属性查询方法 def enable_query_mode(self): """启用属性查询模式""" self.is_query_mode = not self.is_query_mode self.actionQuery_Attribute.setText("退出查询" if self.is_query_mode else "属性查询") self.graphicsView.setCursor(Qt.CrossCursor if self.is_query_mode else Qt.ArrowCursor) # 新增鼠标事件处理 def mousePressEvent(self, event): if self.is_query_mode and event.button() == Qt.LeftButton: scene_pos = self.graphicsView.mapToScene(event.pos()) items = self.scene.items(scene_pos, Qt.IntersectsItemShape, Qt.DescendingOrder) for item in items: if isinstance(item, FeatureItem): # 构建属性信息字符串 info = "\n".join([f"{k}: {v}" for k, v in item.attributes.items()]) QMessageBox.information(self, "要素属性", info) return super().mousePressEvent(event) def draw_centroids(self): """独立质心绘制功能""" if not hasattr(self, 'current_vector_path'): QMessageBox.warning(self, "警告", "请先打开矢量数据文件!") return # 清除已有质心 for item in self.centroid_items: self.scene.removeItem(item) self.centroid_items.clear() try: data_source = ogr.Open(self.current_vector_path, 0) layer = data_source.GetLayer(0) for feature in layer: geom = feature.GetGeometryRef() centroid = geom.Centroid() if centroid: # 创建质心图形项 path = QPainterPath() path.addEllipse( QRectF( centroid.GetX() - 0.3, # 修改为0.3像素半径 centroid.GetY() - 0.3, 0.6, 0.6 # 直径0.6像素 ) ) item = self.scene.addPath( path, QPen(Qt.red, 0.1), QBrush(Qt.red) ) self.centroid_items.append(item) data_source = 
None self.auto_zoom() except Exception as e: QMessageBox.critical(self, "错误", f"质心绘制失败: {str(e)}") # 新增空间查询模式切换方法 def enable_spatial_query_mode(self): """启用空间查询模式""" self.is_spatial_query_mode = not self.is_spatial_query_mode self.actionSpatial_Query.setText("退出空间查询" if self.is_spatial_query_mode else "空间查询") self.graphicsView.setCursor(Qt.CrossCursor if self.is_spatial_query_mode else Qt.ArrowCursor) if not self.is_spatial_query_mode: self.clear_spatial_query_results() # 新增空间查询处理方法 def mousePressEvent(self, event): if self.is_spatial_query_mode and event.button() == Qt.LeftButton: scene_pos = self.graphicsView.mapToScene(event.pos()) items = self.scene.items(scene_pos, Qt.IntersectsItemShape, Qt.DescendingOrder) for item in items: if isinstance(item, FeatureItem): # 获取空间关系选择 relations = ["相交", "包含", "被包含", "接触", "重叠"] relation, ok = QInputDialog.getItem( self, "空间关系选择", "请选择空间关系:", relations, 0, False ) if not ok: return # 执行空间查询 self.perform_spatial_query(item, relation) return super().mousePressEvent(event) # 新增空间查询核心方法 def perform_spatial_query(self, source_item, relation): """执行空间查询并高亮结果""" self.clear_spatial_query_results() try: # 获取源要素几何 source_geom = self.item_to_geometry(source_item) if not source_geom: return # 获取所有要素 all_items = [item for item in self.scene.items() if isinstance(item, FeatureItem)] # 遍历检查空间关系 for target_item in all_items: target_geom = self.item_to_geometry(target_item) if not target_geom: continue # 执行空间关系判断 if relation == "相交" and source_geom.Intersects(target_geom): self.highlight_item(target_item) elif relation == "包含" and source_geom.Contains(target_geom): self.highlight_item(target_item) elif relation == "被包含" and target_geom.Contains(source_geom): self.highlight_item(target_item) elif relation == "接触" and source_geom.Touches(target_geom): self.highlight_item(target_item) elif relation == "重叠" and source_geom.Overlaps(target_geom): self.highlight_item(target_item) except Exception as e: QMessageBox.critical(self, "错误", f"空间查询失败: 
{str(e)}") # 新增辅助方法 def item_to_geometry(self, item): """将图形项转换为OGR几何对象""" path = item.path() elements = path.toSubpathPolygons(QTransform()) if not elements: return None # 创建多边形几何 geom = ogr.Geometry(ogr.wkbPolygon) ring = ogr.Geometry(ogr.wkbLinearRing) for point in elements[0]: ring.AddPoint(point.x(), point.y()) ring.CloseRings() geom.AddGeometry(ring) return geom def highlight_item(self, item): """高亮显示查询结果""" original_pen = item.pen() highlight_pen = QPen(Qt.yellow, 3) item.setPen(highlight_pen) self.spatial_query_results.append((item, original_pen)) def clear_spatial_query_results(self): """清除查询结果高亮""" for item, original_pen in self.spatial_query_results: item.setPen(original_pen) self.spatial_query_results.clear() def open_raster_data(self): """打开栅格数据文件""" file_path, _ = QFileDialog.getOpenFileName( self, "打开栅格文件", "", "GeoTIFF (*.tif);;JPEG (*.jpg *.jpeg);;PNG (*.png);;All Files (*)" ) if file_path: try: self.load_raster_data(file_path) self.auto_zoom() except Exception as e: QMessageBox.critical(self, "错误", f"加载栅格失败: {str(e)}") def load_raster_data(self, file_path): """加载栅格数据到视图""" # 打开栅格文件(需要用户修改路径的部分) dataset = gdal.Open(file_path) # 相对路径示例:"./data/raster.tif" # 读取第一个波段 band = dataset.GetRasterBand(1) width = dataset.RasterXSize height = dataset.RasterYSize # 转换为numpy数组 data = band.ReadAsArray() # 创建QImage(注意数据类型转换) if data.dtype == np.uint8: format = QImage.Format.Format_Grayscale8 else: format = QImage.Format.Format_ARGB32 q_img = QImage(data.tobytes(), width, height, format) # 创建像素图项 pixmap = QPixmap.fromImage(q_img) raster_item = self.scene.addPixmap(pixmap) # 处理地理坐标(如果存在) geotransform = dataset.GetGeoTransform() if geotransform: # 计算四个角的坐标 x_origin = geotransform[0] y_origin = geotransform[3] pixel_width = geotransform[1] pixel_height = geotransform[5] # 更新场景边界 x_min = x_origin x_max = x_origin + pixel_width * width y_min = y_origin + pixel_height * height y_max = y_origin self.total_bounds = QRectF( QPointF(x_min, y_min), QPointF(x_max, y_max) ) 
dataset = None # 关闭数据集 def open_band_combination(self): if not hasattr(self, 'current_raster_path'): QMessageBox.warning(self, "警告", "请先打开栅格数据文件!") return # 复用open_raster_data的逻辑 self.open_raster_data() def open_raster_data(self): file_path, _ = QFileDialog.getOpenFileName( self, "打开栅格文件", "", "GeoTIFF (*.tif);;JPEG (*.jpg *.jpeg);;PNG (*.png);;All Files (*)" ) if file_path: try: dataset = gdal.Open(file_path) num_bands = dataset.RasterCount # 获取用户输入的波段组合 red_band, ok1 = QInputDialog.getInt( self, "波段选择", f"红通道波段号 (1-{num_bands}):", 1, 1, num_bands ) green_band, ok2 = QInputDialog.getInt( self, "波段选择", f"绿通道波段号 (1-{num_bands}):", min(2, num_bands), 1, num_bands ) blue_band, ok3 = QInputDialog.getInt( self, "波段选择", f"蓝通道波段号 (1-{num_bands}):", min(3, num_bands), 1, num_bands ) if not (ok1 and ok2 and ok3): return self.load_raster_data(file_path, red_band, green_band, blue_band) self.auto_zoom() self.current_raster_path = file_path # 新增存储当前路径 except Exception as e: QMessageBox.critical(self, "错误", f"加载栅格失败: {str(e)}") def load_raster_data(self, file_path, red_band=1, green_band=2, blue_band=3): """加载栅格数据到视图(支持波段组合)""" dataset = gdal.Open(file_path) width = dataset.RasterXSize height = dataset.RasterYSize # 读取三个波段数据 def read_band(band_num): band = dataset.GetRasterBand(band_num) data = band.ReadAsArray() # 自动拉伸到0-255范围 data_min = data.min() data_max = data.max() return np.clip(((data - data_min) / (data_max - data_min) * 255), 0, 255).astype(np.uint8) # 合并波段 rgb_array = np.dstack([ read_band(red_band), read_band(green_band), read_band(blue_band) ]) # 创建QImage q_img = QImage( rgb_array.data, width, height, 3 * width, # 每像素3字节(RGB) QImage.Format.Format_RGB888 ) # 创建像素图项 pixmap = QPixmap.fromImage(q_img) self.scene.addPixmap(pixmap) # 处理地理坐标(保持原有逻辑) geotransform = dataset.GetGeoTransform() if geotransform: x_origin = geotransform[0] y_origin = geotransform[3] pixel_width = geotransform[1] pixel_height = geotransform[5] x_min = x_origin x_max = x_origin + pixel_width * 
width y_min = y_origin + pixel_height * height # 计算下边界 y_max = y_origin # 上边界 # 确保坐标顺序正确 if x_min > x_max: x_min, x_max = x_max, x_min if y_min > y_max: y_min, y_max = y_max, y_min self.total_bounds = QRectF(QPointF(x_min, y_min), QPointF(x_max, y_max)) dataset = None # 新增栅格裁剪方法(必须缩进在类内部) def clip_raster(self): """执行栅格裁剪功能""" if not hasattr(self, 'current_raster_path'): QMessageBox.warning(self, "警告", "请先打开栅格数据文件!") return # 选择裁剪矢量文件 vector_path, _ = QFileDialog.getOpenFileName( self, "选择裁剪区域文件", "", "Shapefile (*.shp);;GeoJSON (*.geojson);;All Files (*)" ) if not vector_path: return try: # 获取原始栅格信息 src_ds = gdal.Open(self.current_raster_path) geotransform = src_ds.GetGeoTransform() proj = src_ds.GetProjection() # 获取矢量范围 vector_ds = ogr.Open(vector_path) layer = vector_ds.GetLayer() feature = layer.GetNextFeature() geom = feature.GetGeometryRef() x_min, x_max, y_min, y_max = geom.GetEnvelope() # 创建临时裁剪结果文件 import os # 确保导入os模块 output_path = os.path.splitext(self.current_raster_path)[0] + "_clipped.tif" # 执行裁剪操作 options = gdal.WarpOptions( format='GTiff', outputBounds=[x_min, y_min, x_max, y_max], dstSRS=proj ) gdal.Warp(output_path, src_ds, options=options) # 加载裁剪结果 self.load_raster_data(output_path) self.auto_zoom() # 清理资源 src_ds = None vector_ds = None except Exception as e: QMessageBox.critical(self, "错误", f"栅格裁剪失败: {str(e)}") # 新增波段运算方法 def band_calculation(self): """执行波段运算(示例为NDVI计算)""" if not hasattr(self, 'current_raster_path'): QMessageBox.warning(self, "警告", "请先打开栅格数据文件!") return try: # 获取用户输入参数 red_band, ok1 = QInputDialog.getInt( self, "波段选择", "输入红波段编号 (1-based):", 1, 1, 100 ) nir_band, ok2 = QInputDialog.getInt( self, "波段选择", "输入近红外波段编号 (1-based):", 4, 1, 100 ) if not (ok1 and ok2): return # 读取栅格数据 dataset = gdal.Open(self.current_raster_path) red = dataset.GetRasterBand(red_band).ReadAsArray() nir = dataset.GetRasterBand(nir_band).ReadAsArray() # 执行NDVI计算 ndvi = np.where( (nir + red) == 0, 0, (nir - red) / (nir + red) ).astype(np.float32) # 创建输出文件 
output_path, _ = QFileDialog.getSaveFileName( self, "保存结果", "", "GeoTIFF (*.tif)" ) if not output_path: return # 写入结果 driver = gdal.GetDriverByName('GTiff') out_ds = driver.Create( output_path, dataset.RasterXSize, dataset.RasterYSize, 1, gdal.GDT_Float32 ) out_ds.SetGeoTransform(dataset.GetGeoTransform()) out_ds.SetProjection(dataset.GetProjection()) out_ds.GetRasterBand(1).WriteArray(ndvi) out_ds.FlushCache() # 清理资源 dataset = None out_ds = None QMessageBox.information(self, "成功", f"NDVI计算结果已保存至:\n{output_path}") except Exception as e: QMessageBox.critical(self, "错误", f"波段运算失败: {str(e)}")

<think>好的,用户需要为现有的GIS软件添加保存功能,特别是保存之前修改过的数据。首先,我需要回顾用户提供的代码,了解当前的数据结构和功能。用户已经实现了矢量数据的加载、缓冲区分析、属性查询、空间查询等功能,现在需要保存这些修改后的数据。 首先,保存功能需要考虑保存哪些数据。用户可能修改了矢量数据(如缓冲区分析生成的缓冲区、质心、空间查询结果等),所以需要将这些修改保存到新的矢量文件中。栅格数据的处理可能涉及裁剪和波段运算,但用户当前的问题主要集中在矢量数据的保存上。 接下来,我需要确定如何将QGraphicsScene中的图形项转换回OGR的几何对象,并保存到文件中。现有的FeatureItem类存储了原始属性和路径,但修改后的图形项(如缓冲区、质心)可能没有这些属性,需要单独处理。 保存功能需要以下几个步骤: 1. 添加保存按钮到工具栏,并连接相应的槽函数。 2. 收集场景中所有需要保存的图形项,包括原始要素、缓冲区、质心等。 3. 将这些图形项转换为OGR的几何对象,并保留属性信息。 4. 创建新的矢量文件,定义图层和字段,将几何和属性写入文件。 需要注意的是,用户可能修改了原始数据,如添加了新的几何图形(缓冲区),这些图形可能没有原始属性,需要生成新的属性或保留原有属性。此外,空间查询结果的高亮显示可能只是视觉上的变化,不需要保存,除非用户明确要保存查询结果。 在代码实现中,需要遍历场景中的所有图形项,区分FeatureItem和其他图形项(如缓冲区、质心)。对于FeatureItem,可以直接使用其存储的属性和几何;对于其他图形项,可能需要生成新的属性或忽略。 还需要处理文件保存的对话框,让用户选择保存路径和文件名,以及选择保存的格式(如Shapefile、GeoJSON等)。 最后,测试保存功能,确保生成的矢量文件正确包含所有修改后的要素,并且属性信息完整。 可能遇到的问题包括几何转换的准确性、属性字段的定义、不同格式的支持等。需要确保使用OGR库正确创建数据源和图层,处理坐标系信息,以及正确写入几何和属性数据。 总结,保存功能的核心是将场景中的图形项转换回矢量数据格式,并保存到文件中。需要处理不同类型的图形项,保留必要的属性,并确保文件格式的正确性。 </think> 以下是添加保存功能的完整代码(在原有代码基础上新增部分): ```python # 在MainWindow类的init_ui方法中添加(保持原有代码不变) def init_ui(self): # ...原有代码... 
# 新增保存按钮(放在init_ui方法中) self.actionSave_Vector = QAction("保存矢量数据", self) self.toolBar.addAction(self.actionSave_Vector) self.actionSave_Vector.triggered.connect(self.save_vector_data) # 新增保存方法 def save_vector_data(self): """保存当前场景中的矢量数据""" if not hasattr(self, 'current_vector_path'): QMessageBox.warning(self, "警告", "没有可保存的矢量数据!") return # 获取保存路径 file_path, _ = QFileDialog.getSaveFileName( self, "保存矢量文件", "", "Shapefile (*.shp);;GeoJSON (*.geojson);;All Files (*)" ) if not file_path: return try: # 获取原始数据源信息 src_ds = ogr.Open(self.current_vector_path) src_layer = src_ds.GetLayer(0) src_defn = src_layer.GetLayerDefn() # 创建目标数据源 driver = ogr.GetDriverByName("ESRI Shapefile" if file_path.endswith(".shp") else "GeoJSON") if os.path.exists(file_path): driver.DeleteDataSource(file_path) dst_ds = driver.CreateDataSource(file_path) # 创建图层(保持与原始数据相同的坐标系) dst_layer = dst_ds.CreateLayer( "saved_features", srs=src_layer.GetSpatialRef(), geom_type=ogr.wkbPolygon ) # 复制字段定义 for i in range(src_defn.GetFieldCount()): field_defn = src_defn.GetFieldDefn(i) dst_layer.CreateField(field_defn) # 遍历场景中的所有要素项 for item in self.scene.items(): if isinstance(item, FeatureItem): # 创建新要素 feature = ogr.Feature(dst_layer.GetLayerDefn()) # 复制属性 for key, value in item.attributes.items(): if key == "FID": continue # FID通常自动生成 if feature.GetFieldIndex(key) != -1: feature.SetField(key, str(value)) # 转换几何 geom = self.item_to_geometry(item) if geom: feature.SetGeometry(geom) dst_layer.CreateFeature(feature) feature = None # 添加缓冲区要素(如果存在) self.save_additional_features(dst_layer, "buffer") # 添加质心要素(如果存在) self.save_additional_features(dst_layer, "centroid") dst_ds = None src_ds = None QMessageBox.information(self, "成功", f"数据已保存至:\n{file_path}") except Exception as e: QMessageBox.critical(self, "错误", f"保存失败: {str(e)}") # 新增辅助保存方法 def save_additional_features(self, layer, feature_type): """保存附加要素(缓冲区/质心)""" items = [] if feature_type == "buffer": items = [item for item in self.scene.items() if 
item.pen().style() == Qt.DashLine and item.pen().color() == Qt.red] elif feature_type == "centroid": items = self.centroid_items for item in items: geom = self.item_to_geometry(item) if geom: feature = ogr.Feature(layer.GetLayerDefn()) feature.SetGeometry(geom) # 添加类型标识字段 feature.SetField("FEATURE_TYPE", feature_type.upper()) layer.CreateFeature(feature) feature = None # 在item_to_geometry方法中添加点要素支持 def item_to_geometry(self, item): """增强版几何转换(支持点要素)""" path = item.path() elements = path.toSubpathPolygons(QTransform()) if not elements: # 处理点要素 if isinstance(item, QGraphicsPathItem): path = item.path() if path.elementCount() == 1 and path.elementAt(0).isMoveTo(): pt = path.elementAt(0) geom = ogr.Geometry(ogr.wkbPoint) geom.AddPoint(pt.x, pt.y) return geom return None # 原有多边形处理逻辑 geom = ogr.Geometry(ogr.wkbPolygon) ring = ogr.Geometry(ogr.wkbLinearRing) for point in elements[0]: ring.AddPoint(point.x(), point.y()) ring.CloseRings() geom.AddGeometry(ring) return geom ```
阅读全文

相关推荐

更改文件代码,其中训练集和测试集的分配方式如split_datasets所示不变,但是对于测试集,将用户未交互的项目都看作负样本,修改哪里是最高效的: import pickle import torch import Pre_train_function_3 as PF from ctr_dataset_mine_5_neg_pre_t3 import * from Pre_train_me_new import Pre_train import torch.nn as nn import tqdm # 定义数据保存路径 num_users = 500 DATA_CACHE_PATH = 'D:/Desk/code-try/data_read/4domains_.pkl' ITEM_REVIEWS_CACHE_PATH = 'D:/Desk/code-try/data_read/item_reviews.pkl' DATA_CACHE_PATH = DATA_CACHE_PATH.replace('.pkl', str(num_users)+'.pkl') ITEM_REVIEWS_CACHE_PATH = ITEM_REVIEWS_CACHE_PATH.replace('.pkl', str(num_users)+'.pkl') with open(ITEM_REVIEWS_CACHE_PATH, 'rb') as f: item_reviews = pickle.load(f) raw_source_files = [ 'D:/Desk/code-try/datasets/extract_t/Books_.jsonl', 'D:/Desk/code-try/datasets/extract_t/Kindle_Store_.jsonl', 'D:/Desk/code-try/datasets/extract_t/Music_.jsonl' ] source_files = [file.replace('.jsonl', str(num_users)+'.jsonl') for file in raw_source_files] target_file='D:\Desk\code-try\datasets\extract_t\Movies_&_TV_.jsonl' target_file = target_file.replace('.jsonl', str(num_users)+'.jsonl') # 构建跨域数据集 item_mapping_all, user_hist, domain_items = build_cross_domain_dataset( source_files, target_file, threshold=0.7, # 正样本评分阈值 neg_ratio=5 # 负样本采样比例(正样本数×5) ) pop_dict = PF.compute_item_popularity(source_files + [target_file]) print(len(pop_dict)) test_label = 3 train_loader, test_loader = create_data_loaders( item_mapping_all, user_hist, test_label, domain_items, item_reviews, pop_dict, test_rate=0.6, batch_size=32 )

给这段代码加上保存聚类模型的功能: import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random # 新增random模块 from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2018-01-01" self.END_DATE = "2020-12-31" self.TEST_START = "2021-01-01" self.TEST_END = "2021-12-31" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 
self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, 
int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): 
"""添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = 
df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 100: # 至少100个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量""" # 计算次日涨幅 df['next_day_gain'] = df['close'].shift(-1) / df['close'] - 1 
# 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:涨幅>5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, 
groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 
创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 
保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 cluster_model = StockCluster(config) try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 
{config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

帮我解释下下面代码的意思: # -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:00:01 2025 @author: srx20 """ import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2018-01-01" self.END_DATE = "2020-12-31" self.TEST_START = "2021-01-01" self.TEST_END = "2021-12-31" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 
目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 
随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return 
np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = 
df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 (添加保存/加载功能) ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def save(self): """保存聚类模型到文件""" # 创建包含所有必要组件的字典 model_data = { 'kmeans': self.kmeans, 'scaler': self.scaler, 'cluster_map': self.cluster_map, 'config_cluster_num': self.config.CLUSTER_NUM } # 使用joblib保存模型 joblib.dump(model_data, self.model_file) logger.info(f"聚类模型已保存到: {self.model_file}") def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在,需要重新训练") return False def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 100: # 至少100个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 
self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") # 训练完成后自动保存模型 self.save() return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量""" # 计算次日涨幅 df['next_day_gain'] = df['close'].shift(-1) / df['close'] - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:涨幅>5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: 
logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, 
selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 
保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 - 尝试加载现有模型,否则训练新模型 cluster_model = StockCluster(config) if not cluster_model.load(): # 尝试加载模型 try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") cluster_model.save() # 保存默认聚类模型 # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: 
logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

# -*- coding: utf-8 -*-
"""
Created on Sun Jul 20 16:00:01 2025

@author: srx20
"""
# NOTE: the page text in front of this listing asks to replace the
# "next-day gain > 5%" rule with "close more than 5% above open". This listing
# is the version *before* that change, so TargetCreator below still labels
# samples with the close-to-close gain.

import gc
import logging
import os
import random
import warnings

import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
import psutil
import talib as ta
from scipy import sparse
from sklearn.cluster import MiniBatchKMeans
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.metrics import make_scorer, recall_score, classification_report
from sklearn.model_selection import RandomizedSearchCV, GroupKFold
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

warnings.filterwarnings('ignore')

# Log to both a file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('stock_prediction_fixed.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Columns every input CSV must provide, and the subset holding prices.
_REQUIRED_COLS = ['date', 'open', 'high', 'low', 'close', 'volume']
_PRICE_COLS = ['open', 'high', 'low', 'close']


# ========== Configuration ==========
class StockConfig:
    """Plain container for paths, date ranges, feature lists and search space."""

    def __init__(self):
        # Data directories (one CSV per stock)
        self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证"
        self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证"

        # Train / test date ranges
        self.START_DATE = "2018-01-01"
        self.END_DATE = "2020-12-31"
        self.TEST_START = "2021-01-01"
        self.TEST_END = "2021-12-31"

        # Clustering settings
        self.CLUSTER_NUM = 8
        self.CLUSTER_FEATURES = [
            'price_change', 'volatility', 'volume_change',
            'MA5', 'MA20', 'RSI14', 'MACD_hist'
        ]

        # Prediction features. StockModelTrainer.feature_selection overwrites
        # this list with the selected subset.
        self.PREDICT_FEATURES = [
            'open', 'high', 'low', 'close', 'volume',
            'price_change', 'volatility', 'volume_change',
            'MA5', 'MA20', 'RSI14', 'MACD_hist',
            'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff',
            'price_vol_ratio', 'MACD_RSI', 'advance_decline',
            'day_of_week', 'month'
        ]

        # Hyper-parameter search space (kept small to limit memory use)
        self.PARAM_GRID = {
            'boosting_type': ['gbdt'],
            'num_leaves': [31, 63],
            'max_depth': [-1, 7],
            'learning_rate': [0.01, 0.05],
            'n_estimators': [300, 500],
            'min_child_samples': [50],
            'min_split_gain': [0.0, 0.1],
            'reg_alpha': [0, 0.1],
            'reg_lambda': [0, 0.1],
            'feature_fraction': [0.7, 0.9],
            'bagging_fraction': [0.7, 0.9],
            'bagging_freq': [1]
        }

        # Target thresholds used by TargetCreator
        self.MIN_GAIN = 0.05
        self.MIN_LOW_RATIO = 0.98

        # Debug mode
        self.DEBUG_MODE = False
        self.MAX_STOCKS = 50 if self.DEBUG_MODE else None
        self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0  # sampling ratio


# ========== Memory helpers ==========
def reduce_mem_usage(df):
    """Downcast numeric columns of ``df`` in place and return it.

    Signed integer columns are narrowed to the smallest signed type that holds
    their min/max. float64 columns are narrowed to float32 when their range
    fits. Every other dtype (datetime, object, bool, unsigned, extension) is
    left untouched.

    Differences from the previous implementation, all deliberate:
    * datetime columns are skipped instead of being compared against float
      limits, which raised ``TypeError`` and aborted feature engineering;
    * floats are never narrowed to float16, whose ~3 significant digits
      corrupt prices and the labels derived from them;
    * an empty frame no longer triggers a division by zero in the log line.
    """
    start_mem = df.memory_usage().sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype
        if not isinstance(col_type, np.dtype):
            continue  # pandas extension dtype: leave as is

        if col_type.kind == 'i':
            c_min = df[col].min()
            c_max = df[col].max()
            for candidate in (np.int8, np.int16, np.int32, np.int64):
                limits = np.iinfo(candidate)
                # Comparisons are False for an empty column (min/max are NaN),
                # so such a column keeps its dtype.
                if limits.min <= c_min and c_max <= limits.max:
                    df[col] = df[col].astype(candidate)
                    break
        elif col_type.kind == 'f' and col_type.itemsize > 4:
            c_min = df[col].min()
            c_max = df[col].max()
            limits = np.finfo(np.float32)
            if c_min > limits.min and c_max < limits.max:
                df[col] = df[col].astype(np.float32)

    end_mem = df.memory_usage().sum() / 1024**2
    saved_pct = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
    logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({saved_pct:.1f}%)')
    return df


def print_memory_usage():
    """Log the resident memory of the current process in MB."""
    process = psutil.Process(os.getpid())
    mem = process.memory_info().rss / (1024 ** 2)
    logger.info(f"当前内存使用: {mem:.2f} MB")


# ========== Data loading ==========
def _load_single_stock(file_path, stock_code, start_date, end_date):
    """Read and clean one stock CSV; return the frame, or None if unusable."""
    df = pd.read_csv(file_path)

    if not all(col in df.columns for col in _REQUIRED_COLS):
        logger.warning(f"股票 {stock_code} 缺少必要列,跳过")
        return None

    df['date'] = pd.to_datetime(df['date'])
    # .copy() so the column assignments below act on an independent frame.
    df = df[(df['date'] >= start_date) & (df['date'] <= end_date)].copy()

    if len(df) < 100:  # require at least 100 rows in the date range
        logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过")
        return None

    for col in _PRICE_COLS:
        df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32)
    df['volume'] = pd.to_numeric(df['volume'], errors='coerce')

    df = df.dropna(subset=_REQUIRED_COLS)
    if df.empty:
        logger.warning(f"股票 {stock_code} 过滤后无数据")
        return None

    # Cast only after the NaN rows are gone: NaN cannot be converted to an
    # unsigned integer.
    df['volume'] = df['volume'].astype(np.uint32)
    return df


def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None):
    """Load per-stock CSV files from two directories, filtered by date.

    Args:
        sh_path: Directory whose CSVs get the ``SH_`` code prefix.
        sz_path: Directory whose CSVs get the ``SZ_`` code prefix.
        start_date: Inclusive lower bound on the ``date`` column.
        end_date: Inclusive upper bound on the ``date`` column.
        sample_fraction: When below 1.0, only a random sample of the files
            (at least one) is read.
        debug_mode: Stop after 10 stocks have been loaded.
        max_stocks: Stop once this many stocks have been loaded (None = no cap).

    Returns:
        dict mapping ``"<exchange>_<file stem>"`` to a cleaned DataFrame.
        Files that fail to load are logged and skipped.
    """
    stock_data = {}

    all_files = []
    for exchange, path in [('SH', sh_path), ('SZ', sz_path)]:
        if os.path.exists(path):
            all_files.extend(
                (exchange, path, file)
                for file in os.listdir(path) if file.endswith('.csv')
            )

    if not all_files:
        logger.warning("没有找到任何CSV文件")
        return stock_data

    if sample_fraction < 1.0:
        sample_size = max(1, int(len(all_files) * sample_fraction))
        all_files = random.sample(all_files, sample_size)
        logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})")

    loaded_count = 0
    with tqdm(total=len(all_files), desc='加载股票数据') as pbar:
        for exchange, path, file in all_files:
            if max_stocks is not None and loaded_count >= max_stocks:
                break

            stock_code = f"{exchange}_{file.split('.')[0]}"
            file_path = os.path.join(path, file)
            try:
                df = _load_single_stock(file_path, stock_code, start_date, end_date)
                if df is not None:
                    stock_data[stock_code] = df
                    loaded_count += 1
                    logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}")
            except Exception as e:
                # Best effort: one broken file must not abort the whole load.
                logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True)
            finally:
                pbar.update(1)

            if debug_mode and loaded_count >= 10:
                logger.info("调试模式: 已加载10只股票,提前结束")
                break

    logger.info(f"成功加载 {len(stock_data)} 只股票数据")
    return stock_data


# ========== Feature engineering ==========
class FeatureEngineer:
    """Adds technical-indicator columns to a per-stock OHLCV frame."""

    def __init__(self, config):
        self.config = config

    def safe_fillna(self, series, default=0):
        """Replace NaN with ``default`` in a Series or ndarray; pass others through."""
        if isinstance(series, pd.Series):
            return series.fillna(default)
        if isinstance(series, np.ndarray):
            return np.nan_to_num(series, nan=default)
        return series

    def transform(self, df):
        """Add indicator features to ``df`` and return the resulting frame.

        ``df`` is modified in place, so callers pass a copy. If any step
        raises, the error is logged and a fallback frame is returned: the basic
        rolling features plus zero-filled columns for every missing name in
        ``config.PREDICT_FEATURES``.
        """
        try:
            # TA-Lib is fed float64 arrays; build them directly instead of
            # copying the whole frame.
            close = df['close'].to_numpy(dtype=np.float64)
            high = df['high'].to_numpy(dtype=np.float64)
            low = df['low'].to_numpy(dtype=np.float64)

            # Basic features
            df['price_change'] = df['close'].pct_change().fillna(0)
            df['volatility'] = df['close'].rolling(5).std().fillna(0)
            df['volume_change'] = df['volume'].pct_change().fillna(0)
            df['MA5'] = df['close'].rolling(5).mean().fillna(0)
            df['MA20'] = df['close'].rolling(20).mean().fillna(0)

            # Technical indicators
            df['RSI14'] = self.safe_fillna(ta.RSI(close, timeperiod=14), 50)

            _macd, _macd_signal, macd_hist = ta.MACD(
                close, fastperiod=12, slowperiod=26, signalperiod=9
            )
            df['MACD_hist'] = self.safe_fillna(macd_hist, 0)

            df['MOM10'] = self.safe_fillna(ta.MOM(close, timeperiod=10), 0)
            df['ATR14'] = self.safe_fillna(ta.ATR(high, low, close, timeperiod=14), 0)

            # Volume-weighted average price. Volume is widened to float64 so
            # the running sum cannot wrap around in a 32-bit unsigned dtype.
            volume = df['volume'].astype(np.float64)
            typical_price = (df['high'] + df['low'] + df['close']) / 3
            vwap = (volume * typical_price).cumsum() / volume.cumsum()
            df['VWAP'] = self.safe_fillna(vwap, 0)

            # RSI minus its 5-row rolling mean
            df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0)

            # Price change relative to volatility (epsilon avoids division by zero)
            df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8)

            # Combined indicator
            df['MACD_RSI'] = df['MACD_hist'] * df['RSI14']

            # Number of the last 5 rows that closed above their open
            df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0)

            # Calendar features
            df['day_of_week'] = df['date'].dt.dayofweek
            df['month'] = df['date'].dt.month

            # Clean up infinities and NaN
            df = df.replace([np.inf, -np.inf], np.nan)
            df = df.fillna(0)

            return reduce_mem_usage(df)

        except Exception as e:
            logger.error(f"特征工程失败: {str(e)}", exc_info=True)
            # Fallback: basic features only, remaining predictors zero-filled.
            df['price_change'] = df['close'].pct_change().fillna(0)
            df['volatility'] = df['close'].rolling(5).std().fillna(0)
            df['volume_change'] = df['volume'].pct_change().fillna(0)
            df['MA5'] = df['close'].rolling(5).mean().fillna(0)
            df['MA20'] = df['close'].rolling(20).mean().fillna(0)

            for col in self.config.PREDICT_FEATURES:
                if col not in df.columns:
                    df[col] = 0

            return df


# ========== Clustering model (with save / load) ==========
class StockCluster:
    """K-means clustering of stocks on per-stock feature statistics."""

    def __init__(self, config):
        self.config = config
        self.scaler = StandardScaler()
        self.kmeans = MiniBatchKMeans(
            n_clusters=config.CLUSTER_NUM,
            random_state=42,
            batch_size=1000
        )
        self.cluster_map = {}  # stock code -> cluster id
        self.model_file = "stock_cluster_model.pkl"  # where the model is persisted

    def save(self):
        """Persist k-means, scaler and cluster map to ``self.model_file``."""
        model_data = {
            'kmeans': self.kmeans,
            'scaler': self.scaler,
            'cluster_map': self.cluster_map,
            'config_cluster_num': self.config.CLUSTER_NUM
        }
        joblib.dump(model_data, self.model_file)
        logger.info(f"聚类模型已保存到: {self.model_file}")

    def load(self):
        """Restore the model from ``self.model_file``; return True on success.

        The file is unpickled by joblib, so it must come from a trusted source.
        """
        if not os.path.exists(self.model_file):
            logger.warning("聚类模型文件不存在,需要重新训练")
            return False

        model_data = joblib.load(self.model_file)
        self.kmeans = model_data['kmeans']
        self.scaler = model_data['scaler']
        self.cluster_map = model_data['cluster_map']
        logger.info(f"从 {self.model_file} 加载聚类模型")
        return True

    def fit(self, stock_data, feature_engineer=None):
        """Cluster the stocks and fill ``self.cluster_map``.

        Args:
            stock_data: dict of stock code -> DataFrame.
            feature_engineer: Optional object with a ``transform(df)`` method.
                When given, each frame is transformed (on a copy) before its
                statistics are taken, so ``config.CLUSTER_FEATURES`` exist even
                if ``stock_data`` holds raw OHLCV frames. Without it, missing
                features contribute zeros, as before.

        Returns:
            self. The fitted model is saved automatically, except when no stock
            qualifies and the default mapping (all stocks -> 0) is used.
        """
        logger.info("开始股票聚类分析...")
        cluster_features = []
        clustered_codes = []  # codes in the same order as cluster_features

        for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"):
            if len(df) < 100:  # require at least 100 rows
                continue

            if feature_engineer is not None:
                df = feature_engineer.transform(df.copy())

            features = {}
            for feat in self.config.CLUSTER_FEATURES:
                if feat in df.columns:
                    features[f"{feat}_mean"] = df[feat].mean()
                    features[f"{feat}_std"] = df[feat].std()
                else:
                    # Feature not available: contribute zeros
                    features[f"{feat}_mean"] = 0
                    features[f"{feat}_std"] = 0

            cluster_features.append(features)
            clustered_codes.append(stock_code)

        if not cluster_features:
            logger.warning("没有可用的聚类特征,使用默认聚类")
            self.cluster_map = {code: 0 for code in stock_data.keys()}
            return self

        feature_df = pd.DataFrame(cluster_features)
        # The scaler and k-means reject NaN / inf.
        feature_df = feature_df.replace([np.inf, -np.inf], np.nan).fillna(0)
        feature_df = reduce_mem_usage(feature_df)

        scaled_features = self.scaler.fit_transform(feature_df)

        self.kmeans.fit(scaled_features)
        clusters = self.kmeans.predict(scaled_features)
        feature_df['cluster'] = clusters

        # Pair each label with the stock it was computed for. Slicing
        # stock_data.keys() would shift labels whenever a stock was skipped.
        self.cluster_map = {
            code: int(label) for code, label in zip(clustered_codes, clusters)
        }

        logger.info("聚类分布统计:")
        logger.info(feature_df['cluster'].value_counts().to_string())
        logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别")

        self.save()
        return self

    def transform(self, df, stock_code):
        """Add a constant ``cluster`` column (-1 when the stock is unknown)."""
        df['cluster'] = self.cluster_map.get(stock_code, -1)
        return df


# ========== Target creation ==========
class TargetCreator:
    """Builds the binary training label from next-row prices."""

    def __init__(self, config):
        self.config = config

    def create_targets(self, df):
        """Add ``next_day_gain``, ``next_day_low_ratio`` and ``target`` columns.

        ``target`` is 1 when the next row's close exceeds this row's close by
        more than ``MIN_GAIN`` and the next row's low is at least
        ``MIN_LOW_RATIO`` of its open. The last row has no successor and is
        dropped from the returned frame.
        """
        next_open = df['open'].shift(-1)
        df['next_day_gain'] = df['close'].shift(-1) / df['close'] - 1
        df['next_day_low_ratio'] = df['low'].shift(-1) / next_open

        df['target'] = 0
        mask = (df['next_day_gain'] > self.config.MIN_GAIN) & \
               (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO)
        df.loc[mask, 'target'] = 1

        # Drop the last row (no next-row data)
        df = df.iloc[:-1]

        target_counts = df['target'].value_counts()
        logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}")

        return df


# ========== Model training ==========
class StockModelTrainer:
    """Builds the dataset, selects features, tunes and trains a LightGBM model."""

    def __init__(self, config):
        self.config = config
        self.model_name = "stock_prediction_model"
        self.feature_importance = None

    def prepare_dataset(self, stock_data, cluster_model, feature_engineer):
        """Turn per-stock frames into one sparse feature matrix.

        Returns:
            ``(X, y, groups)`` where X is a scipy sparse matrix with the columns
            of ``config.PREDICT_FEATURES``, y the 0/1 labels and groups the
            stock code of every row; ``(None, None, None)`` if nothing usable
            was produced. Stocks that fail are logged and skipped.
        """
        logger.info("准备训练数据集...")
        X_list = []
        y_list = []
        stock_group_list = []  # group ids for grouped cross-validation
        target_creator = TargetCreator(self.config)

        for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"):
            try:
                df = feature_engineer.transform(df.copy())
                df = cluster_model.transform(df, stock_code)
                df = target_creator.create_targets(df)

                features = self.config.PREDICT_FEATURES
                if 'target' not in df.columns:
                    logger.warning(f"股票 {stock_code} 缺少目标列,跳过")
                    continue

                X = df[features]
                y = df['target']

                if X.isnull().any().any():
                    logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过")
                    continue

                X_list.append(sparse.csr_matrix(X.values.astype(np.float32)))
                y_list.append(y.values)
                stock_group_list.extend([stock_code] * len(X))

                # Periodic garbage collection
                if len(X_list) % 100 == 0:
                    gc.collect()
                    print_memory_usage()

            except Exception as e:
                logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True)

        if not X_list:
            logger.error("没有可用的训练数据")
            return None, None, None

        X_full = sparse.vstack(X_list)
        y_full = np.concatenate(y_list)
        groups = np.array(stock_group_list)

        logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}")
        logger.info(f"目标分布: 0={(y_full == 0).sum()}, 1={(y_full == 1).sum()}")

        return X_full, y_full, groups

    def feature_selection(self, X, y):
        """Keep the (up to) 15 most important features.

        A LightGBM classifier is trained batch by batch and its feature
        importances are ranked. As a side effect ``config.PREDICT_FEATURES`` is
        replaced by the selected names, so later ``prepare_dataset`` calls
        produce matrices with exactly these columns.

        Returns:
            ``(X_selected, selected_features)`` with the columns of
            ``X_selected`` in the order of ``selected_features``.
        """
        logger.info("执行特征选择...")

        # Snapshot the column layout of X before the config is overwritten.
        all_features = list(self.config.PREDICT_FEATURES)

        base_model = lgb.LGBMClassifier(
            n_estimators=100,
            random_state=42,
            n_jobs=-1
        )

        # Train in batches so only one dense slice is in memory at a time.
        batch_size = 100000
        for i in range(0, X.shape[0], batch_size):
            end_idx = min(i + batch_size, X.shape[0])
            X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx]
            y_batch = y[i:end_idx]

            if i == 0:
                base_model.fit(X_batch, y_batch)
            else:
                base_model.fit(X_batch, y_batch, init_model=base_model)

        importance = pd.Series(base_model.feature_importances_, index=all_features)
        importance = importance.sort_values(ascending=False)
        logger.info("特征重要性:\n" + importance.to_string())

        k = min(15, len(all_features))
        selected_features = importance.head(k).index.tolist()
        logger.info(f"选择前 {k} 个特征: {selected_features}")

        if sparse.issparse(X):
            # Indices must refer to the ORIGINAL column order. Looking them up
            # in the already-overwritten config list yields 0..k-1, i.e. the
            # wrong columns.
            feature_indices = [all_features.index(f) for f in selected_features]
            X_selected = X[:, feature_indices]
        else:
            X_selected = X[selected_features]

        # Publish the selection for later prepare_dataset calls.
        self.config.PREDICT_FEATURES = selected_features

        return X_selected, selected_features

    def train_model(self, X, y, groups):
        """Tune and train the final classifier; return it, or None on failure.

        Steps: class-imbalance weight, feature selection, randomized search
        scored by positive-class recall over a 2-fold GroupKFold (grouped by
        stock), then a final fit with the best parameters. The model and the
        selected feature names are dumped to ``<model_name>.pkl``.
        """
        if X is None or len(y) == 0:
            logger.error("训练数据为空,无法训练模型")
            return None

        logger.info("开始训练模型...")

        # 1. Class imbalance
        y = np.asarray(y)
        pos_count = int((y == 1).sum())
        neg_count = int((y == 0).sum())
        if pos_count == 0:
            # No positive sample: the weight is undefined and recall cannot
            # be scored.
            logger.error("训练数据中没有正样本,无法训练模型")
            return None
        if len(np.unique(groups)) < 2:
            # GroupKFold(n_splits=2) needs at least two distinct groups.
            logger.error("可用股票不足2只,无法进行分组交叉验证")
            return None
        scale_pos_weight = neg_count / pos_count
        logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}")

        # 2. Feature selection
        X_selected, selected_features = self.feature_selection(X, y)

        # 3. Scorer focused on recall of the positive class
        def positive_recall_score(y_true, y_pred):
            return recall_score(y_true, y_pred, pos_label=1)

        custom_scorer = make_scorer(positive_recall_score, greater_is_better=True)

        # 4. Grouped cross-validation: rows of one stock never straddle folds
        group_kfold = GroupKFold(n_splits=2)  # few folds to save memory
        cv = list(group_kfold.split(X_selected, y, groups=groups))

        # 5. Base model
        model = lgb.LGBMClassifier(
            objective='binary',
            random_state=42,
            n_jobs=-1,
            scale_pos_weight=scale_pos_weight,
            verbose=-1
        )

        # 6. Randomized parameter search
        search = RandomizedSearchCV(
            estimator=model,
            param_distributions=self.config.PARAM_GRID,
            n_iter=10,  # few iterations to save memory
            scoring=custom_scorer,
            cv=cv,
            verbose=2,
            n_jobs=1,  # no parallel fits, to save memory
            pre_dispatch='2*n_jobs',
            random_state=42
        )

        logger.info("开始参数搜索...")

        # The search is run on a dense matrix.
        X_dense = X_selected.toarray() if sparse.issparse(X_selected) else X_selected

        search.fit(X_dense, y)

        # 7. Final model with the best parameters
        best_params = search.best_params_
        logger.info(f"最佳参数: {best_params}")
        logger.info(f"最佳召回率: {search.best_score_}")

        final_model = lgb.LGBMClassifier(
            **best_params,
            objective='binary',
            random_state=42,
            n_jobs=-1,
            scale_pos_weight=scale_pos_weight
        )

        logger.info("训练最终模型...")
        # NOTE(review): the eval set is the training data itself, so early
        # stopping watches training loss only — consider a held-out split.
        final_model.fit(
            X_dense, y,
            eval_set=[(X_dense, y)],
            eval_metric='binary_logloss',
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )

        self.feature_importance = pd.Series(
            final_model.feature_importances_,
            index=selected_features
        ).sort_values(ascending=False)

        # 8. Persist the model together with its feature list
        model_path = f"{self.model_name}.pkl"
        joblib.dump((final_model, selected_features), model_path)
        logger.info(f"模型已保存到: {model_path}")

        return final_model

    def evaluate_model(self, model, X_test, y_test):
        """Log recall, positive-sample ratio, classification report and importances."""
        if model is None or len(X_test) == 0:
            logger.warning("无法评估模型,缺少数据或模型")
            return

        y_pred = model.predict(X_test)

        recall = recall_score(y_test, y_pred, pos_label=1)
        logger.info(f"测试集召回率: {recall:.4f}")

        # Share of test rows that satisfy the target condition
        condition_ratio = sum(y_test == 1) / len(y_test)
        logger.info(f"满足条件的样本比例: {condition_ratio:.4f}")

        report = classification_report(y_test, y_pred)
        logger.info("分类报告:\n" + report)

        if self.feature_importance is not None:
            logger.info("特征重要性:\n" + self.feature_importance.to_string())


# ========== Entry point ==========
def main():
    """Run the pipeline: load, cluster, train, then evaluate on the test range."""
    config = StockConfig()

    logger.info("===== 股票上涨预测程序 (修复版) =====")

    # Training data (sampled)
    logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}")
    train_data = load_stock_data(
        config.SH_PATH, config.SZ_PATH,
        config.START_DATE, config.END_DATE,
        sample_fraction=config.SAMPLE_FRACTION,
        debug_mode=config.DEBUG_MODE,
        max_stocks=config.MAX_STOCKS
    )

    if not train_data:
        logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式")
        return

    feature_engineer = FeatureEngineer(config)

    # Clustering: reuse a saved model if present, otherwise train one.
    cluster_model = StockCluster(config)
    if not cluster_model.load():
        try:
            # Pass the feature engineer so clustering sees the indicator
            # columns instead of raw OHLCV frames that lack them.
            cluster_model.fit(train_data, feature_engineer)
        except Exception as e:
            logger.error(f"聚类分析失败: {str(e)}", exc_info=True)
            # Fall back to a single cluster for every stock.
            cluster_model.cluster_map = {code: 0 for code in train_data.keys()}
            logger.info("使用默认聚类(所有股票归为同一类)")
            cluster_model.save()

    # Training set
    trainer = StockModelTrainer(config)
    try:
        X_train, y_train, groups = trainer.prepare_dataset(
            train_data, cluster_model, feature_engineer
        )
    except Exception as e:
        logger.error(f"准备训练数据失败: {str(e)}", exc_info=True)
        return

    if X_train is None or len(y_train) == 0:
        logger.error("错误: 没有可用的训练数据")
        return

    model = trainer.train_model(X_train, y_train, groups)
    if model is None:
        logger.error("模型训练失败")
        return

    # Test data (sampled)
    logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}")
    test_data = load_stock_data(
        config.SH_PATH, config.SZ_PATH,
        config.TEST_START, config.TEST_END,
        sample_fraction=config.SAMPLE_FRACTION,
        debug_mode=config.DEBUG_MODE,
        max_stocks=config.MAX_STOCKS
    )

    if test_data:
        # config.PREDICT_FEATURES now holds the selected features, so the test
        # matrix has the columns the model was trained on.
        X_test, y_test, _ = trainer.prepare_dataset(
            test_data, cluster_model, feature_engineer
        )

        if X_test is not None and len(y_test) > 0:
            if sparse.issparse(X_test):
                X_test = X_test.toarray()
            trainer.evaluate_model(model, X_test, y_test)
        else:
            logger.warning("测试数据准备失败,无法评估模型")
    else:
        logger.warning("没有测试数据可用")

    logger.info("===== 程序执行完成 =====")


if __name__ == "__main__":
    main()

# -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:20:23 2025 @author: srx20 """ # -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:00:01 2025 @author: srx20 """ import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2018-01-01" self.END_DATE = "2025-7-18" self.TEST_START = "2021-01-01" self.TEST_END = "2024-12-31" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': 
[0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not 
all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) 
elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) 
df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 (添加保存/加载功能) ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def save(self): """保存聚类模型到文件""" # 创建包含所有必要组件的字典 model_data = { 'kmeans': self.kmeans, 'scaler': self.scaler, 'cluster_map': self.cluster_map, 'config_cluster_num': self.config.CLUSTER_NUM } # 使用joblib保存模型 joblib.dump(model_data, self.model_file) logger.info(f"聚类模型已保存到: {self.model_file}") def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在,需要重新训练") return False def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 100: # 至少100个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = 
self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") # 训练完成后自动保存模型 self.save() return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量 - 修改为收盘价高于开盘价5%""" # 计算次日收盘价相对于开盘价的涨幅 df['next_day_open_to_close_gain'] = df['close'].shift(-1) / df['open'].shift(-1) - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:收盘价比开盘价高5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_open_to_close_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") # 添加调试信息 if self.config.DEBUG_MODE: sample_targets = df[['open', 'close', 'next_day_open_to_close_gain', 'target']].tail(5) logger.debug(f"目标创建示例:\n{sample_targets}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in 
tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 
self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 
使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 - 尝试加载现有模型,否则训练新模型 cluster_model = StockCluster(config) if not cluster_model.load(): # 尝试加载模型 try: 
cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") cluster_model.save() # 保存默认聚类模型 # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main() 这串代码有以下报错: Traceback (most recent call last): File "d:\股票量化数据库\股票量化数据库\大涨预测模型训练程序3.0.py", line 303, in transform return reduce_mem_usage(df) ^^^^^^^^^^^^^^^^^^^^ File "d:\股票量化数据库\股票量化数据库\大涨预测模型训练程序3.0.py", line 121, in reduce_mem_usage if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: '>' not supported between instances of 'Timestamp' and 'numpy.float16' 2025-07-20 17:55:12,250 - INFO - 目标分布: 0=265, 1=0 2025-07-20 17:55:12,266 - ERROR - 特征工程失败: '>' not supported between instances of 'Timestamp' and 'numpy.float16'

from sklearn.neighbors import KDTree from os.path import join, exists, dirname, abspath import numpy as np import pandas as pd import os, sys, glob, pickle BASE_DIR = dirname(abspath(__file__)) ROOT_DIR = dirname(BASE_DIR) sys.path.append(BASE_DIR) sys.path.append(ROOT_DIR) from helper_ply import write_ply from helper_tool import DataProcessing as DP dataset_path = '/data/S3DIS/Stanford3dDataset_v1.2_Aligned_Version' anno_paths = [line.rstrip() for line in open(join(BASE_DIR, 'meta/anno_paths.txt'))] anno_paths = [join(dataset_path, p) for p in anno_paths] gt_class = [x.rstrip() for x in open(join(BASE_DIR, 'meta/class_names.txt'))] gt_class2label = {cls: i for i, cls in enumerate(gt_class)} sub_grid_size = 0.04 original_pc_folder = join(dirname(dataset_path), 'original_ply') sub_pc_folder = join(dirname(dataset_path), 'input_{:.3f}'.format(sub_grid_size)) os.mkdir(original_pc_folder) if not exists(original_pc_folder) else None os.mkdir(sub_pc_folder) if not exists(sub_pc_folder) else None out_format = '.ply' def convert_pc2ply(anno_path, save_path): """ Convert original dataset files to ply file (each line is XYZRGBL). We aggregated all the points from each instance in the room. :param anno_path: path to annotations. e.g. Area_1/office_2/Annotations/ :param save_path: path to save original point clouds (each line is XYZRGBL) :return: None """ data_list = [] for f in glob.glob(join(anno_path, '*.txt')): class_name = os.path.basename(f).split('_')[0] if class_name not in gt_class: # note: in some room there is 'staris' class.. class_name = 'clutter' pc = pd.read_csv(f, header=None, delim_whitespace=True).values labels = np.ones((pc.shape[0], 1)) * gt_class2label[class_name] data_list.append(np.concatenate([pc, labels], 1)) # Nx7 pc_label = np.concatenate(data_list, 0) xyz_min = np.amin(pc_label, axis=0)[0:3] pc_label[:, 0:3] -= xyz_min 这段代码是用来做什么

这个代码报错 File "D:\pycharm project\jiyi\5133.py", line 31, in chinese_seg ltp = LTP() File "D:\python 3.10.10\lib\site-packages\ltp\interface.py", line 117, in LTP raise FileNotFoundError(f"{CONFIG_NAME} not found in {model_id}") FileNotFoundError: config.json not found in LTP/small # -*- coding: utf-8 -*- import numpy as np import torch from ltp import LTP from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.model_selection import train_test_split from torch.utils.data import Dataset, DataLoader # ==================== # 1. 数据预处理 # ==================== # 假设数据已加载为dataframe格式 import pandas as pd import pandas as pd data = pd.read_csv("D:\pycharm project\jiyi\douban2.csv",encoding='iso-8859-1') # 包含text和label两列 # 分词函数(只能用LTP) def chinese_seg(text, tool="ltp"): if tool == "ltp": ltp = LTP() seg, _ = ltp.seg([text]) return ' '.join(seg[0]) # 全量数据分词处理 data['seg_text'] = data['text'].apply(lambda x: chinese_seg(x, tool="ltp")) # ==================== # 2. TF-IDF向量化 # ==================== vectorizer = TfidfVectorizer(max_features=3000) # 控制特征维度[^3] tfidf_matrix = vectorizer.fit_transform(data['seg_text']) # 转换为PyTorch张量 X = torch.from_numpy(tfidf_matrix.toarray()).float() y = torch.from_numpy(data['label'].values).long() # 划分训练集/测试集 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) # ==================== # 3. 构建数据集管道 # ==================== class CommentDataset(Dataset): def __init__(self, features, labels): self.features = features self.labels = labels def __len__(self): return len(self.labels) def __getitem__(self, idx): # RNN需要序列输入,将特征向量reshape为(seq_len, input_size) return self.features[idx].view(1, -1), self.labels[idx] # seq_len=1 train_loader = DataLoader(CommentDataset(X_train, y_train), batch_size=32, shuffle=True) test_loader = DataLoader(CommentDataset(X_test, y_test), batch_size=32) # ==================== # 4. 
定义RNN模型 # ==================== class RNNClassifier(torch.nn.Module): def __init__(self, input_size, hidden_size, num_classes): super().__init__() self.rnn = torch.nn.RNN(input_size, hidden_size, batch_first=True) self.fc = torch.nn.Linear(hidden_size, num_classes) def forward(self, x): # x形状: (batch_size, seq_len=1, input_size) out, _ = self.rnn(x) # 输出形状: (batch_size, seq_len, hidden_size) return self.fc(out[:, -1, :]) # 初始化模型 model = RNNClassifier( input_size=3000, # 对应TF-IDF特征维度 hidden_size=128, # 根据引用[2]建议设置 num_classes=2 ) # ==================== # 5. 训练与评估 # ==================== device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') model.to(device) criterion = torch.nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 训练循环 for epoch in range(10): model.train() for inputs, labels in train_loader: inputs, labels = inputs.to(device), labels.to(device) outputs = model(inputs) loss = criterion(outputs, labels) optimizer.zero_grad() loss.backward() optimizer.step() # 评估函数(含准确率和F1值[^4]) from sklearn.metrics import accuracy_score, f1_score def evaluate(model, loader): model.eval() all_preds, all_labels = [], [] with torch.no_grad(): for inputs, labels in loader: inputs = inputs.to(device) outputs = model(inputs) preds = torch.argmax(outputs, dim=1).cpu().numpy() all_preds.extend(preds) all_labels.extend(labels.numpy()) return { "accuracy": accuracy_score(all_labels, all_preds), "f1": f1_score(all_labels, all_preds, average='macro') } print("测试集性能:", evaluate(model, test_loader))

大家在看

recommend-type

公开-openprotocol_specification 2.7

LY-WCS-2012-01-06-01 V 1.0 公开 产品名称:WCS 系统简介-公开版 共 13 页 WCS 系统简介(客户交流用) 文档作者:王 超 日期:2012/01/06 开发/测试经理:程 达 日期:2012/01/06 项目经理:程 达 日期:2012/01/06 文档编号:LY-WCS-2012-01-06-01 上海朗因智能科技有限公司 版权所有 不得复制
recommend-type

中国联通OSS系统总体框架

中国联通OSS系统总体框架。中国联通OSS系统总体框架。
recommend-type

基于 ADS9110的隔离式数据采集 (DAQ) 系统方案(待编辑)-电路方案

描述:该“可实现最大 SNR 和采样率的 18 位 2Msps 隔离式数据采集参考设计”演示了如何应对隔离式数据采集系统设计中的典型性能限制挑战:(1)通过将数字隔离器引入的传播延迟降至最低,使采样率达到最大;(2)通过有效地减轻数字隔离器引入的 ADC 采样时钟抖动,使高频交流信号链性能 (SNR) 达到最大。 特性:18 位、2Msps、1 通道、差分输入、隔离式数据采集 (DAQ) 系统;利用 ADS9110 的 multiSPI™ 数字接口实现 2MSPS 采样率,同时保持低 SPI 数据速率;源同步 SPI 数据传输模式,可将隔离器传播延迟降至最低并提高采样率;可降低隔离器引入的抖动的技术,能够将 SNR 提高 12dB(100kHz Fin,2MSPS);经测试的设计包含理论和计算、组件选择、PCB 设计和测量结果。 原理图 附件文档: 方案相关器件: ISO1541:低功耗、双向 I2C 隔离器;ISO7840:高性能 5.7kVRMS 增强型四通道数字隔离器;ISO7842:高性能 5.7kVRMS 增强型四通道数字隔离器
recommend-type

自动化图书管理系统 v7.0

自动化图书馆管理系统包含了目前图书馆管理业务的每个环节,能同时管理图书和期刊,能打印条码、书标,并制作借书证,最大藏书量在300万册以上。系统采用CNMARC标准及中图法第四版分类,具有Web检索与发布功能,条码扫描,支持一卡通,支持触摸屏。系统包括系统管理、读者管理、编目、流通、统计、查询等功能。能够在一个界面下实现图书、音像、期刊的管理,设置假期、设置暂离锁(提高安全性)、暂停某些读者的借阅权、导入导出读者、交换MARC数据、升级辅助编目库等。安装本系统前请先安装 SQL 2000。SQL 下载地址:http://pan.baidu.com/s/145vkr 。安装过程如有问题可咨询:TEL 13851381727  QQ 306404635
recommend-type

MOXA UPort1110drvUSB转串口驱动

MOXA UPort1110drvUSB转串口驱动,解决没有com口的烦恼

最新推荐

recommend-type

{团队建设}网络渠道部团队规划手册.pdf

{团队建设}网络渠道部团队规划手册.pdf
recommend-type

快速付里叶变换FFTFastFourietTransformer(1).pptx

快速付里叶变换FFTFastFourietTransformer(1).pptx
recommend-type

Wamp5: 一键配置ASP/PHP/HTML服务器工具

根据提供的文件信息,以下是关于标题、描述和文件列表中所涉及知识点的详细阐述。 ### 标题知识点 标题中提到的是"PHP集成版工具wamp5.rar",这里面包含了以下几个重要知识点: 1. **PHP**: PHP是一种广泛使用的开源服务器端脚本语言,主要用于网站开发。它可以嵌入到HTML中,从而让网页具有动态内容。PHP因其开源、跨平台、面向对象、安全性高等特点,成为最流行的网站开发语言之一。 2. **集成版工具**: 集成版工具通常指的是将多个功能组合在一起的软件包,目的是为了简化安装和配置流程。在PHP开发环境中,这样的集成工具通常包括了PHP解释器、Web服务器以及数据库管理系统等关键组件。 3. **Wamp5**: Wamp5是这类集成版工具的一种,它基于Windows操作系统。Wamp5的名称来源于它包含的主要组件的首字母缩写,即Windows、Apache、MySQL和PHP。这种工具允许开发者快速搭建本地Web开发环境,无需分别安装和配置各个组件。 4. **RAR压缩文件**: RAR是一种常见的文件压缩格式,它以较小的体积存储数据,便于传输和存储。RAR文件通常需要特定的解压缩软件进行解压缩操作。 ### 描述知识点 描述中提到了工具的一个重要功能:“可以自动配置asp/php/html等的服务器, 不用辛辛苦苦的为怎么配置服务器而烦恼”。这里面涵盖了以下知识点: 1. **自动配置**: 自动配置功能意味着该工具能够简化服务器的搭建过程,用户不需要手动进行繁琐的配置步骤,如修改配置文件、启动服务等。这是集成版工具的一项重要功能,极大地降低了初学者的技术门槛。 2. **ASP/PHP/HTML**: 这三种技术是Web开发中常用的组件。ASP (Active Server Pages) 是微软开发的服务器端脚本环境;HTML (HyperText Markup Language) 是用于创建网页的标准标记语言;PHP是服务器端脚本语言。在Wamp5这类集成环境中,可以很容易地对这些技术进行测试和开发,因为它们已经预配置在一起。 3. **服务器**: 在Web开发中,服务器是一个运行Web应用程序并响应客户端请求的软件或硬件系统。常见的服务器软件包括Apache、Nginx等。集成版工具提供了一个本地服务器环境,使得开发者可以在本地测试他们的应用程序。 ### 标签知识点 标签中仅出现了“PHP”一个关键词,这意味着该工具专注于与PHP相关的开发环境配置。 ### 压缩包子文件的文件名称列表知识点 1. **wamp.exe**: 这是Wamp5集成版工具的可执行文件,用户通过运行这个文件,即可启动Wamp5环境,开始进行PHP等相关开发。 2. **使用说明文档.txt**: 通常这样的文本文件包含了软件的安装指南、功能介绍、常见问题解答以及操作教程等内容。它是用户了解和使用软件的重要辅助资料,可以帮助用户快速上手。 ### 总结 Wamp5作为一个PHP集成版工具,为PHP开发者提供了一个便捷的本地开发环境。它通过自动化配置减少了开发者在搭建环境时的麻烦,整合了Web开发的关键组件,从而使得开发者能够专注于代码的编写,而不是环境配置。此外,它还提供了详尽的使用文档,帮助用户理解和运用这一工具,有效地提高了开发效率。对于希望快速入门PHP或需要一个简单易用的本地开发环境的开发者来说,Wamp5是一个非常好的选择。
recommend-type

【数据融合技术】:甘肃土壤类型空间分析中的专业性应用

# 摘要 数据融合技术作为一种集成多源数据信息的方法,在土壤类型空间分析中发挥着关键作用。本文介绍了数据融合技术的基本概念及其理论基础,阐述了数据预处理、同步整合及冲突解决等关键技术,并详细描述了甘肃土壤类型数据准备的流程,包括数据采集、质量评估、空间化处理及融合实践准备。通过具体案例分析,展示了数据融合在土壤类型空间分布分析、土壤质量评估及土壤保护规划中的应用。同时,文
recommend-type

sht20温湿度传感器使用什么将上拉电阻和滤波电容引出

<think>好的,我需要解决用户关于SHT20温湿度传感器的上拉电阻和滤波电容的引出方法问题。首先,我得回忆一下SHT20的基本特性和常见电路设计。SHT20是数字传感器,通常使用I2C接口,所以上拉电阻是必须的,用于确保数据线和时钟线的稳定。根据引用[2],SHT10是通过SCK和DATA线与单片机通信,而SHT30在引用[3]中使用I2C协议,需要上拉电阻。虽然用户问的是SHT20,但SHT系列通常设计类似,所以可以推断SHT20也需要类似的上拉电阻配置。通常I2C总线的上拉电阻值在4.7kΩ到10kΩ之间,但具体值可能取决于总线速度和电源电压。需要确认数据手册中的推荐值,但用户可能没有
recommend-type

Delphi仿速达财务软件导航条组件开发教程

Delphi作为一款历史悠久的集成开发环境(IDE),由Embarcadero Technologies公司开发,它使用Object Pascal语言,被广泛应用于Windows平台下的桌面应用程序开发。在Delphi中开发组件是一项核心技术,它允许开发者创建可复用的代码单元,提高开发效率和软件模块化水平。本文将详细介绍如何在Delphi环境下仿制速达财务软件中的导航条组件,这不仅涉及到组件的创建和使用,还会涉及界面设计和事件处理等技术点。 首先,需要了解Delphi组件的基本概念。在Delphi中,组件是一种特殊的对象,它们被放置在窗体(Form)上,可以响应用户操作并进行交互。组件可以是可视的,也可以是不可视的,可视组件在设计时就能在窗体上看到,如按钮、编辑框等;不可视组件则主要用于后台服务,如定时器、数据库连接等。组件的源码可以分为接口部分和实现部分,接口部分描述组件的属性和方法,实现部分包含方法的具体代码。 在开发仿速达财务软件的导航条组件时,我们需要关注以下几个方面的知识点: 1. 组件的继承体系 仿制组件首先需要确定继承体系。在Delphi中,大多数可视组件都继承自TControl或其子类,如TPanel、TButton等。导航条组件通常会继承自TPanel或者TWinControl,这取决于导航条是否需要支持子组件的放置。如果导航条只是单纯的一个显示区域,TPanel即可满足需求;如果导航条上有多个按钮或其他控件,可能需要继承自TWinControl以提供对子组件的支持。 2. 界面设计与绘制 组件的外观和交互是用户的第一印象。在Delphi中,可视组件的界面主要通过重写OnPaint事件来完成。Delphi提供了丰富的绘图工具,如Canvas对象,使用它可以绘制各种图形,如直线、矩形、椭圆等,并且可以对字体、颜色进行设置。对于导航条,可能需要绘制背景图案、分隔线条、选中状态的高亮等。 3. 事件处理 导航条组件需要响应用户的交互操作,例如鼠标点击事件。在Delphi中,可以通过重写组件的OnClick事件来响应用户的点击操作,进而实现导航条的导航功能。如果导航条上的项目较多,还可能需要考虑使用滚动条,让更多的导航项能够显示在窗体上。 4. 用户自定义属性和方法 为了使组件更加灵活和强大,开发者通常会为组件添加自定义的属性和方法。在导航条组件中,开发者可能会添加属性来定义按钮个数、按钮文本、按钮位置等;同时可能会添加方法来处理特定的事件,如自动调整按钮位置以适应不同的显示尺寸等。 5. 数据绑定和状态同步 在财务软件中,导航条往往需要与软件其他部分的状态进行同步。例如,用户当前所处的功能模块会影响导航条上相应项目的选中状态。这通常涉及到数据绑定技术,Delphi支持组件间的属性绑定,通过数据绑定可以轻松实现组件状态的同步。 6. 导航条组件的封装和发布 开发完毕后,组件需要被封装成独立的单元供其他项目使用。封装通常涉及将组件源码保存为pas文件,并在设计时能够在组件面板中找到。发布组件可能还需要编写相应的安装包和使用文档,方便其他开发者安装和使用。 7. Delphi IDE的支持 Delphi IDE提供了组件面板编辑器(Component Palette),允许开发者将开发好的组件添加到组件面板中。在组件面板编辑器中,可以自定义组件的图标和分类,使得组件在Delphi中的使用更为便捷。 通过以上的知识点梳理,可以看出Delphi仿速达导航条组件的开发涉及到的不仅仅是简单的代码编写,还涉及到用户界面设计、事件驱动编程、组件封装等多个方面。掌握这些知识点,对于一名Delphi开发者而言,是十分重要的。
recommend-type

【空间分布规律】:甘肃土壤类型与农业生产的关联性研究

# 摘要 本文对甘肃土壤类型及其在农业生产中的作用进行了系统性研究。首先概述了甘肃土壤类型的基础理论,并探讨了土壤类型与农业生产的理论联系。通过GIS技术分析,本文详细阐述了甘肃土壤的空间分布规律,并对其特征和影响因素进行了深入分析。此外,本文还研究了甘肃土壤类型对农业生产实际影响,包括不同区域土壤改良和作物种植案例,以及土壤养分、水分管理对作物生长周期和产量的具体影响。最后,提出了促进甘肃土壤与农业可持续发展的策略,包括土壤保护、退化防治对策以及土壤类型优化与农业创新的结合。本文旨在为
recommend-type

常见运放电路的基本结构和基本原理

<think>首先,用户的问题是关于运算放大器电路的基本结构和工作原理。我需要根据系统级指令来构建回答。系统级指令:-所有行内数学表达式必须使用$...$格式。-独立公式必须使用$$...$$格式并单独成段。-LaTeX语法正确。-使用中文回答。-生成相关问题。-回答中引用的段落末尾自然地添加引用标识,例如[^1]。-回答结构清晰,帮助用户逐步解决问题,尽量保证回答真实可靠。参考站内引用:-引用[1]:关于运算放大器基本电路用法,特别是反相放大器电路。-引用[2]:关于uA741运算放大器电路的基本原理,包括输入级、输出级等。用户的问题:"我想了解运放电路的基本结构和工作原理请问运算放大器电路
recommend-type

ASP.NET2.0初学者个人网站实例分享

标题:“ASP.NET 2.0个人网站”指向了一个网站开发项目,这个项目是使用ASP.NET 2.0框架构建的。ASP.NET 2.0是微软公司推出的一种用于Web开发的服务器端技术,它是.NET Framework的一部分。这个框架允许开发者构建动态网站、网络应用程序和网络服务。开发者可以使用C#或VB.NET等编程语言来编写应用程序。由于这被标签为“2.0”,我们可以假设这是一个较早版本的ASP.NET,相较于后来的版本,它可能没有那么先进的特性,但对于初学者来说,它提供了基础并且易于上手的工具和控件来学习Web开发。 描述:“个人练习所做,适合ASP.NET初学者参考啊,有兴趣的可以前来下载去看看,同时帮小弟我赚些积分”提供了关于该项目的背景信息。它是某个个人开发者或学习者为了实践和学习ASP.NET 2.0而创建的个人网站项目。这个项目被描述为适合初学者作为学习参考。开发者可能是为了积累积分或网络声誉,鼓励他人下载该项目。这样的描述说明了该项目可以被其他人获取,进行学习和参考,或许还能给予原作者一些社区积分或其他形式的回报。 标签:“2.0”表明这个项目专门针对ASP.NET的2.0版本,可能意味着它不是最新的项目,但是它可以帮助初学者理解早期ASP.NET版本的设计和开发模式。这个标签对于那些寻找具体版本教程或资料的人来说是有用的。 压缩包子文件的文件名称列表:“MySelf”表示在分享的压缩文件中,可能包含了与“ASP.NET 2.0个人网站”项目相关的所有文件。文件名“我的”是中文,可能是指创建者以“我”为中心构建了这个个人网站。虽然文件名本身没有提供太多的信息,但我们可以推测它包含的是网站源代码、相关资源文件、数据库文件(如果有的话)、配置文件和可能的文档说明等。 知识点总结: 1. ASP.NET 2.0是.NET Framework下的一个用于构建Web应用程序的服务器端框架。 2. 它支持使用C#和VB.NET等.NET支持的编程语言进行开发。 3. ASP.NET 2.0提供了一组丰富的控件,可帮助开发者快速构建Web表单、用户界面以及实现后台逻辑。 4. 它还提供了一种称作“Web站点”项目模板,使得初学者能够方便地开始Web开发项目。 5. ASP.NET 2.0是微软.NET历史上一个重要的里程碑,引入了许多创新特性,如成员资格和角色管理、主题和皮肤、网站导航和个性化设置等。 6. 在学习ASP.NET 2.0的过程中,初学者可以了解到如HTTP请求和响应、服务器控件、状态管理、数据绑定、缓存策略等基础概念。 7. 本项目可作为ASP.NET初学者的实践平台,帮助他们理解框架的基本结构和工作流程,从而为学习更高版本的ASP.NET打下坚实基础。 8. 个人网站项目的构建可以涵盖前端设计(HTML, CSS, JavaScript)和后端逻辑(C#或VB.NET)的综合应用。 9. 在学习过程中,初学者应该学会如何配置和使用IIS(Internet Information Services)来部署ASP.NET网站。 10. “赚取积分”可能指的是在某个在线社区、论坛或代码托管平台上,通过分享项目来获得一定的积分或奖励,这通常是用来衡量用户对社区贡献大小的一种方式。 综上所述,该“ASP.NET 2.0个人网站”项目不仅为初学者提供了一个实用的学习资源,同时体现了开发者对于开源共享精神的实践,对社区贡献出自己的力量。通过这样的实践,初学者能够更好地理解ASP.NET框架的运作,逐步建立起自己的Web开发技能。
recommend-type

【制图技术】:甘肃高质量土壤分布TIF图件的成图策略

# 摘要 本文针对甘肃土壤分布数据的TIF图件制作进行了系统研究。首先概述了甘肃土壤的分布情况,接着介绍了TIF图件的基础知识,包括其格式特点、空间数据表达以及质量控制方法。随后,文中构建了成图策略的理论框架,分析了土壤分布图的信息需求与数据处理流程,并探讨了成图原则与标准。在实践操作部分,详细阐述了制图软