活动介绍

#coding:gbk import pandas as pd import numpy as np import talib from xtquant import xtdata def init(C): C.stock = C.stockcode + '.' + C.market C.accountid = "testS" C.position_rate = 0.1 C.profit_target = 0.05 C.loss_target = -0.02 C.max_daily_buy = 3 C.daily_buy_count = 0 C.last_trade_date = None # 下载历史数据 xtdata.download_history_data(C.stock, '5m') # 初始化技术指标变量 C.close = [] C.ma5 = [] C.ma20 = [] C.rsi = [] C.macd = [] C.signal = [] C.volume_ratio = 0 def handlebar(C): # 获取当前时间 bar_timetag = C.get_bar_timetag(C.barpos) bar_date = timetag_to_datetime(bar_timetag, '%Y%m%d%H%M%S') current_date = bar_date[:8] # 重置每日买入计数 if C.last_trade_date != current_date: C.daily_buy_count = 0 C.last_trade_date = current_date # 获取5分钟数据 min5_data = xtdata.get_market_data_ex(['open', 'high', 'low', 'close', 'volume'], [C.stock], period='5m', count=100) if len(min5_data) < 100: print(f"{bar_date} 5分钟数据不足,跳过") return # 计算技术指标 close = np.array(min5_data[C.stock]['close']) volume = np.array(min5_data[C.stock]['volume']) C.close = close C.ma5 = talib.MA(close, timeperiod=5) C.ma20 = talib.MA(close, timeperiod=20) C.rsi = talib.RSI(close, timeperiod=14) C.macd, C.signal, _ = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9) avg_volume = np.mean(volume[-6:-1]) C.volume_ratio = volume[-1] / avg_volume # 获取账户和持仓信息 account = get_trade_detail_data('test', 'stock', 'account')[0] available_cash = int(account.m_dAvailable) total_asset = int(account.m_dBalance) holdings = get_trade_detail_data('test', 'stock', 'position') holdings = {i.m_strInstrumentID + '.' + i.m_strExchangeID: {'volume': i.m_nVolum

时间: 2025-06-26 22:24:29 浏览: 20
### 技术指标计算与市场数据处理 #### 使用 Pandas 计算移动平均线 (MA) 通过 `pandas` 的滚动窗口功能可以轻松实现移动平均线的计算。以下是基于给定代码片段扩展的一个示例: ```python import pandas as pd def calculate_ma(dataframe, window_size): """ 计算指定窗口大小的简单移动平均线。 参数: dataframe: 包含收盘价的数据框。 window_size: 移动平均线的时间窗口长度。 返回: 新增一列 MA_<window_size> 表示移动平均值的结果。 """ ma_series = dataframe['Close'].rolling(window=window_size).mean() dataframe[f'MA_{window_size}'] = ma_series return dataframe ``` 上述函数利用了 `DataFrame.rolling()` 方法,该方法支持多种统计运算,例如均值、标准差等[^4]。 --- #### 使用 NumPy 实现相对强弱指数 (RSI) 相对强弱指数 RSI 是一种衡量资产价格变动幅度的技术分析工具。下面是一个简单的 RSI 计算方式: ```python import numpy as np def calculate_rsi(dataframe, period=14): """ 计算相对强弱指数(RSI)。 参数: dataframe: 数据框对象,需包含 Close 列。 period: 时间周期,默认为 14 天。 返回: 增加了一列名为 'RSI' 的新数据框。 """ delta = dataframe['Close'].diff().dropna() gain = (delta.where(delta > 0, 0)).fillna(0) loss = (-delta.where(delta < 0, 0)).fillna(0) avg_gain = gain.rolling(period).mean() avg_loss = loss.rolling(period).mean() rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) dataframe['RSI'] = rsi.fillna(method='bfill') return dataframe ``` 此部分实现了经典的 Wilder 平滑算法来计算 RSI 值[^2]。 --- #### 使用 TA-Lib 库简化 MACD 指标的计算 TA-Lib 是一个广泛使用的金融技术分析库,能够快速完成复杂指标的计算。以下是如何使用它来生成 MACD 指标: ```python import talib def calculate_macd(dataframe, fast_period=12, slow_period=26, signal_period=9): """ 使用 TA-Lib 计算 MACD 指标。 参数: dataframe: 输入数据框,必须有 'Close' 列。 fast_period: 快速 EMA 的时间周期。 slow_period: 缓慢 EMA 的时间周期。 signal_period: 信号线的时间周期。 返回: 修改后的数据框新增三列:MACD、Signal 和 Histogram。 """ macd_line, signal_line, hist = talib.MACD( dataframe['Close'], fastperiod=fast_period, slowperiod=slow_period, signalperiod=signal_period ) dataframe['MACD'] = macd_line dataframe['Signal'] = signal_line dataframe['Histogram'] = hist return dataframe ``` 这段代码调用了 TA-Lib 中预定义好的 `MACD` 函数,从而避免手动编写复杂的指数平滑过程[^3]。 --- #### xtquant 接口获取市场数据 xtquant 是通达信提供的量化交易平台 API,可用于实时行情抓取和回测模拟等功能。下面是其基本用法之一——获取历史 K 线数据: ```python from xtquant import xtdata def fetch_stock_data(stock_code, frequency, count, end_time=None): """ 获取某只股票的历史K线数据。 参数: stock_code: 股票代码字符串形式,如 "SHSE.600519"。 frequency: 数据频率,可选值:"1m", "5m", "1d" 等。 count: 请求记录条数。 end_time: 结束时间戳,默认当前时刻。 返回: DataFrame 类型的历史数据。 """ data = xtdata.get_kline_data_extended( security=stock_code, period=frequency, start_time='', end_time=end_time or '', count=count, fields=['time_key', 'open', 'high', 'low', 'close'] ) df = pd.DataFrame(data) df.set_index('time_key', inplace=True) return df ``` 以上脚本展示了如何通过 xtquant 来请求特定时间段内的高频或日频数据[^1]。 --- ### 总结 综上所述,在 Python 中进行股票量化交易策略开发时,可以选择不同的技术和框架组合。对于基础数据分析任务来说,Pandas 已经足够强大;而当涉及更高级别的技术指标时,则推荐引入第三方专用库比如 TA-Lib 或者自定义实现逻辑。
阅读全文

相关推荐

帮我把出去排名前20的股票改成输出排名前50的股票:# -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:28:49 2025 @author: srx20 """ # -*- coding: utf-8 -*- """ 股票上涨概率预测程序 - 输出前20名股票 """ import os import pandas as pd import numpy as np import joblib import talib as ta from tqdm import tqdm import logging import datetime # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_inference.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class PredictionConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 预测日期范围 self.START_DATE = "2024-01-01" # 足够历史数据计算特征 self.END_DATE = datetime.datetime.now().strftime("%Y-%m-%d") # 当前日期 # 模型路径 self.CLUSTER_MODEL_PATH = "stock_cluster_model.pkl" self.PREDICTION_MODEL_PATH = "stock_prediction_model.pkl" # 输出设置 self.TOP_N = 20 # 输出前N名股票 self.OUTPUT_FILE = "top_stocks.csv" # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].ast(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) return df # ========== 数据加载 ========== def load_stock_data_for_prediction(sh_path, sz_path, start_date, end_date): """加载股票数据用于预测""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 ========== class FeatureEngineer: def __init__(self): pass def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in ['RSI14', 'MACD_hist', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline']: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 ========== class StockCluster: def __init__(self, model_path): # 加载聚类模型 if os.path.exists(model_path): model_data = joblib.load(model_path) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {model_path} 加载聚类模型") else: logger.error("聚类模型文件不存在") self.cluster_map = {} def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 预测执行器 ========== class StockPredictor: def __init__(self, config): self.config = config # 加载预测模型 if os.path.exists(self.config.PREDICTION_MODEL_PATH): self.model, self.selected_features = joblib.load(self.config.PREDICTION_MODEL_PATH) logger.info(f"从 {self.config.PREDICTION_MODEL_PATH} 加载预测模型") logger.info(f"模型使用的特征: {self.selected_features}") else: logger.error("预测模型文件不存在") self.model = None self.selected_features = [] # 加载聚类模型 self.cluster_model = StockCluster(self.config.CLUSTER_MODEL_PATH) # 特征工程 self.feature_engineer = FeatureEngineer() def prepare_prediction_data(self, stock_data): """准备预测数据""" logger.info("准备预测数据...") prediction_data = [] for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = self.feature_engineer.transform(df.copy()) # 添加聚类特征 df = self.cluster_model.transform(df, stock_code) # 只取最新一天的数据 latest_data = df.iloc[-1].copy() # 确保所有特征都存在 for feature in self.selected_features: if feature not in latest_data: latest_data[feature] = 0 # 填充默认值 # 选择所需特征 features = latest_data[self.selected_features].values.reshape(1, -1) prediction_data.append({ 'stock_code': stock_code, 'date': latest_data['date'], 'features': features, 'open': latest_data['open'], 'high': latest_data['high'], 'low': latest_data['low'], 'close': latest_data['close'], 'volume': latest_data['volume'] }) except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) return prediction_data def predict(self, prediction_data): """执行预测并返回结果""" if self.model is None: logger.error("没有可用的预测模型") return None logger.info("执行预测...") results = [] for data in tqdm(prediction_data, desc="预测股票"): try: # 预测正类概率 proba = self.model.predict_proba(data['features'])[0][1] results.append({ 'stock_code': data['stock_code'], 'date': data['date'], 'probability': proba, 'open': data['open'], 'high': data['high'], 'low': data['low'], 'close': data['close'], 'volume': data['volume'] }) except Exception as e: logger.error(f"预测股票 {data['stock_code']} 失败: {str(e)}", exc_info=True) return results def get_top_stocks(self, results, top_n=20): """获取概率最高的前N只股票""" if not results: return [] # 转换为DataFrame并排序 df = pd.DataFrame(results) df = df.sort_values('probability', ascending=False).head(top_n) # 添加排名 df['rank'] = range(1, len(df) + 1) # 格式化输出 df['probability'] = df['probability'].apply(lambda x: f"{x:.2%}") df['date'] = df['date'].dt.strftime('%Y-%m-%d') return df def save_results(self, df, output_file): """保存结果到CSV文件""" if df is not None and not df.empty: df.to_csv(output_file, index=False) logger.info(f"结果已保存到: {output_file}") return True return False # ========== 主程序 ========== def main(): # 初始化配置 config = PredictionConfig() logger.info("===== 股票上涨概率预测程序 =====") logger.info(f"预测日期: {config.END_DATE}") # 加载股票数据 logger.info(f"加载股票数据: {config.START_DATE} 至 {config.END_DATE}") stock_data = load_stock_data_for_prediction( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE ) if not stock_data: logger.error("错误: 没有加载到任何股票数据") return # 初始化预测器 predictor = StockPredictor(config) if predictor.model is None: return # 准备预测数据 prediction_data = predictor.prepare_prediction_data(stock_data) if not prediction_data: logger.error("错误: 没有准备好预测数据") return # 执行预测 results = predictor.predict(prediction_data) if not results: logger.error("错误: 没有预测结果") return # 获取前N名股票 top_stocks = predictor.get_top_stocks(results, config.TOP_N) # 输出结果 logger.info("\n===== 上涨概率最高的前20只股票 =====") print(top_stocks[['rank', 'stock_code', 'probability', 'close', 'date']].to_string(index=False)) # 保存结果 predictor.save_results(top_stocks, config.OUTPUT_FILE) logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

# -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:20:23 2025 @author: srx20 """ # -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:00:01 2025 @author: srx20 """ import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2018-01-01" self.END_DATE = "2025-7-18" self.TEST_START = "2021-01-01" self.TEST_END = "2024-12-31" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 (添加保存/加载功能) ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def save(self): """保存聚类模型到文件""" # 创建包含所有必要组件的字典 model_data = { 'kmeans': self.kmeans, 'scaler': self.scaler, 'cluster_map': self.cluster_map, 'config_cluster_num': self.config.CLUSTER_NUM } # 使用joblib保存模型 joblib.dump(model_data, self.model_file) logger.info(f"聚类模型已保存到: {self.model_file}") def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在,需要重新训练") return False def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 100: # 至少100个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") # 训练完成后自动保存模型 self.save() return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量 - 修改为收盘价高于开盘价5%""" # 计算次日收盘价相对于开盘价的涨幅 df['next_day_open_to_close_gain'] = df['close'].shift(-1) / df['open'].shift(-1) - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:收盘价比开盘价高5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_open_to_close_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") # 添加调试信息 if self.config.DEBUG_MODE: sample_targets = df[['open', 'close', 'next_day_open_to_close_gain', 'target']].tail(5) logger.debug(f"目标创建示例:\n{sample_targets}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 - 尝试加载现有模型,否则训练新模型 cluster_model = StockCluster(config) if not cluster_model.load(): # 尝试加载模型 try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") cluster_model.save() # 保存默认聚类模型 # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main() 这串代码有以下报错: Traceback (most recent call last): File "d:\股票量化数据库\股票量化数据库\大涨预测模型训练程序3.0.py", line 303, in transform return reduce_mem_usage(df) ^^^^^^^^^^^^^^^^^^^^ File "d:\股票量化数据库\股票量化数据库\大涨预测模型训练程序3.0.py", line 121, in reduce_mem_usage if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: '>' not supported between instances of 'Timestamp' and 'numpy.float16' 2025-07-20 17:55:12,250 - INFO - 目标分布: 0=265, 1=0 2025-07-20 17:55:12,266 - ERROR - 特征工程失败: '>' not supported between instances of 'Timestamp' and 'numpy.float16'

给这个程序加一个变量,变量是输出股票排名,比如我只要 第x名到第y名,x小于等于y,方便我自己更改输出条件 # -*- coding: utf-8 -*- """ Created on Sun Jul 20 19:41:38 2025 @author: srx20 """ # -*- coding: utf-8 -*- """ 股票预测筛选程序 - 输出满足条件的Top 50股票并生成HTML报告 """ import os import joblib import pandas as pd import numpy as np from tqdm import tqdm from typing import Dict, List, Tuple from sklearn.preprocessing import StandardScaler from sklearn.cluster import MiniBatchKMeans import talib as ta import logging from datetime import datetime import matplotlib.pyplot as plt from matplotlib import font_manager as fm import base64 from io import BytesIO # 设置中文字体支持 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_filter.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # ========== 特征工程 ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) return df except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在") return False def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 数据加载函数 ========== def load_prediction_data(sh_path: str, sz_path: str, lookback_days: int = 30) -> Dict[str, pd.DataFrame]: """ 加载用于预测的股票数据(只加载最近lookback_days天的数据) """ stock_data = {} exchanges = [ ('SH', sh_path), ('SZ', sz_path) ] total_files = 0 for exchange, path in exchanges: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] total_files += len(csv_files) if total_files == 0: logger.warning("没有找到任何CSV文件") return stock_data pbar = tqdm(total=total_files, desc='加载股票数据') for exchange, path in exchanges: if not os.path.exists(path): continue for file in os.listdir(path): if not file.endswith('.csv'): continue stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取整个文件 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.debug(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并排序 df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date', ascending=False) # 只取最近lookback_days天的数据 if len(df) > lookback_days: df = df.head(lookback_days) # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 无有效数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 生成HTML报告 ========== def generate_html_report(top_stocks: List[Tuple[str, float]], prediction_date: str, model_version: str = "1.0") -> str: """ 生成HTML格式的预测报告 参数: top_stocks: 包含(股票代码, 概率)元组的列表 prediction_date: 预测日期 model_version: 模型版本号 返回: HTML字符串 """ # 创建DataFrame df = pd.DataFrame(top_stocks, columns=['股票代码', '上涨概率']) df['排名'] = range(1, len(df) + 1) # 创建技术指标图表 plt.figure(figsize=(10, 6)) plt.bar(df['股票代码'], df['上涨概率'], color='skyblue') plt.title('Top 50股票上涨概率分布', fontsize=16) plt.xlabel('股票代码', fontsize=12) plt.ylabel('上涨概率', fontsize=12) plt.xticks(rotation=90, fontsize=8) plt.ylim(0.7, 1.0) plt.grid(axis='y', linestyle='--', alpha=0.7) # 将图表转换为Base64编码 buf = BytesIO() plt.savefig(buf, format='png', bbox_inches='tight') buf.seek(0) chart_base64 = base64.b64encode(buf.read()).decode('utf-8') plt.close() # 生成HTML内容 html_content = f""" <!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>大涨小跌前50预测结果</title> <style> body {{ font-family: 'Microsoft YaHei', sans-serif; margin: 0; padding: 20px; background-color: #f5f7fa; color: #333; }} .container {{ max-width: 1200px; margin: 0 auto; background-color: white; border-radius: 10px; box-shadow: 0 0 20px rgba(0, 0, 0, 0.1); padding: 30px; }} .header {{ text-align: center; padding-bottom: 20px; border-bottom: 1px solid #eee; margin-bottom: 30px; }} .header h1 {{ color: #1e3a8a; margin-bottom: 10px; }} .header .subtitle {{ color: #6b7280; font-size: 18px; }} .info-box {{ background-color: #f0f7ff; border-left: 4px solid #3b82f6; padding: 15px; margin-bottom: 30px; border-radius: 0 5px 5px 0; }} .chart-container {{ text-align: center; margin-bottom: 30px; }} .chart-container img {{ max-width: 100%; border-radius: 5px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); }} table {{ width: 100%; border-collapse: collapse; margin-bottom: 30px; }} th, td {{ padding: 12px 15px; text-align: center; border-bottom: 1px solid #e5e7eb; }} th {{ background-color: #3b82f6; color: white; font-weight: bold; }} tr:nth-child(even) {{ background-color: #f9fafb; }} tr:hover {{ background-color: #f0f7ff; }} .footer {{ text-align: center; padding-top: 20px; border-top: 1px solid #eee; color: #6b7280; font-size: 14px; }} .highlight {{ color: #10b981; font-weight: bold; }} .rank-1 {{ background-color: #ffeb3b; }} .rank-2 {{ background-color: #e0e0e0; }} .rank-3 {{ background-color: #ff9800; }} </style> </head> <body> 大涨小跌前50预测结果 基于机器学习模型的股票预测分析 预测日期:{prediction_date} 模型版本:{model_version} 筛选条件:收盘价 > 开盘价 × 105% 且 最低价 > 开盘价 × 98% 说明:本报告基于历史数据预测,不构成投资建议 Top 50股票上涨概率分布图 股票上涨概率分布图 详细预测结果 排名 股票代码 上涨概率 预测评级 """ # 添加表格行 for i, (stock_code, prob) in enumerate(top_stocks): rank = i + 1 rating = "" row_class = "" if prob >= 0.95: rating = "⭐⭐⭐⭐⭐" elif prob >= 0.9: rating = "⭐⭐⭐⭐" elif prob >= 0.85: rating = "⭐⭐⭐" elif prob >= 0.8: rating = "⭐⭐" else: rating = "⭐" if rank == 1: row_class = "class='rank-1'" elif rank == 2: row_class = "class='rank-2'" elif rank == 3: row_class = "class='rank-3'" html_content += f""" {rank} {stock_code} {prob:.4f} {rating} """ # 添加HTML尾部 html_content += f""" 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 预测模型:LightGBM分类器 © 2025 股票量化分析系统 | 本报告仅供研究参考 </body> </html> """ return html_content # ========== 主预测函数 ========== def predict_top_stocks(model_path: str = "stock_prediction_model.pkl", top_n: int = 50) -> List[Tuple[str, float]]: """ 预测满足条件的Top N股票并生成HTML报告 """ # 1. 初始化配置 config = StockConfig() logger.info("===== 股票预测筛选程序 =====") # 2. 加载模型 if not os.path.exists(model_path): logger.error(f"模型文件 {model_path} 不存在") return [] try: model, selected_features = joblib.load(model_path) logger.info(f"成功加载预测模型,使用特征: {selected_features}") except Exception as e: logger.error(f"加载模型失败: {str(e)}", exc_info=True) return [] # 3. 加载聚类模型 cluster_model = StockCluster(config) cluster_model_loaded = cluster_model.load() if not cluster_model_loaded: logger.warning("无法加载聚类模型,使用默认聚类") # 4. 加载股票数据(最近30天) logger.info("加载股票数据...") stock_data = load_prediction_data(config.SH_PATH, config.SZ_PATH, lookback_days=30) if not stock_data: logger.error("没有加载到任何股票数据") return [] # 5. 初始化特征工程 feature_engineer = FeatureEngineer(config) # 6. 准备预测数据 predictions = [] logger.info("处理股票数据并进行预测...") for stock_code, df in tqdm(stock_data.items(), desc="预测股票"): try: # 确保数据按日期升序排列(用于正确计算指标) df = df.sort_values('date', ascending=True) # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 if cluster_model_loaded: df = cluster_model.transform(df, stock_code) # 获取最新一天的数据(用于预测) latest_data = df.iloc[-1:].copy() # 确保所有特征都存在 for feature in selected_features: if feature not in latest_data.columns: latest_data[feature] = 0 # 选择模型使用的特征 X_pred = latest_data[selected_features] # 预测概率(类别1的概率) proba = model.predict_proba(X_pred)[0, 1] # 添加到预测结果 predictions.append((stock_code, proba)) except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) # 7. 按概率排序并取Top N predictions.sort(key=lambda x: x[1], reverse=True) top_predictions = predictions[:top_n] # 8. 生成HTML报告 prediction_date = datetime.now().strftime("%Y-%m-%d") html_content = generate_html_report(top_predictions, prediction_date) # 9. 保存HTML报告 html_file = "大涨小跌前50预测结果.html" with open(html_file, "w", encoding="utf-8") as f: f.write(html_content) logger.info(f"已生成HTML报告: {html_file}") return top_predictions if __name__ == "__main__": # 运行预测并获取Top 50股票 top_stocks = predict_top_stocks(top_n=50) # 同时保存CSV结果 if top_stocks: result_df = pd.DataFrame(top_stocks, columns=['股票代码', '上涨概率']) result_df.to_csv('大涨小跌前50预测结果.csv', index=False, encoding='utf-8-sig') logger.info("结果已保存到 大涨小跌前50预测结果.csv")

zip

最新推荐

recommend-type

永磁同步电机全速域无传感器控制技术及其应用 加权切换法

内容概要:本文详细探讨了永磁同步电机(PMSM)在全速域范围内的无传感器控制技术。针对不同的速度区间,提出了三种主要的控制方法:零低速域采用高频脉振方波注入法,通过注入高频方波信号并处理产生的交互信号来估算转子位置;中高速域则使用改进的滑膜观测器,结合连续的sigmoid函数和PLL锁相环,实现对转子位置的精确估计;而在转速切换区域,则采用了加权切换法,动态调整不同控制方法的权重,确保平滑过渡。这些方法共同实现了电机在全速域内的高效、稳定运行,减少了对传感器的依赖,降低了系统复杂度和成本。 适合人群:从事电机控制系统设计、研发的技术人员,尤其是关注永磁同步电机无传感器控制领域的研究人员和技术爱好者。 使用场景及目标:适用于需要优化电机控制系统,减少硬件成本和提升系统可靠性的应用场景。目标是在不依赖额外传感器的情况下,实现电机在各种速度条件下的精准控制。 其他说明:文中引用了多篇相关文献,为每种控制方法提供了理论依据和实验验证的支持。
recommend-type

langchain4j-spring-boot-starter-0.29.1.jar中文文档.zip

1、压缩文件中包含: 中文文档、jar包下载地址、Maven依赖、Gradle依赖、源代码下载地址。 2、使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 3、特殊说明: (1)本文档为人性化翻译,精心制作,请放心使用; (2)只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; (3)不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 4、温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件。 5、本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册。
recommend-type

4节点光储直流微网:基于多目标控制与多智能体一致性的光伏MPPT与储能双向DCDC优化

内容概要:本文介绍了一个15kW、400V级的四节点光储直流微网系统的设计与实现。该系统采用多目标控制、多智能体一致性及二次优化技术,旨在实现高效的能量管理和稳定的系统运行。具体而言,光伏MPPT采用粒子群算法以最大化太阳能利用率;储能系统的双向DCDC变换器则运用了多种控制策略,如电流内环的模型预测控制、电压环的分布式控制、初级控制的下垂策略以及二次控制的差异性和电压补偿策略。这些措施共同确保了直流母线电压的恢复和储能系统的SoC一致控制。 适合人群:从事电力电子、新能源技术和微网系统研究的专业人士,尤其是关注光储直流微网系统设计与优化的研究人员和技术人员。 使用场景及目标:适用于需要深入了解光储直流微网系统设计原理及其控制策略的应用场合,如科研机构、高校实验室、电力公司等。目标是为相关领域的研究人员提供理论依据和技术支持,促进微网技术的发展。 其他说明:文中详细讨论了各部分的具体实现方法和技术细节,对于希望深入理解光储直流微网系统运作机制的读者非常有帮助。
recommend-type

电动汽车BMS电池管理系统应用层软件模型:MBD方法、通信协议及AUTOSAR构建 MBD建模

基于模型开发(MBD)的BMS电池管理系统应用层软件模型。首先概述了BMS的核心任务,即确保电池的安全与高效运行,涉及充电、放电控制、实时监测和均衡管理。接着重点讨论了SUMlink电池管理系统策略模型,该模型通过收集和处理电池的数据(如电压、电流、温度),并运用多种算法(如SOC估算、SOH评估)来优化电池性能。文中还阐述了BMC CVS内部通讯协议DBC的作用,确保各模块间数据传输的准确性与效率。此外,文章介绍了采用AUTOSAR标准的底层Build工程,提高了系统的可维护性、可扩展性和可靠性。最后提到了INCA A2L标定文件的应用,用于配置和调整系统参数,以满足不同需求。通过代码分析与实践,进一步加深了对BMS的理解。 适合人群:从事电动汽车电池管理系统研究与开发的技术人员,尤其是对MBD方法、通信协议和AUTOSAR标准感兴趣的工程师。 使用场景及目标:适用于希望深入了解BMS系统的设计原理和技术细节的专业人士,旨在提高他们对该领域的理论认知和实际操作能力。 其他说明:文章不仅涵盖了理论知识,还包括具体的代码实现和实践指导,有助于读者全面掌握BMS的工作机制。
recommend-type

基于LPV、OFRMPC和PTC的变速单移线鲁棒模型预测控制及其Simulink与CarSim联合仿真 - 模型预测控制 (07月)

内容概要:本文介绍了线性参变(LPV)+输出反馈鲁棒模型预测控制(OFRMPC)+路径跟踪(PTC)技术在变速单移线工况中的应用。该技术通过Simulink和CarSim8.02联合仿真,实现了20-25m/s的速度范围内高效稳定的路径跟踪控制。文中详细阐述了LPV技术和OFRMPC的工作原理,以及它们在应对速度和侧偏刚度变化、质心侧偏角鲁棒估计等方面的优越性。此外,还介绍了基于二自由度模型和LMI设计的控制器的具体实现方法,包括上层控制率在线求解和下层最优化算法求解四轮转矩的过程。 适合人群:从事自动驾驶、智能交通系统研究的技术人员,尤其是对模型预测控制(MPC)、线性参变(LPV)和鲁棒控制感兴趣的科研人员和工程师。 使用场景及目标:适用于需要高精度路径跟踪控制的自动驾驶车辆,在复杂工况下如速度变化、侧偏刚度变化等情况中,确保车辆的稳定性和高效性。目标是为研究人员提供一种可靠的控制策略和技术实现方案。 其他说明:本文提供了完整的仿真环境配置指南,包括MATLAB2020a以上的版本和CarSim8.02版本,附带了详细的说明文档和参考文献,方便读者理解和复现实验结果。
recommend-type

Webdiy.net新闻系统v1.0企业版发布:功能强大、易操作

标题中提到的"Webdiy.net新闻系统 v1.0 企业版"是一个针对企业级应用开发的新闻内容管理系统,是基于.NET框架构建的。从描述中我们可以提炼出以下知识点: 1. **系统特性**: - **易用性**:系统设计简单,方便企业用户快速上手和操作。 - **可定制性**:用户可以轻松修改网站的外观和基本信息,例如网页标题、页面颜色、页眉和页脚等,以符合企业的品牌形象。 2. **数据库支持**: - **Access数据库**:作为轻量级数据库,Access对于小型项目和需要快速部署的场景非常合适。 - **Sql Server数据库**:适用于需要强大数据处理能力和高并发支持的企业级应用。 3. **性能优化**: - 系统针对Access和Sql Server数据库进行了特定的性能优化,意味着它能够提供更为流畅的用户体验和更快的数据响应速度。 4. **编辑器功能**: - **所见即所得编辑器**:类似于Microsoft Word,允许用户进行图文混排编辑,这样的功能对于非技术人员来说非常友好,因为他们可以直观地编辑内容而无需深入了解HTML或CSS代码。 5. **图片管理**: - 新闻系统中包含在线图片上传、浏览和删除的功能,这对于新闻编辑来说是非常必要的,可以快速地为新闻内容添加相关图片,并且方便地进行管理和更新。 6. **内容发布流程**: - **审核机制**:后台发布新闻后,需经过审核才能显示到网站上,这样可以保证发布的内容质量,减少错误和不当信息的传播。 7. **内容排序与类别管理**: - 用户可以按照不同的显示字段对新闻内容进行排序,这样可以突出显示最新或最受欢迎的内容。 - 新闻类别的动态管理及自定义显示顺序,可以灵活地对新闻内容进行分类,方便用户浏览和查找。 8. **前端展示**: - 系统支持Javascript前端页面调用,这允许开发者将系统内容嵌入到其他网页或系统中。 - 支持iframe调用,通过这种HTML元素可以将系统内容嵌入到网页中,实现了内容的跨域展示。 9. **安全性**: - 提供了默认的管理账号和密码(webdiy / webdiy.net),对于企业应用来说,这些默认的凭证需要被替换,以保证系统的安全性。 10. **文件结构**: - 压缩包文件名称为"webdiynetnews",这可能是系统的根目录名称或主要安装文件。 11. **技术栈**: - 系统基于ASP.NET技术构建,这表明它使用.NET框架开发,并且可以利用.NET生态中的各种库和工具来实现功能的扩展和维护。 在实施和部署这样的系统时,企业可能还需要考虑以下方面: - **可扩展性**:随着业务的增长,系统应该能容易地扩展,以支持更多的用户和内容。 - **安全性**:除了更改为安全的管理员凭证外,还需考虑防止SQL注入、XSS攻击等网络安全问题。 - **备份和恢复**:定期备份数据库和内容是很重要的,以防数据丢失或需要恢复到先前的状态。 - **多用户和权限管理**:企业环境可能需要多级用户权限设置,以适应不同的团队和角色。 - **SEO优化**:系统应支持良好的搜索引擎优化实践,以便于内容在搜索引擎中的排名和发现。 - **移动适应性**:由于移动设备的广泛使用,系统应具备良好的响应式设计,以提供良好的移动用户体验。 根据描述,该系统似乎是一个成熟的解决方案,适用于需要快速搭建新闻网站的企业用户,但同时也应该注意定期的维护和更新,以确保系统的稳定性和安全性。
recommend-type

【Windows 11用户的福音】:一步到位解决GX Works2安装问题,让兼容性不再是问题!

# 1. GX Works2简介与Windows 11兼容性问题 ## 简介GX Works2 GX Works2是由三菱电机开发的一款针对其PLC(可编程逻辑控制器)产品的编程和调试软件。该软件以其直观的用户界面和强大的编程功能闻名于工业自动化领域,支持从简单的逻辑控制到复杂的系统集成项目。随着Windows操作系统的迭代升级,尤其是新推
recommend-type

电磁加热双边谐振和单边谐振的区别

<think>我们正在处理关于电磁加热技术中双边谐振与单边谐振的工作原理及区别的问题。 首先,需要明确用户意图是了解这两种谐振方式的区别以及它们的工作原理。 我们将通过以下步骤进行: 1. 调用网络搜索工具,检索关于电磁加热中双边谐振和单边谐振的信息。 2. 整合检索到的内容,按照要求生成回答,包括工作原理和区别,并组织成多个方法或步骤(但避免使用步骤词汇,用换行分隔)。 3. 在回答中,如果有公式或代码,按照指定格式处理。 4. 最后,添加相关问题部分。 注意:避免使用第一人称,避免步骤词汇,引用内容不集中末尾,而是融入回答中。 根据搜索,电磁加热中的谐振通常指的是感应加
recommend-type

EnvMan源代码压缩包内容及功能解析

根据给定文件信息,我们需要生成关于“EnvMan-source.zip”这一压缩包的知识点。首先,由于提供的信息有限,我们无法直接得知EnvMan-source.zip的具体内容和功能,但可以通过标题、描述和标签中的信息进行推断。文件名称列表只有一个“EnvMan”,这暗示了压缩包可能包含一个名为EnvMan的软件或项目源代码。以下是一些可能的知识点: ### EnvMan软件/项目概览 EnvMan可能是一个用于环境管理的工具或框架,其源代码被打包并以“EnvMan-source.zip”的形式进行分发。通常,环境管理相关的软件用于构建、配置、管理和维护应用程序的运行时环境,这可能包括各种操作系统、服务器、中间件、数据库等组件的安装、配置和版本控制。 ### 源代码文件说明 由于只有一个名称“EnvMan”出现在文件列表中,我们可以推测这个压缩包可能只包含一个与EnvMan相关的源代码文件夹。源代码文件夹可能包含以下几个部分: - **项目结构**:展示EnvMan项目的基本目录结构,通常包括源代码文件(.c, .cpp, .java等)、头文件(.h, .hpp等)、资源文件(图片、配置文件等)、文档(说明文件、开发者指南等)、构建脚本(Makefile, build.gradle等)。 - **开发文档**:可能包含README文件、开发者指南或者项目wiki,用于说明EnvMan的功能、安装、配置、使用方法以及可能的API说明或开发者贡献指南。 - **版本信息**:在描述中提到了版本号“-1101”,这表明我们所见的源代码包是EnvMan的1101版本。通常版本信息会详细记录在版本控制文件(如ChangeLog或RELEASE_NOTES)中,说明了本次更新包含的新特性、修复的问题、已知的问题等。 ### 压缩包的特点 - **命名规范**:标题、描述和标签中的一致性表明这是一个正式发布的软件包。通常,源代码包的命名会遵循一定的规范,如“项目名称-版本号-类型”,在这里类型是“source”。 - **分发形式**:以.zip格式的压缩包进行分发,是一种常见的软件源代码分发方式。虽然较现代的版本控制系统(如Git、Mercurial)通常支持直接从仓库克隆源代码,但打包成zip文件依然是一种便于存储和传输的手段。 ### 可能的应用场景 - **开发环境配置**:EnvMan可能是用于创建、配置和管理开发环境的工具,这种工具在开发人员设置新的开发机或新的项目环境时非常有用。 - **自动化部署**:EnvMan可能包含自动化部署环境的脚本或命令,使得部署流程变得快捷且高效。 - **监控与维护**:作为环境管理工具,EnvMan可能还支持对环境的监控功能,包括系统资源监控、服务状态检查等,以保证生产环境的稳定性。 ### 总结 尽管以上知识点是基于有限的信息进行的假设性推论,但EnvMan-source.zip包可能是一个用于环境管理的软件或项目的源代码包。该软件或项目可能包含构建和部署自动化环境的能力,以及对运行时环境的监控和维护。文件命名的一致性暗示这是一个正式的版本发布。如果要深入了解EnvMan的功能与用法,建议直接查看压缩包中的文档或源代码注释。同时,考虑到源代码的开发,我们还应该探究该项目所使用的技术栈、编程语言以及版本控制工具等,这将有助于进一步了解EnvMan的技术细节。
recommend-type

【Windows 11终极解决方案】:彻底攻克GX Works2安装中难缠的.Net Framework 3.5障碍!

# 1. Windows 11与GX Works2简介 ## 1.1 Windows 11操作系统概览 Windows 11,作为微软最新的操作系统,不仅仅提供了一种现代的用户体验,而且加强了在企业环境中的安全性与生产力工具。其引入了全新的界面设计、改进的多任务处理以及对Android应用的支持,使它成为IT专业人