A Guide to Fixing the MediaPlayer "vids:XVID" Playback Problem

The "XviD2 vids:XVID" in the title refers to version 2.0 of the XviD codec, and "vids:XVID" is the identifier under which the XviD video decoder is referenced by a decoder component or in the system registry. When working with video and multimedia content, codecs are critical components: they compress and decompress video so that files are smaller and easier to store and transmit while keeping reasonable picture quality.

The description notes that the MediaPlayer component in applications built with Delphi and similar tools "cannot play video". Applications that use the standard MediaPlayer component may fail to play certain videos when the format requires a specific decoder that MediaPlayer does not ship with by default. The user then sees an error saying the system cannot find a decompressor named "vids:XVID", or that the corresponding video decoder component is missing.

Several measures can resolve the problem:

1. Install the XviD decoder. Download and install the XviD decoder so the system can recognize and play XviD-encoded video files. In the bundled file list, Xvid-1.3.2-20110531.exe is the XviD installer; run it and follow the wizard to complete the installation.

2. Register the decoder manually. In some cases the installer cannot register the decoder automatically, or the location of the decoder has to be specified by hand. Consult the bundled Readme-说明.htm and 该软件介绍说明及注意事项.txt files for concrete instructions and configuration steps. (A small Delphi sketch for verifying the registration appears after this article.)

3. Refresh the system's decoder information. After installing the XviD decoder, a reboot or a reload of the MediaPlayer component is sometimes needed so that the new decoder is correctly registered and recognized by the system.

4. Use a third-party player. If the MediaPlayer component still fails after the decoder is installed, consider a third-party player with built-in XviD support, such as VLC or KMPlayer.

5. Check the build toolchain. For developers, if the problem appears in an application they compiled themselves, they should verify that the application actually includes support for the XviD format, or that it correctly invokes the decoder components installed on the system.

6. Handle playback at the code level. In Delphi and similar tools, if the standard MediaPlayer component is not sufficient, more capable third-party components or libraries such as DirectShow or FFmpeg can take over video playback. (A defensive-playback sketch appears at the end of this guide.)

In summary, the key to fixing MediaPlayer's inability to play XviD video is to make sure the correct video decoder is installed on the system and that the decoding path is configured properly. In this case, installing the XviD decoder and following the documentation bundled with the installer should resolve most problems; if the issue persists, the application code itself may need further inspection and debugging.
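To check measures 1 and 2 programmatically, the sketch below looks for the Video-for-Windows decompressor entry that the XviD installer normally writes to the registry. This is a minimal illustration, not part of the original package: the function name XvidVfwCodecInstalled is made up here, and the snippet assumes Delphi 6 or later (for the TRegistry.Create(AAccess) constructor) and the usual vidc.xvid registration under Drivers32.

```pascal
uses
  Windows, Registry;

// Returns True when a VFW decompressor for the XVID FourCC is registered.
// 32-bit VFW codecs are listed as "vidc.<fourcc>" values under Drivers32;
// the XviD installer normally writes vidc.xvid -> xvidvfw.dll there.
function XvidVfwCodecInstalled: Boolean;
var
  Reg: TRegistry;
begin
  Reg := TRegistry.Create(KEY_READ);
  try
    Reg.RootKey := HKEY_LOCAL_MACHINE;
    Result := Reg.OpenKeyReadOnly('SOFTWARE\Microsoft\Windows NT\CurrentVersion\Drivers32')
              and Reg.ValueExists('vidc.xvid');
  finally
    Reg.Free;
  end;
end;
```

If the function returns False, prompting the user to run the bundled Xvid-1.3.2-20110531.exe installer is a reasonable fallback before attempting playback.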

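For measure 6, the following hedged sketch shows defensive playback with the standard TMediaPlayer component. The form class, the component name MediaPlayer1, and the method name are illustrative assumptions; the point is that Open raises an EMCIDeviceError (whose message contains the familiar "vids:XVID" text) when no matching decompressor is installed, and that is the right place to tell the user to install the codec.

```pascal
uses
  MPlayer, Dialogs, SysUtils;

// Lives in the form's unit; MediaPlayer1 is a TMediaPlayer dropped on the form.
procedure TForm1.PlayXvidFile(const AFileName: string);
begin
  MediaPlayer1.FileName := AFileName;
  try
    // Open raises EMCIDeviceError ("... vids:XVID ...") when the
    // system has no registered decompressor for the video stream.
    MediaPlayer1.Open;
    MediaPlayer1.Play;
  except
    on E: EMCIDeviceError do
      ShowMessage('Playback failed. Please install the XviD codec. (' + E.Message + ')');
  end;
end;
```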
相关推荐

filetype

1.给出模型的auc;2.模型的预测结果,第一列是did,第二列只保留点击概率最高的vid,第三列是预计的完播率,按照上面三个要求调整代码并检查无误:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime # 1. 增强数据加载函数(添加列存在性检查) def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() chunks.append(chunk) if chunks: return pd.concat(chunks, ignore_index=True) return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 2. 优化历史数据加载(添加列存在性检查) def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame,即使某些为空 return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 定义内存优化的数据类型(添加列存在性检查) dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 - 只有在数据中存在时才添加 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) # 添加可选特征到dtypes(仅当列存在时) for feature, dtype in optional_features.items(): if not 
vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() # 加载历史数据 - 确保所有变量都被定义 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) # 打印历史数据状态 print(f"历史曝光数据形状: {hist_exposure.shape if not hist_exposure.empty else '空'}") print(f"历史点击数据形状: {hist_click.shape if not hist_click.empty else '空'}") print(f"历史播放数据形状: {hist_play.shape if not hist_play.empty else '空'}") # 3. 优化点击数据集构建(添加空数据检查) def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本(使用集合操作) exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 使用10%负样本比例 - 确保hist_click已定义 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集,因为历史曝光或点击数据为空") click_train_data = pd.DataFrame() # 4. 
优化特征工程(解决分类特征问题) def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征(使用索引加速合并) if not did_features.empty and 'did' in did_features.columns: df = df.merge(did_features, on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: df = df.merge(vid_info, on='vid', how='left') # 用户行为统计 user_click_count = pd.Series(dtype='int') if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') if not user_click_count.empty: df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 user_total_play = pd.Series(dtype='float') if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') if not user_total_play.empty: df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 # 视频热度统计 video_click_count = pd.Series(dtype='int') if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') if not video_click_count.empty: df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 avg_play_time = pd.Series(dtype='float') if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') if not avg_play_time.empty: df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 添加特征 - 确保所有参数都已定义 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, # 确保hist_click已定义 hist_play # 确保hist_play已定义 ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 5. 
修复分类特征问题 def get_categorical_features(df, base_features): """动态获取存在的分类特征""" existing_features = [] for feature in base_features: if feature in df.columns: try: # 尝试转换为数值,如果是数值则跳过 pd.to_numeric(df[feature], errors='raise') except: existing_features.append(feature) # 确保转换为category类型 df[feature] = df[feature].astype('category').cat.as_ordered() return existing_features # 基础分类特征列表 base_categorical_features = [ 'item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour' ] # 动态获取存在的分类特征 categorical_features = [] if not click_train_data.empty: categorical_features = get_categorical_features(click_train_data, base_categorical_features) print(f"使用的分类特征: {categorical_features}") else: print("⚠️ 点击训练数据为空,无法获取分类特征") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练模型(优化参数) params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } if not X_train.empty: train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features) print("开始训练点击预测模型...") model_click = lgb.train( params, train_data, num_boost_round=1500, valid_sets=[val_data], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") # 6. 
优化完播率数据集构建 def build_play_dataset(hist_play, vid_info, did_features, hist_click): """构建完播率数据集,优化内存使用""" if hist_play.empty: print("⚠️ 历史播放数据为空,无法构建完播率数据集") return pd.DataFrame() # 基础数据 play_data = hist_play[['did', 'vid', 'play_time']].copy() # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: play_data = play_data.merge( vid_info[['vid', 'item_duration']], on='vid', how='left' ) else: play_data['item_duration'] = 1.0 # 默认值 # 计算完播率 play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 添加用户特征 if not did_features.empty and 'did' in did_features.columns: play_data = play_data.merge( did_features, on='did', how='left' ) # 添加视频特征 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col != 'item_duration'] play_data = play_data.merge( vid_info[vid_cols], on='vid', how='left' ) # 用户平均完播率 play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean') play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count') # 视频平均完播率 play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean') play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std') # 用户-视频互动特征 if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns: user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks') play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left') else: play_data['user_vid_clicks'] = 0 # 填充缺失值 play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True) play_data['user_play_count'].fillna(1, inplace=True) play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True) play_data['video_completion_std'].fillna(0, inplace=True) play_data['user_vid_clicks'].fillna(0, inplace=True) return play_data print("开始构建完播率数据集...") # 确保所有参数都已定义 if 'hist_play' in globals() and 'vid_info' in globals() and 'did_features' in globals() and 'hist_click' in globals(): play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) else: print("⚠️ 无法构建完播率数据集,因为所需变量未定义") play_train_data = pd.DataFrame() # 7. 
训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 获取完播率模型的分类特征 if not play_train_data.empty: play_categorical_features = get_categorical_features(play_train_data, base_categorical_features) print(f"完播率模型使用的分类特征: {play_categorical_features}") else: play_categorical_features = [] print("⚠️ 完播率训练数据为空,无法获取分类特征") # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } if not X_train_play.empty: train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features) print("开始训练完播率模型...") model_play = lgb.train( params_reg, train_data_play, num_boost_round=2000, valid_sets=[val_data_play], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") # 保存模型 if model_click: model_click.save_model('click_model.txt') if model_play: model_play.save_model('play_model.txt') joblib.dump(base_categorical_features, 'categorical_features.pkl') # 8. 
添加预测流程 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 生成提交格式 submission = test_data.groupby('did').apply( lambda x: ' '.join(x.sort_values('score', ascending=False)['vid'].astype(str)[:100]) ).reset_index(name='vid_list') return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果")

filetype

请帮我检查并完善代码:#步骤1:数据读取与预处理,如果内存不足,可以考虑分批处理或使用Dask等工具。 import pandas as pd import numpy as np import lightgbm as lgb from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, mean_absolute_error # 读取数据 did_features = pd.read_csv('did_features_table.csv') # 用户特征 vid_info = pd.read_csv('vid_info_table.csv') # 视频信息 # 历史32天曝光数据see_list(用于构建负样本) # 历史32天点击数据click_list(正样本) # 历史32天播放数据play_list(用于训练完播率模型) def detect_encoding(file_path): with open(file_path, 'rb') as f: result = chardet.detect(f.read(10000)) return result['encoding'], result['confidence'] def load_all_data(days=32): see_list, click_list, play_list = [], [], [] dtypes = {'did': 'category', 'vid': 'category'} for i in range(1, days + 1): day = f"{i:02d}" # 检查文件是否存在 for file_type in [f'see_{day}.csv', f'click_{day}.csv', f'playplus_{day}.csv']: if not os.path.exists(file_type): print(f"⚠️ 警告: 文件 {file_type} 不存在,跳过该天数据") continue try: # 加载 see 数据 see = pd.read_csv(f'see_{day}.csv', encoding='latin1', dtype=dtypes) if 'did' not in see.columns or 'vid' not in see.columns: print(f"⚠️ 警告: see_{day}.csv 缺少必要字段,跳过该天数据") continue see['day'] = day see_list.append(see) del see gc.collect() # 加载 click 数据 click = pd.read_csv( f'click_{day}.csv', encoding='ISO-8859-1', on_bad_lines='skip', dtype=dtypes ) if 'click_time' not in click.columns: print(f"⚠️ 警告: click_{day}.csv 缺少 click_time 字段,跳过该天数据") continue click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载 play 数据 play = pd.read_csv( f'playplus_{day}.csv', engine='python', encoding_errors='ignore', dtype=dtypes ) if 'play_time' not in play.columns: print(f"⚠️ 警告: playplus_{day}.csv 缺少 play_time 字段,跳过该天数据") continue play_list.append(play[['did', 'vid', 'play_time']]) del play gc.collect() except Exception as e: print(f"⚠️ 加载第 {day} 天数据时出错: {str(e)}") continue # 处理所有天都没有数据的情况 if not see_list: raise ValueError("错误: 未找到任何有效数据,请检查输入文件") all_see = pd.concat(see_list).drop_duplicates(['did', 'vid']) all_click = pd.concat(click_list).drop_duplicates(['did', 'vid']) to_predict_users = pd.read_csv('testA_pred_did.csv') # 待预测用户 to_predict_exposure = pd.read_csv('testA_did_show.csv') # 待预测用户的曝光视频 # 注意:弹幕数据暂不使用 #步骤2:构建点击预测模型的训练集,如果内存不足,可以考虑分批处理或使用Dask等工具。 # 正样本:历史点击数据,标记为1 positive_samples = hist_click[['did', 'vid']].copy() positive_samples['label'] = 1 # 负样本:从历史曝光数据中,排除出现在历史点击数据中的(did, vid) # 注意:同一个用户可能曝光了多个视频,但只点击了部分,所以未点击的就是负样本 # 合并同一个用户的所有曝光和点击,然后取差集 # 方法:对每个用户,曝光视频中不在点击视频列表中的作为负样本 # 注意:数据量可能很大,需要分组操作 # 先获取每个用户点击了哪些视频(did, vid集合) user_clicked_vids = hist_click.groupby('did')['vid'].apply(set).reset_index(name='clicked_set') # 将历史曝光数据与user_clicked_vids合并 hist_exposure_with_click = hist_exposure.merge(user_clicked_vids, on='did', how='left') # 对于每个曝光记录,如果vid不在clicked_set中,则作为负样本 # 注意:如果用户没有点击记录,则clicked_set为NaN,使用空集 hist_exposure_with_click['clicked_set'] = hist_exposure_with_click['clicked_set'].apply(lambda x: x if isinstance(x, set) else set()) hist_exposure_with_click['is_clicked'] = hist_exposure_with_click.apply(lambda row: row['vid'] in row['clicked_set'], axis=1) # 负样本:未点击的曝光记录 negative_samples = hist_exposure_with_click[~hist_exposure_with_click['is_clicked']][['did', 'vid']] negative_samples['label'] = 0 # 合并正负样本 click_train_data = pd.concat([positive_samples, negative_samples], axis=0, ignore_index=True) # 合并用户特征和视频特征 click_train_data = click_train_data.merge(did_features, on='did', how='left') click_train_data = click_train_data.merge(vid_info, on='vid', 
how='left') # 注意:这里可能会有缺失值(比如视频信息表中没有某个视频的信息),需要填充 # 填充策略:对于数值特征,用中位数或均值;对于类别特征,用众数或特殊值(如-1) #步骤3:特征工程(点击预测模型) video_click_count = hist_click.groupby('vid').size().reset_index(name='video_click_count') click_train_data = click_train_data.merge(video_click_count, on='vid', how='left') click_train_data['video_click_count'].fillna(0, inplace=True) # 对于新视频,用0填充 #步骤4:训练点击预测模型 # 划分训练集和验证集 X = click_train_data.drop(columns=['did', 'vid', 'label']) y = click_train_data['label'] # 将类别特征转换为类别类型(LightGBM可以处理类别特征) categorical_features = ['item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype'] for col in categorical_features: if col in X.columns: X[col] = X[col].astype('category') X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42) # 训练LightGBM分类器 params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 31, 'learning_rate': 0.05, 'feature_fraction': 0.9, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'verbose': 0 } train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features, free_raw_data=False) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features, free_raw_data=False) model_click = lgb.train(params, train_data, valid_sets=[train_data, val_data], num_boost_round=1000, early_stopping_rounds=50, verbose_eval=10) # 保存模型 model_click.save_model('click_model.txt') #步骤5:构建完播率预测模型的训练集 # 使用历史播放数据(有播放时长),需要合并视频信息表获取视频时长,然后计算完播率 # 注意:播放时长可能大于视频时长,所以完播率最大为1 play_data = hist_play.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 超过1的设为1 # 合并用户特征和视频特征 play_train_data = play_data.merge(did_features, on='did', how='left') play_train_data = play_train_data.merge(vid_info.drop(columns=['item_duration']), on='vid', how='left') # 同样,构造统计特征(如用户平均完播率、视频平均完播率等) # 示例:用户平均完播率 user_avg_completion = play_train_data.groupby('did')['completion_rate'].mean().reset_index(name='user_avg_completion') play_train_data = play_train_data.merge(user_avg_completion, on='did', how='left') # 视频平均完播率 video_avg_completion = play_train_data.groupby('vid')['completion_rate'].mean().reset_index(name='video_avg_completion') play_train_data = play_train_data.merge(video_avg_completion, on='vid', how='left') # 填充缺失值 # ... # 特征矩阵 X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate']) y_play = play_train_data['completion_rate'] #步骤6:训练完播率预测模型 # 划分训练集和验证集 X_train_play, X_val_play, y_train_play, y_val_play = train_test_split(X_play, y_play, test_size=0.2, random_state=42) # 训练LightGBM回归模型 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 31, 'learning_rate': 0.05, 'feature_fraction': 0.9, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'verbose': 0 } train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=categorical_features, free_raw_data=False) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=categorical_features, free_raw_data=False) model_play = lgb.train(params_reg, train_data_play, valid_sets=[train_data_play, val_data_play], num_boost_round=1000, early_stopping_rounds=50, verbose_eval=10) # 保存模型 model_play.save_model('play_model.txt')

filetype

帮我检查代码:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime # 1. 增强数据加载函数(添加列存在性检查) def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() chunks.append(chunk) if chunks: return pd.concat(chunks, ignore_index=True) return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 2. 优化历史数据加载(添加列存在性检查) def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame,即使某些为空 return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 定义内存优化的数据类型(添加列存在性检查) dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 - 只有在数据中存在时才添加 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) # 添加可选特征到dtypes(仅当列存在时) for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = 
dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() # 加载历史数据 - 确保所有变量都被定义 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) # 打印历史数据状态 print(f"历史曝光数据形状: {hist_exposure.shape if not hist_exposure.empty else '空'}") print(f"历史点击数据形状: {hist_click.shape if not hist_click.empty else '空'}") print(f"历史播放数据形状: {hist_play.shape if not hist_play.empty else '空'}") # 3. 优化点击数据集构建(添加空数据检查) def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本(使用集合操作) exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 使用10%负样本比例 - 确保hist_click已定义 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集,因为历史曝光或点击数据为空") click_train_data = pd.DataFrame() # 4. 
优化特征工程(解决分类特征问题) def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征(使用索引加速合并) if not did_features.empty and 'did' in did_features.columns: df = df.merge(did_features, on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: df = df.merge(vid_info, on='vid', how='left') # 用户行为统计 user_click_count = pd.Series(dtype='int') if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') if not user_click_count.empty: df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 user_total_play = pd.Series(dtype='float') if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') if not user_total_play.empty: df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 # 视频热度统计 video_click_count = pd.Series(dtype='int') if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') if not video_click_count.empty: df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 avg_play_time = pd.Series(dtype='float') if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') if not avg_play_time.empty: df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 添加特征 - 确保所有参数都已定义 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, # 确保hist_click已定义 hist_play # 确保hist_play已定义 ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 5. 
修复分类特征问题 def get_categorical_features(df, base_features): """动态获取存在的分类特征""" existing_features = [] for feature in base_features: if feature in df.columns: try: # 尝试转换为数值,如果是数值则跳过 pd.to_numeric(df[feature], errors='raise') except: existing_features.append(feature) # 确保转换为category类型 df[feature] = df[feature].astype('category').cat.as_ordered() return existing_features # 基础分类特征列表 base_categorical_features = [ 'item_cid', 'item_type', 'item_assetSource', 'item_classify', 'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour' ] # 动态获取存在的分类特征 categorical_features = [] if not click_train_data.empty: categorical_features = get_categorical_features(click_train_data, base_categorical_features) print(f"使用的分类特征: {categorical_features}") else: print("⚠️ 点击训练数据为空,无法获取分类特征") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练模型(优化参数) params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } if not X_train.empty: train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features) val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features) print("开始训练点击预测模型...") model_click = lgb.train( params, train_data, num_boost_round=1500, valid_sets=[val_data], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") # 6. 
优化完播率数据集构建 def build_play_dataset(hist_play, vid_info, did_features, hist_click): """构建完播率数据集,优化内存使用""" if hist_play.empty: print("⚠️ 历史播放数据为空,无法构建完播率数据集") return pd.DataFrame() # 基础数据 play_data = hist_play[['did', 'vid', 'play_time']].copy() # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: play_data = play_data.merge( vid_info[['vid', 'item_duration']], on='vid', how='left' ) else: play_data['item_duration'] = 1.0 # 默认值 # 计算完播率 play_data['completion_rate'] = play_data['play_time'] / play_data['item_duration'] play_data['completion_rate'] = play_data['completion_rate'].clip(upper=1.0) # 添加用户特征 if not did_features.empty and 'did' in did_features.columns: play_data = play_data.merge( did_features, on='did', how='left' ) # 添加视频特征 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col != 'item_duration'] play_data = play_data.merge( vid_info[vid_cols], on='vid', how='left' ) # 用户平均完播率 play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean') play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count') # 视频平均完播率 play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean') play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std') # 用户-视频互动特征 if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns: user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks') play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left') else: play_data['user_vid_clicks'] = 0 # 填充缺失值 play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True) play_data['user_play_count'].fillna(1, inplace=True) play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True) play_data['video_completion_std'].fillna(0, inplace=True) play_data['user_vid_clicks'].fillna(0, inplace=True) return play_data print("开始构建完播率数据集...") # 确保所有参数都已定义 if 'hist_play' in globals() and 'vid_info' in globals() and 'did_features' in globals() and 'hist_click' in globals(): play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) else: print("⚠️ 无法构建完播率数据集,因为所需变量未定义") play_train_data = pd.DataFrame() # 7. 
训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 获取完播率模型的分类特征 if not play_train_data.empty: play_categorical_features = get_categorical_features(play_train_data, base_categorical_features) print(f"完播率模型使用的分类特征: {play_categorical_features}") else: play_categorical_features = [] print("⚠️ 完播率训练数据为空,无法获取分类特征") # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } if not X_train_play.empty: train_data_play = lgb.Dataset(X_train_play, label=y_train_play, categorical_feature=play_categorical_features) val_data_play = lgb.Dataset(X_val_play, label=y_val_play, categorical_feature=play_categorical_features) print("开始训练完播率模型...") model_play = lgb.train( params_reg, train_data_play, num_boost_round=2000, valid_sets=[val_data_play], callbacks=[ early_stopping(stopping_rounds=100, verbose=True), log_evaluation(period=50) ] ) else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") # 保存模型 if model_click: model_click.save_model('click_model.txt') if model_play: model_play.save_model('play_model.txt') joblib.dump(base_categorical_features, 'categorical_features.pkl') # 8. 
添加预测流程 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 生成提交格式 submission = test_data.groupby('did').apply( lambda x: ' '.join(x.sort_values('score', ascending=False)['vid'].astype(str)[:100]) ).reset_index(name='vid_list') return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果") # 8. 
调整预测流程以满足新格式要求 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果,格式为:did, vid, completion_rate""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 动态获取分类特征 test_categorical_features = get_categorical_features(test_data, base_categorical_features) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: click_probs = model_click.predict(X_test) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 completion_rates = model_play.predict(X_test) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 为每个用户选择得分最高的视频 submission = test_data.sort_values('score', ascending=False).groupby('did').head(1) # 选择需要的列 submission = submission[['did', 'vid', 'completion_rate']].copy() # 重命名列以符合要求 submission.columns = ['did', 'vid', 'completion_rate'] # 确保数据格式正确 submission['did'] = submission['did'].astype(str) submission['vid'] = submission['vid'].astype(str) submission['completion_rate'] = submission['completion_rate'].round(4) # 保留4位小数 return submission # 加载预测数据 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' # 确保输出格式正确:did, vid, completion_rate submission.to_csv(output_file, index=False, header=True) print(f"预测结果已保存至: {output_file}") # 打印前5行示例 print("\n预测结果示例:") print(submission.head()) else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果")

filetype

帮我检查优化代码,尤其是减少内存占用:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime from scipy.sparse import hstack, csr_matrix, save_npz, load_npz import sys import psutil from sklearn.metrics import log_loss, mean_absolute_error # 内存优化函数 def optimize_dtypes(df): """优化DataFrame的数据类型以减少内存占用""" if df.empty: return df # 转换整数列为最小可用类型 int_cols = df.select_dtypes(include=['int']).columns if not int_cols.empty: df[int_cols] = df[int_cols].apply(pd.to_numeric, downcast='integer') # 转换浮点列为最小可用类型 float_cols = df.select_dtypes(include=['float']).columns if not float_cols.empty: df[float_cols] = df[float_cols].apply(pd.to_numeric, downcast='float') # 转换对象列为分类类型 obj_cols = df.select_dtypes(include=['object']).columns for col in obj_cols: num_unique = df[col].nunique() num_total = len(df) if num_unique / num_total < 0.5: # 如果唯一值比例小于50% df[col] = df[col].astype('category') return df # 内存监控函数 def memory_monitor(step_name=""): """监控内存使用情况""" process = psutil.Process(os.getpid()) mem_info = process.memory_info() print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB") return mem_info.rss / (1024 ** 2) # 返回MB # 增强数据加载函数 def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() # 优化数据类型 chunk = optimize_dtypes(chunk) chunks.append(chunk) if chunks: result = pd.concat(chunks, ignore_index=True) # 再次整体优化 result = optimize_dtypes(result) return result return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 稀疏矩阵转换函数 def to_sparse_matrix(df, columns): """将分类特征转换为稀疏矩阵表示""" sparse_matrices = [] for col in columns: if col in df.columns: # 处理NaN值 df[col] = df[col].fillna('MISSING') # 创建稀疏矩阵 sparse_mat = csr_matrix(pd.get_dummies(df[col], sparse=True).values) sparse_matrices.append(sparse_mat) # 水平堆叠所有稀疏矩阵 if sparse_matrices: return hstack(sparse_matrices) return None # 增量训练函数 def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000): """分块增量训练模型以减少内存占用""" model = None for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"): chunk_end = min(i + chunk_size, len(X)) X_chunk = X.iloc[i:chunk_end] y_chunk = y.iloc[i:chunk_end] train_data = lgb.Dataset( X_chunk, label=y_chunk, categorical_feature=categorical_features ) if model is None: model = lgb.train( params, train_data, num_boost_round=num_rounds, keep_training_booster=True ) else: model = lgb.train( params, train_data, num_boost_round=num_rounds, init_model=model, keep_training_booster=True ) return model # 主处理流程 def main(): """主处理流程,包含完整的内存优化策略""" # 初始内存监控 start_mem = memory_monitor("初始内存") # 定义内存优化的数据类型 dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 optional_features = { 'item_cid': 
'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) memory_monitor("加载核心数据后") # 添加可选特征到dtypes for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() memory_monitor("重新加载数据后") # 加载历史数据 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) memory_monitor("加载历史数据后") # 构建点击数据集 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集") click_train_data = pd.DataFrame() memory_monitor("构建点击数据集后") # 添加特征 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, hist_play ) else: print("⚠️ 点击数据集为空,跳过特征构建") memory_monitor("添加特征后") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() memory_monitor("划分数据集后") # 训练模型参数 params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } # 增量训练点击模型 if not X_train.empty: print("开始训练点击预测模型...") model_click = train_incremental(X_train, y_train, categorical_features, params, num_rounds=1500, chunk_size=100000) # 在验证集上评估 val_preds = model_click.predict(X_val) val_logloss = log_loss(y_val, val_preds) print(f"验证集LogLoss: {val_logloss:.4f}") else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") memory_monitor("训练点击模型后") # 构建完播率数据集 print("开始构建完播率数据集...") play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) memory_monitor("构建完播率数据集后") # 训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), 
pd.Series(), pd.Series()

# Training parameters for the completion-rate model
params_reg = {
    'objective': 'regression',
    'metric': 'mae',
    'boosting_type': 'gbdt',
    'num_leaves': 63,
    'learning_rate': 0.03,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'lambda_l1': 0.1,
    'lambda_l2': 0.1,
    'min_data_in_leaf': 50,
    'verbosity': -1
}

# Incrementally train the completion-rate model
if not X_train_play.empty:
    print("Training completion-rate model...")
    model_play = train_incremental(X_train_play, y_train_play, play_categorical_features,
                                   params_reg, num_rounds=2000, chunk_size=100000)
    # Evaluate on the validation set
    val_preds = model_play.predict(X_val_play)
    val_mae = mean_absolute_error(y_val_play, val_preds)
    print(f"Validation MAE: {val_mae:.4f}")
else:
    model_play = None
    print("⚠️ Training data is empty, skipping completion-rate model")

memory_monitor("after training completion-rate model")

# Save models
if model_click:
    model_click.save_model('click_model.txt')
    print("Click model saved")
if model_play:
    model_play.save_model('play_model.txt')
    print("Completion-rate model saved")

# Prediction flow
print("Loading prediction data...")
to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'})
to_predict_exposure = load_data_safely('testA_did_show.csv',
                                       dtype={'did': 'category', 'vid': 'category'})

# Run the prediction
if not to_predict_users.empty and not to_predict_exposure.empty:
    print("Generating predictions...")
    submission = predict_for_test_data(to_predict_users, to_predict_exposure,
                                       did_features, vid_info)
    # Save the result
    if not submission.empty:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f'submission_{timestamp}.csv'
        submission.to_csv(output_file, index=False)
        print(f"Predictions saved to: {output_file}")
    else:
        print("⚠️ Prediction result is empty, nothing saved")
else:
    print("⚠️ Failed to load prediction data, no results generated")

# Final memory report
end_mem = memory_monitor("done")
print(f"Total memory consumed: {end_mem - start_mem:.2f} MB")

# Build the click dataset
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1):
    """Build the click dataset with negative sampling."""
    if hist_exposure.empty or hist_click.empty:
        print("⚠️ Exposure or click history is empty, cannot build dataset")
        return pd.DataFrame()

    # Mark positive samples
    hist_click = hist_click.copy()
    hist_click['label'] = 1

    # Efficiently mark negative samples via set operations
    exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid']))
    click_set = set(zip(hist_click['did'], hist_click['vid']))

    # Exposures that were never clicked
    negative_set = exposure_set - click_set

    # Build the negative-sample DataFrame
    if negative_set:
        negative_dids, negative_vids = zip(*negative_set)
        negative_samples = pd.DataFrame({
            'did': list(negative_dids),
            'vid': list(negative_vids),
            'label': 0
        })
        # Downsample the negatives
        if sample_ratio < 1.0:
            negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42)
    else:
        negative_samples = pd.DataFrame(columns=['did', 'vid', 'label'])

    # Merge positives and negatives
    click_data = pd.concat([
        hist_click[['did', 'vid', 'label']],
        negative_samples
    ], ignore_index=True)

    # Free memory
    del exposure_set, click_set, negative_set, negative_samples
    gc.collect()
    return click_data

# Feature engineering
def add_click_features(df, did_features, vid_info, hist_click, hist_play):
    """Add the key features without blowing up memory."""
    if df.empty:
        return df

    # Base features: merge user profile and video metadata
    if not did_features.empty and 'did' in did_features.columns:
        df = df.merge(did_features, on='did', how='left')
    if not vid_info.empty and 'vid' in vid_info.columns:
        df = df.merge(vid_info, on='vid', how='left')

    # User behavior statistics
    if not hist_click.empty and 'did' in hist_click.columns:
        user_click_count = hist_click.groupby('did').size().rename('user_click_count')
        df = df.merge(user_click_count, on='did', how='left')
    else:
        df['user_click_count'] = 0

    if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns:
        user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play')
        df = df.merge(user_total_play, on='did', how='left')
    else:
        df['user_total_play'] = 0

    if not hist_click.empty and 'vid' in hist_click.columns:
        video_click_count = hist_click.groupby('vid').size().rename('video_click_count')
        df = df.merge(video_click_count, on='vid', how='left')
    else:
        df['video_click_count'] = 0

    if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns:
        avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time')
        df = df.merge(avg_play_time, on='vid', how='left')
    else:
        df['avg_play_time'] = 0

    # Fill missing values
    fill_values = {
        'user_click_count': 0,
        'user_total_play': 0,
        'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
        'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0
    }
    for col, value in fill_values.items():
        if col in df:
            df[col] = df[col].fillna(value)

    # Time features (note: 'date' carries no time of day, so 'hour' is always 0 here)
    if 'date' in df:
        df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8')
        df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8')
    return df

# Prediction
def predict_for_test_data(test_users, test_exposure, did_features, vid_info):
    """Generate predictions for the test data."""
    if test_users.empty or test_exposure.empty:
        print("⚠️ Test data is empty, cannot predict")
        return pd.DataFrame()

    # Merge the test tables
    test_data = test_exposure.merge(test_users, on='did', how='left')

    # Add features (no click or play history is available at test time)
    test_data = add_click_features(test_data, did_features, vid_info,
                                   pd.DataFrame(), pd.DataFrame())

    # Predict click probability
    X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore')
    chunk_size = 100000  # shared by both chunked prediction loops below
    if model_click and not X_test.empty:
        # Predict in chunks to avoid memory problems
        click_probs = []
        for i in range(0, len(X_test), chunk_size):
            chunk = X_test.iloc[i:i + chunk_size]
            click_probs.extend(model_click.predict(chunk))
    else:
        click_probs = [0.5] * len(test_data)  # default

    # Predict completion rate
    if model_play and not X_test.empty:
        # Attach video duration
        if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
            test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left')
        else:
            test_data['item_duration'] = 1.0
        # Predict in chunks
        completion_rates = []
        for i in range(0, len(X_test), chunk_size):
            chunk = X_test.iloc[i:i + chunk_size]
            completion_rates.extend(model_play.predict(chunk))
    else:
        completion_rates = [0.7] * len(test_data)  # default

    # Combined score
    test_data['click_prob'] = click_probs
    test_data['completion_rate'] = completion_rates
    test_data['score'] = test_data['click_prob'] * test_data['completion_rate']

    # Keep the highest-scoring video per user
    submission = test_data.sort_values('score', ascending=False).groupby('did').head(1)

    # Keep and format the required columns
    submission = submission[['did', 'vid', 'completion_rate']].copy()
    submission['did'] = submission['did'].astype(str)
    submission['vid'] = submission['vid'].astype(str)
    submission['completion_rate'] = submission['completion_rate'].round(4)
    return submission

# Entry point
if __name__ == "__main__":
    main()
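The three requirements quoted at the top of this listing — report the model's AUC, keep only the highest-click-probability vid per did, and output the predicted completion rate — boil down to a few pandas/sklearn calls. A minimal standalone sketch with synthetic stand-in data; the column names did/vid/click_prob/completion_rate follow the listing above, everything else is illustrative:

import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score

# Synthetic stand-ins for the validation labels and model scores (demo only)
rng = np.random.default_rng(42)
y_val = rng.integers(0, 2, size=1000)                               # true click labels
y_val_pred = np.clip(y_val * 0.6 + rng.random(1000) * 0.5, 0, 1)    # model scores

# 1) AUC of the click model on the validation set
print(f"validation AUC: {roc_auc_score(y_val, y_val_pred):.6f}")

# 2) + 3) one row per did: the vid with the highest click probability,
# together with that row's predicted completion rate
test_data = pd.DataFrame({
    'did': ['u1', 'u1', 'u2', 'u2'],
    'vid': ['v1', 'v2', 'v3', 'v4'],
    'click_prob': [0.9, 0.4, 0.2, 0.7],
    'completion_rate': [0.81, 0.33, 0.10, 0.66],
})
top = test_data.loc[test_data.groupby('did')['click_prob'].idxmax()]
submission = top[['did', 'vid', 'completion_rate']].round({'completion_rate': 4})
print(submission)

Selecting via groupby(...)['click_prob'].idxmax() avoids the sort-then-head pattern used above and makes the tie-breaking rule explicit: the first row with the maximal probability wins.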


Please modify this code: first, the raw data covers only 30 days, not 32; second, the completion-rate predictions all come out identical, so that model needs tuning; third, the row count of the prediction output must match testA_pred_did.csv.

import pandas as pd
import numpy as np
import lightgbm as lgb
from lightgbm import early_stopping, log_evaluation
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import joblib
from datetime import datetime
from sklearn.metrics import roc_auc_score  # for AUC

# Fix: make sure the function definition has an indented body
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000):
    """Safely load a large CSV file, optimizing memory use."""
    try:
        if not os.path.exists(file_path):
            print(f"⚠️ File not found: {file_path}")
            return pd.DataFrame()
        # Auto-detect the encoding
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read(100000))
        encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'
        # Read in chunks to limit memory
        chunks = []
        reader = pd.read_csv(file_path, encoding=encoding, usecols=usecols,
                             dtype=dtype, chunksize=chunksize, low_memory=False)
        for chunk in tqdm(reader, desc=f"Loading {os.path.basename(file_path)}"):
            # Shrink categorical columns
            if dtype:  # guard against an empty dtype mapping
                for col in chunk.columns:
                    if col in dtype and dtype[col] == 'category':
                        chunk[col] = chunk[col].astype('category').cat.as_ordered()
            chunks.append(chunk)
        if chunks:
            return pd.concat(chunks, ignore_index=True)
        return pd.DataFrame()
    except Exception as e:
        print(f"⚠️ Failed to load {file_path}: {str(e)}")
        return pd.DataFrame()

# Fix: make sure every function has an indented body
def load_historical_data(days=32):
    """Load the daily history files, day by day."""
    see_list, click_list, play_list = [], [], []
    for day in tqdm(range(1, days + 1), desc="Loading history"):
        day_str = f"{day:02d}"
        # Exposure data
        see_path = f'see_{day_str}.csv'
        if os.path.exists(see_path):
            see = load_data_safely(see_path, usecols=['did', 'vid'],
                                   dtype={'did': 'category', 'vid': 'category'})
            if not see.empty and 'did' in see.columns and 'vid' in see.columns:
                see_list.append(see)
            del see
            gc.collect()
        # Click data
        click_path = f'click_{day_str}.csv'
        if os.path.exists(click_path):
            click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'],
                                     dtype={'did': 'category', 'vid': 'category'})
            if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns:
                # Normalize the click timestamp to a date
                click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date
                click = click.drop(columns=['click_time'], errors='ignore')
                click_list.append(click[['did', 'vid', 'date']])
            del click
            gc.collect()
        # Play data
        play_path = f'playplus_{day_str}.csv'
        if os.path.exists(play_path):
            play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'],
                                    dtype={'did': 'category', 'vid': 'category'})
            if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns:
                play_list.append(play)
            del play
            gc.collect()
        gc.collect()
    # Always return three DataFrames, even if some are empty
    return (
        pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(),
        pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(),
        pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
    )

# Fix: add the missing function definition
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1):
    """Build the click dataset with negative sampling."""
    if hist_exposure.empty or hist_click.empty:
        print("⚠️ Exposure or click history is empty, cannot build dataset")
        return pd.DataFrame()
    # Mark positive samples
    hist_click = hist_click.copy()
    hist_click['label'] = 1
    # Efficiently mark negatives via set operations
    exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid']))
    click_set = set(zip(hist_click['did'], hist_click['vid']))
    # Exposures that were never clicked
    negative_set = exposure_set - click_set
    # Build the negative-sample DataFrame
    if negative_set:
        negative_dids, negative_vids = zip(*negative_set)
        negative_samples = pd.DataFrame({
            'did': list(negative_dids),
            'vid': list(negative_vids),
            'label': 0
        })
        # Downsample the negatives
        if sample_ratio < 1.0:
            negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42)
    else:
        negative_samples = pd.DataFrame(columns=['did', 'vid', 'label'])
    # Merge positives and negatives
    click_data = pd.concat([
        hist_click[['did', 'vid', 'label']],
        negative_samples
    ], ignore_index=True)
    # Free memory
    del exposure_set, click_set, negative_set, negative_samples
    gc.collect()
    return click_data

# Fix: add the missing function definition
def add_click_features(df, did_features, vid_info, hist_click, hist_play):
    """Add the key features without blowing up memory."""
    if df.empty:
        return df
    # Base features (merges on indexed keys)
    if not did_features.empty and 'did' in did_features.columns:
        df = df.merge(did_features, on='did', how='left')
    if not vid_info.empty and 'vid' in vid_info.columns:
        df = df.merge(vid_info, on='vid', how='left')
    # User behavior statistics
    user_click_count = pd.Series(dtype='int')
    if not hist_click.empty and 'did' in hist_click.columns:
        user_click_count = hist_click.groupby('did').size().rename('user_click_count')
    if not user_click_count.empty:
        df = df.merge(user_click_count, on='did', how='left')
    else:
        df['user_click_count'] = 0
    user_total_play = pd.Series(dtype='float')
    if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns:
        user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play')
    if not user_total_play.empty:
        df = df.merge(user_total_play, on='did', how='left')
    else:
        df['user_total_play'] = 0
    # Video popularity statistics
    video_click_count = pd.Series(dtype='int')
    if not hist_click.empty and 'vid' in hist_click.columns:
        video_click_count = hist_click.groupby('vid').size().rename('video_click_count')
    if not video_click_count.empty:
        df = df.merge(video_click_count, on='vid', how='left')
    else:
        df['video_click_count'] = 0
    avg_play_time = pd.Series(dtype='float')
    if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns:
        avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time')
    if not avg_play_time.empty:
        df = df.merge(avg_play_time, on='vid', how='left')
    else:
        df['avg_play_time'] = 0
    # Fill missing values
    fill_values = {
        'user_click_count': 0,
        'user_total_play': 0,
        'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
        'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0
    }
    for col, value in fill_values.items():
        if col in df:
            df[col] = df[col].fillna(value)
    # Time features
    if 'date' in df:
        df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8')
        df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8')
    return df

# Fix: add the missing function definition
def get_categorical_features(df, base_features):
    """Return the categorical features that actually exist in df."""
    existing_features = []
    for feature in base_features:
        if feature in df.columns:
            try:
                # If the column converts to numeric, skip it
                pd.to_numeric(df[feature], errors='raise')
            except (ValueError, TypeError):
                existing_features.append(feature)
                # Make sure it is stored as an (ordered) category
                df[feature] = df[feature].astype('category').cat.as_ordered()
    return existing_features

# Fix: add the missing function definition
def build_play_dataset(hist_play, vid_info, did_features, hist_click):
    """Build the completion-rate dataset with modest memory use."""
    if hist_play.empty:
        print("⚠️ Play history is empty, cannot build completion-rate dataset")
        return pd.DataFrame()
    # Base data
    play_data = hist_play[['did', 'vid', 'play_time']].copy()
    # Attach video duration
    if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
        play_data = play_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left')
    else:
        play_data['item_duration'] = 1.0  # default
    # Completion rate = play time / duration, clipped at 1.0
    play_data['completion_rate'] = (play_data['play_time'] / play_data['item_duration']).clip(upper=1.0)
    # User features
    if not did_features.empty and 'did' in did_features.columns:
        play_data = play_data.merge(did_features, on='did', how='left')
    # Video features
    if not vid_info.empty and 'vid' in vid_info.columns:
        vid_cols = [col for col in vid_info.columns if col != 'item_duration']
        play_data = play_data.merge(vid_info[vid_cols], on='vid', how='left')
    # Per-user completion statistics
    play_data['user_avg_completion'] = play_data.groupby('did')['completion_rate'].transform('mean')
    play_data['user_play_count'] = play_data.groupby('did')['completion_rate'].transform('count')
    # Per-video completion statistics
    play_data['video_avg_completion'] = play_data.groupby('vid')['completion_rate'].transform('mean')
    play_data['video_completion_std'] = play_data.groupby('vid')['completion_rate'].transform('std')
    # User-video interaction feature
    if not hist_click.empty and 'did' in hist_click.columns and 'vid' in hist_click.columns:
        user_vid_clicks = hist_click.groupby(['did', 'vid']).size().reset_index(name='user_vid_clicks')
        play_data = play_data.merge(user_vid_clicks, on=['did', 'vid'], how='left')
    else:
        play_data['user_vid_clicks'] = 0
    # Fill missing values
    play_data['user_avg_completion'].fillna(play_data['completion_rate'].mean(), inplace=True)
    play_data['user_play_count'].fillna(1, inplace=True)
    play_data['video_avg_completion'].fillna(play_data['completion_rate'].median(), inplace=True)
    play_data['video_completion_std'].fillna(0, inplace=True)
    play_data['user_vid_clicks'].fillna(0, inplace=True)
    return play_data

# Fix: add the missing function definition
def predict_for_test_data(test_users, test_exposure, did_features, vid_info):
    """Generate predictions - changed to keep only the vid with the highest click probability."""
    if test_users.empty or test_exposure.empty:
        print("⚠️ Test data is empty, cannot predict")
        return pd.DataFrame()
    # Merge the test tables
    test_data = test_exposure.merge(test_users, on='did', how='left')
    # Add features (no click or play history at test time)
    test_data = add_click_features(test_data, did_features, vid_info,
                                   pd.DataFrame(), pd.DataFrame())
    # Collect the categorical features that exist
    test_categorical_features = get_categorical_features(test_data, base_categorical_features)
    # Predict click probability
    X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore')
    if model_click and not X_test.empty:
        click_probs = model_click.predict(X_test)
    else:
        click_probs = [0.5] * len(test_data)  # default
    # Predict completion rate
    if model_play and not X_test.empty:
        # Attach video duration
        if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns:
            test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left')
        else:
            test_data['item_duration'] = 1.0
        completion_rates = model_play.predict(X_test)
    else:
        completion_rates = [0.7] * len(test_data)  # default
    # Store the predictions
    test_data['click_prob'] = click_probs
    test_data['completion_rate'] = completion_rates
    # Changed: keep only the top-probability vid per did
    result = test_data.sort_values('click_prob', ascending=False).groupby('did').head(1)
    result = result[['did', 'vid', 'completion_rate']].copy()
    result.columns = ['did', 'vid', 'predicted_completion_rate']
    return result

# Main flow
if __name__ == "__main__":
    # Memory-friendly dtypes
    dtypes = {
        'did': 'category',
        'vid': 'category',
        'play_time': 'float32'
    }
    # Optional features - added only when the column exists in the data
    optional_features = {
        'item_cid': 'category',
        'item_type': 'category',
        'item_assetSource': 'category',
        'item_classify': 'category',
        'item_isIntact': 'category',
        'sid': 'category',
        'stype': 'category'
    }
    # Dense feature columns
    for i in range(88):
        dtypes[f'f{i}'] = 'float32'

    # Load the core tables
    print("Loading core data...")
    did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)

    # Register optional dtypes for the columns that exist
    for feature, dtype in optional_features.items():
        if not vid_info.empty and feature in vid_info.columns:
            dtypes[feature] = dtype

    # Reload so every column gets the right dtype
    if os.path.exists('did_features_table.csv'):
        did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
    else:
        print("⚠️ did_features_table.csv not found")
        did_features = pd.DataFrame()
    if os.path.exists('vid_info_table.csv'):
        vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
    else:
        print("⚠️ vid_info_table.csv not found")
        vid_info = pd.DataFrame()

    # Load history - make sure every variable is defined
    print("Loading history...")
    hist_exposure, hist_click, hist_play = load_historical_data(days=32)
    print(f"Exposure history shape: {hist_exposure.shape if not hist_exposure.empty else 'empty'}")
    print(f"Click history shape: {hist_click.shape if not hist_click.empty else 'empty'}")
    print(f"Play history shape: {hist_play.shape if not hist_play.empty else 'empty'}")

    # Build the click dataset
    if not hist_exposure.empty and not hist_click.empty:
        print("Building click dataset...")
        click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
    else:
        print("⚠️ Cannot build click dataset, exposure or click history is empty")
        click_train_data = pd.DataFrame()

    # Add features - make sure every argument is defined
    if not click_train_data.empty:
        print("Building click features...")
        click_train_data = add_click_features(
            click_train_data,
            did_features,
            vid_info,
            hist_click,  # make sure hist_click is defined
            hist_play    # make sure hist_play is defined
        )
    else:
        print("⚠️ Click dataset is empty, skipping feature construction")

    # Base list of categorical features
    base_categorical_features = [
        'item_cid', 'item_type', 'item_assetSource', 'item_classify',
        'item_isIntact', 'sid', 'stype', 'day_of_week', 'hour'
    ]
    # Keep only the ones that exist
    categorical_features = []
    if not click_train_data.empty:
        categorical_features = get_categorical_features(click_train_data, base_categorical_features)
        print(f"Categorical features used: {categorical_features}")
    else:
        print("⚠️ Click training data is empty, no categorical features")

    # Prepare training data
    if not click_train_data.empty:
        if 'date' in click_train_data.columns:
            X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore')
        else:
            X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore')
        y = click_train_data['label']
    else:
        X, y = pd.DataFrame(), pd.Series()
        print("⚠️ Click training data is empty")

    # Split the dataset
    if not X.empty and not y.empty:
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
    else:
        print("⚠️ No training data, cannot train the model")
        X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Training parameters (tuned)
    params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 63,
        'learning_rate': 0.05,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'min_child_samples': 100,
        'verbosity': -1
    }
    model_click = None
    if not X_train.empty:
        train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features)
        val_data = lgb.Dataset(X_val, label=y_val, categorical_feature=categorical_features)
        print("Training click model...")
        model_click = lgb.train(
            params,
            train_data,
            num_boost_round=1500,
            valid_sets=[val_data],
            callbacks=[
                early_stopping(stopping_rounds=100, verbose=True),
                log_evaluation(period=50)
            ]
        )
        # Compute and report the AUC
        if not X_val.empty and not y_val.empty and model_click:
            y_val_pred = model_click.predict(X_val)
            auc_score = roc_auc_score(y_val, y_val_pred)
            print(f"📊 Click model validation AUC: {auc_score:.6f}")
            with open('model_metrics.txt', 'w') as f:
                f.write(f"click model AUC: {auc_score:.6f}\n")
    else:
        print("⚠️ No training data, skipping click model")

    # Build the completion-rate dataset
    print("Building completion-rate dataset...")
    if ('hist_play' in globals() and 'vid_info' in globals()
            and 'did_features' in globals() and 'hist_click' in globals()):
        play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click)
    else:
        print("⚠️ Cannot build completion-rate dataset, required variables are undefined")
        play_train_data = pd.DataFrame()

    # Train the completion-rate model
    model_play = None
    if not play_train_data.empty:
        X_play = play_train_data.drop(
            columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'],
            errors='ignore')
        y_play = play_train_data['completion_rate']
    else:
        X_play, y_play = pd.DataFrame(), pd.Series()
        print("⚠️ Completion-rate training data is empty")

    if not X_play.empty and not y_play.empty:
        X_train_play, X_val_play, y_train_play, y_val_play = train_test_split(
            X_play, y_play, test_size=0.2, random_state=42
        )
    else:
        print("⚠️ No completion-rate training data, cannot train the model")
        X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()

    # Categorical features for the completion-rate model
    play_categorical_features = []
    if not play_train_data.empty:
        play_categorical_features = get_categorical_features(play_train_data, base_categorical_features)
        print(f"Completion-rate model categorical features: {play_categorical_features}")
    else:
        print("⚠️ Completion-rate training data is empty, no categorical features")

    # Training parameters
    params_reg = {
        'objective': 'regression',
        'metric': 'mae',
        'boosting_type': 'gbdt',
        'num_leaves': 63,
        'learning_rate': 0.03,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'lambda_l1': 0.1,
        'lambda_l2': 0.1,
        'min_data_in_leaf': 50,
        'verbosity': -1
    }
    if not X_train_play.empty:
        train_data_play = lgb.Dataset(X_train_play, label=y_train_play,
                                      categorical_feature=play_categorical_features)
        val_data_play = lgb.Dataset(X_val_play, label=y_val_play,
                                    categorical_feature=play_categorical_features)
        print("Training completion-rate model...")
        model_play = lgb.train(
            params_reg,
            train_data_play,
            num_boost_round=2000,
            valid_sets=[val_data_play],
            callbacks=[
                early_stopping(stopping_rounds=100, verbose=True),
                log_evaluation(period=50)
            ]
        )
    else:
        print("⚠️ No training data, skipping completion-rate model")

    # Save models
    if model_click:
        model_click.save_model('click_model.txt')
    if model_play:
        model_play.save_model('play_model.txt')
    joblib.dump(base_categorical_features, 'categorical_features.pkl')

    # Load the prediction data
    print("Loading prediction data...")
    to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'})
    to_predict_exposure = load_data_safely('testA_did_show.csv',
                                           dtype={'did': 'category', 'vid': 'category'})

    # Run the prediction
    if not to_predict_users.empty and not to_predict_exposure.empty:
        print("Generating predictions...")
        submission = predict_for_test_data(to_predict_users, to_predict_exposure,
                                           did_features, vid_info)
        # Save the result
        if not submission.empty:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f'submission_{timestamp}.csv'
            # Changed: save as a headerless CSV
            submission.to_csv(output_file, index=False, header=False)
            print(f"Predictions saved to: {output_file}")
            print(f"Result rows: {len(submission)}")
            print("Columns: [did, vid, predicted_completion_rate]")
        else:
            print("⚠️ Prediction result is empty, nothing saved")
    else:
        print("⚠️ Failed to load prediction data, no results generated")
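The third requirement — the output must have exactly as many rows as testA_pred_did.csv — is easiest to guarantee with a left join onto the required did list plus a fallback for users the model could not score. A minimal sketch, assuming the upstream step produced one best (vid, completion rate) pair per scored did; the fallback vid and rate here are illustrative placeholders:

import pandas as pd

# Synthetic stand-ins: the full did list to score (e.g. testA_pred_did.csv)
# and the per-did best predictions produced upstream, possibly incomplete.
pred_dids = pd.DataFrame({'did': ['u1', 'u2', 'u3']})
best = pd.DataFrame({'did': ['u1', 'u3'],
                     'vid': ['v9', 'v2'],
                     'predicted_completion_rate': [0.71, 0.55]})

# Left-join on the required did list so pred_dids fixes the row count,
# then fill unscored users with a global fallback (illustrative values:
# a popular vid and the mean predicted completion rate).
submission = pred_dids.merge(best, on='did', how='left')
submission['vid'] = submission['vid'].fillna('v9')
submission['predicted_completion_rate'] = (
    submission['predicted_completion_rate']
    .fillna(best['predicted_completion_rate'].mean())
    .round(4)
)
assert len(submission) == len(pred_dids)  # row count matches by construction

Because the join key is the required did list, the row count can no longer drift, whatever filtering happens upstream.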


Following the approach above, please complete and optimize this code:

import pandas as pd
import numpy as np
import xgboost as xgb
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
import chardet
import gc
import joblib
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Show Dask progress bars
ProgressBar().register()

def detect_encoding(file_path):
    """Auto-detect a file's encoding."""
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read(10000))
    return result['encoding'], result['confidence']

def load_data_with_dask(days=32):
    """Load and process the large daily files with Dask."""
    see_dfs, click_dfs, play_dfs = [], [], []
    # Load the daily files in parallel
    for i in range(1, days + 1):
        day = f"{i:02d}"
        # Exposure data
        see = dd.read_csv(f'see_{day}.csv', dtype={'did': 'str', 'vid': 'str'},
                          blocksize='128MB')
        see = see.assign(day=day)
        see_dfs.append(see)
        # Click data
        click = dd.read_csv(f'click_{day}.csv',
                            dtype={'did': 'str', 'vid': 'str', 'item_cid': 'str'},
                            blocksize='128MB')
        click = click[['did', 'vid', 'click_time']]
        click_dfs.append(click)
        # Play data
        play = dd.read_csv(f'playplus_{day}.csv',
                           dtype={'did': 'str', 'vid': 'str', 'item_cid': 'str'},
                           blocksize='128MB')
        play = play[['did', 'vid', 'play_time']]
        play_dfs.append(play)
    # Merge everything
    all_see = dd.concat(see_dfs).drop_duplicates(['did', 'vid'])
    all_click = dd.concat(click_dfs).drop_duplicates(['did', 'vid'])
    all_play = dd.concat(play_dfs)
    # Basic statistics
    total_users = all_see['did'].nunique().compute()
    total_videos = all_see['vid'].nunique().compute()
    print(f"Total unique users: {total_users}, Total unique videos: {total_videos}")
    return all_see, all_click, all_play

def prepare_user_features(all_see, all_click, all_play, video_info):
    """Build behavior features for users that have history."""
    print("Preparing user behavior features for users with history...")
    # Per-user exposure, click, and play-time statistics
    user_exposure = all_see.groupby('did').size().rename('user_exposure_count').compute().astype('int32')
    user_click = all_click.groupby('did').size().rename('user_click_count').compute().astype('int32')
    user_play = all_play.groupby('did')['play_time'].sum().rename('total_play_time').compute().astype('float32')
    # Merge the behavior features
    user_features = pd.concat([user_exposure, user_click, user_play], axis=1).fillna(0)
    user_features['user_ctr'] = user_features['user_click_count'] / (user_features['user_exposure_count'] + 1e-6)
    user_features['avg_play_time'] = user_features['total_play_time'] / (user_features['user_click_count'] + 1e-6)
    # Active days per user
    active_days = all_see.groupby('did')['day'].nunique().compute().rename('active_days').astype('int8')
    user_features = user_features.merge(active_days, left_index=True, right_index=True, how='left').fillna(0)
    return user_features.reset_index()

def prepare_video_features(all_see, all_click, all_play, video_info):
    """Build video popularity features."""
    print("Preparing video popularity features...")
    # Per-video exposure, click, and play-time statistics
    video_exposure = all_see.groupby('vid').size().rename('video_exposure_count').compute().astype('int32')
    video_click = all_click.groupby('vid').size().rename('video_click_count').compute().astype('int32')
    video_play = all_play.groupby('vid')['play_time'].sum().rename('total_play_time').compute().astype('float32')
    # Merge the video features
    video_features = pd.concat([video_exposure, video_click, video_play], axis=1).fillna(0)
    video_features['video_ctr'] = video_features['video_click_count'] / (video_features['video_exposure_count'] + 1e-6)
    video_features['avg_play_time'] = video_features['total_play_time'] / (video_features['video_click_count'] + 1e-6)
    # Attach video metadata
    video_features = video_features.merge(video_info, left_index=True, right_on='vid', how='left')
    # Encode the categorical columns
    for cat_col in ['item_type', 'item_assetSource', 'item_classify']:
        video_features[cat_col] = video_features[cat_col].astype('category')
    return video_features

def prepare_cold_start_cluster(user_features_table, history_users):
    """Fit the clustering model used for cold-start users."""
    print("Preparing clustering model for cold-start users...")
    # Train the clustering only on users that have history
    trained_users = history_users['did'].tolist()
    user_features_table['has_history'] = user_features_table['did'].isin(trained_users)
    trained_user_features = user_features_table[user_features_table['has_history']].copy()
    feature_cols = [f'f{i}' for i in range(0, 87)]
    X = trained_user_features[feature_cols].values
    # MiniBatchKMeans scales to large data
    pipe = make_pipeline(
        SimpleImputer(strategy='mean'),
        StandardScaler(),
        MiniBatchKMeans(n_clusters=100, batch_size=5000, n_init=3)
    )
    # Fit the clustering pipeline
    cluster_model = pipe.fit(X)
    # labels_ lives on the KMeans step, not on the pipeline object itself
    trained_user_features['cluster'] = pipe.named_steps['minibatchkmeans'].labels_
    # Save the model
    joblib.dump(cluster_model, 'cold_start_cluster_model.pkl')
    return cluster_model

def prepare_samples(all_see, all_click, all_play, user_features_table):
    """Build training samples, separating users with and without history."""
    print("Preparing training samples...")
    # Load video metadata
    video_info = pd.read_csv('vid_info_table.csv', encoding='gbk', dtype={'vid': 'str'})
    # Build user and video features
    user_behavior_features = prepare_user_features(all_see, all_click, all_play, video_info)
    video_features = prepare_video_features(all_see, all_click, all_play, video_info)
    # Mark users with history
    history_users = all_see['did'].unique().compute().to_frame(name='did')
    user_features_table['has_history'] = user_features_table['did'].isin(history_users['did'])
    # Fit the cold-start clustering model
    cluster_model = prepare_cold_start_cluster(user_features_table, history_users)
    # Training samples for users with history
    train_samples = dd.merge(all_see, all_click, on=['did', 'vid'], how='left', suffixes=('', '_click'))
    train_samples = dd.merge(train_samples, all_play, on=['did', 'vid'], how='left')
    train_samples = dd.merge(train_samples, user_behavior_features, on='did', how='left')
    train_samples = dd.merge(train_samples, video_features, on='vid', how='left')
    # Label: 1 if clicked, else 0
    train_samples['label'] = (~train_samples['click_time'].isnull()).astype('int8')
    # Reduce memory
    train_samples = train_samples.compute()
    for col in train_samples.select_dtypes(include='float64').columns:
        train_samples[col] = train_samples[col].astype('float32')
    print(f"Training samples shape: {train_samples.shape}")
    return train_samples, cluster_model, video_features

def train_behavior_model(samples, feature_columns):
    """Train the click-prediction model for users with history."""
    print("Training behavior prediction model...")
    # Features and label
    X = samples[feature_columns]
    y = samples['label']
    # Time-based split
    days = samples['day'].unique()
    train_days = days[:-3]  # first 29 days for training
    test_days = days[-3:]   # last 3 days for validation
    X_train = samples[samples['day'].isin(train_days)][feature_columns]
    y_train = samples[samples['day'].isin(train_days)]['label']
    X_val = samples[samples['day'].isin(test_days)][feature_columns]
    y_val = samples[samples['day'].isin(test_days)]['label']
    # LightGBM parameters
    params = {
        'boosting_type': 'gbdt',
        'objective': 'binary',
        'metric': 'auc',
        'learning_rate': 0.05,
        'num_leaves': 63,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'verbose': -1,
        'seed': 42,
        'max_depth': 7,
        'min_child_samples': 500,
        'n_jobs': 8
    }
    # Train
    behavior_model = lgb.train(
        params,
        lgb.Dataset(X_train, label=y_train),
        num_boost_round=1000,
        valid_sets=[lgb.Dataset(X_val, label=y_val)],
        callbacks=[
            lgb.early_stopping(stopping_rounds=30, verbose=False),
            lgb.log_evaluation(period=50)
        ]
    )
    # Save the model
    behavior_model.save_model('behavior_model.txt')
    return behavior_model

def predict_behavior(model, test_data, feature_columns):
    """Predict click probability for users with history."""
    print("Predicting behavior for users with history...")
    X = test_data[feature_columns]
    preds = model.predict(X)
    return preds

def predict_cold_start(cluster_model, video_features, user_features_table):
    """Predict preferences for cold-start users."""
    print("Predicting preferences for cold-start users...")
    # Cold-start users are those without history
    cold_start_users = user_features_table[~user_features_table['has_history']].copy()
    feature_cols = [f'f{i}' for i in range(0, 87)]
    # Assign each user to a cluster
    X = cold_start_users[feature_cols].values
    cold_start_users['cluster'] = cluster_model.predict(X)
    # Load the per-cluster popular videos
    cluster_top_videos = joblib.load('cluster_top_videos.pkl')
    # Build a recommendation list per user
    cold_start_users['recommended_vid'] = cold_start_users['cluster'].map(
        lambda c: cluster_top_videos.get(c, []).copy()
    )
    # Truncate to at most 100 recommendations per user
    cold_start_users['recommended_vid'] = cold_start_users['recommended_vid'].apply(
        lambda lst: lst[:min(100, len(lst))]
    )
    return cold_start_users[['did', 'recommended_vid']]

def save_cluster_top_videos(video_features, cluster_model, behavior_data):
    """Persist the top videos of each cluster."""
    print("Saving top videos for each cluster...")
    # Popularity score from CTR and average play time
    video_cluster_score = video_features[['vid', 'video_ctr', 'avg_play_time']].copy()
    video_cluster_score['popularity_score'] = (video_cluster_score['video_ctr'] *
                                               video_cluster_score['avg_play_time'] * 1000)
    # Cluster assignment of the training data
    cluster_model = joblib.load('cold_start_cluster_model.pkl')
    behavior_data['cluster'] = cluster_model.predict(behavior_data.iloc[:, 5:92])
    # Aggregate per-cluster video preferences
    cluster_video_pref = behavior_data.groupby(['cluster', 'vid'])['play_time'].sum().reset_index()
    cluster_video_pref = cluster_video_pref.merge(video_cluster_score, on='vid')
    # Top videos per cluster
    cluster_top_videos = {}
    for cluster_id in behavior_data['cluster'].unique():
        cluster_vids = cluster_video_pref[cluster_video_pref['cluster'] == cluster_id]
        top_vids = cluster_vids.sort_values('popularity_score', ascending=False)['vid'].head(100).tolist()
        cluster_top_videos[cluster_id] = top_vids
    # Persist the lookup
    joblib.dump(cluster_top_videos, 'cluster_top_videos.pkl')
    return cluster_top_videos

def main():
    """Main flow."""
    # 1. Detect the encoding
    encoding, confidence = detect_encoding('see_01.csv')
    print(f"Detected encoding: {encoding} (confidence: {confidence:.2f})")
    # 2. Load the base data
    print("Loading base data...")
    all_see, all_click, all_play = load_data_with_dask(days=32)
    # 3. Load the user feature table
    user_features_table = pd.read_csv('did_features_table.csv', encoding='gbk', dtype={'did': 'str'})
    # 4. Build samples and the clustering model
    train_samples, cluster_model, video_features = prepare_samples(all_see, all_click, all_play, user_features_table)
    # 5. Save per-cluster top videos
    save_cluster_top_videos(video_features, cluster_model, train_samples)
    # 6. Model feature columns
    feature_columns = [
        'user_exposure_count', 'user_click_count', 'user_ctr',
        'video_exposure_count', 'video_click_count', 'video_ctr',
        'item_duration', 'item_serialno', 'item_classify',
        'item_type', 'item_assetSource'
    ]
    # 7. Train the behavior model
    behavior_model = train_behavior_model(train_samples, feature_columns)
    # 8. Load the test data
    print("Loading test data...")
    test_data = dd.read_csv('testA_did_show.csv', dtype={'did': 'str', 'vid': 'str'})
    test_data = test_data.compute()
    # 9. Merge in the users' history status
    history_users = train_samples['did'].unique()
    test_data['has_history'] = test_data['did'].isin(history_users)
    # 10. Users with history: model prediction
    if test_data['has_history'].any():
        print("Processing users with history...")
        history_users_test = test_data[test_data['has_history']].copy()
        # Merge features
        history_users_test = history_users_test.merge(
            user_features_table, on='did', how='left', suffixes=('', '_feat')
        )
        history_users_test = history_users_test.merge(video_features, on='vid', how='left')
        # Predict click probability
        history_preds = predict_behavior(behavior_model, history_users_test, feature_columns)
        history_users_test['click_prob'] = history_preds
        # Save results
        history_users_test[['did', 'vid', 'click_prob']].to_csv('history_user_predictions.csv', index=False)
    # 11. Cold-start users: cluster-based recommendation
    if not test_data['has_history'].all():
        print("Processing cold-start users...")
        cold_start_preds = predict_cold_start(cluster_model, video_features, user_features_table)
        cold_start_preds.to_csv('cold_start_predictions.csv', index=False)
    print("Prediction completed!")

if __name__ == '__main__':
    main()
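The cold-start branch above rests on one idea: a user without logs is assigned to a cluster fitted on users with logs, and inherits that cluster's top videos. A minimal sketch of just that assignment step, with random vectors standing in for the f0..f86 profile features and a hypothetical cluster_top_videos lookup (the real one is built from play_time aggregates and saved as cluster_top_videos.pkl):

import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_hist = rng.normal(size=(500, 87))  # stand-in for f0..f86 of users with history

# Same impute -> scale -> cluster pipeline shape as above, scaled down
pipe = make_pipeline(
    SimpleImputer(strategy='mean'),
    StandardScaler(),
    MiniBatchKMeans(n_clusters=10, batch_size=100, n_init=3, random_state=0),
)
pipe.fit(X_hist)

# Hypothetical per-cluster top-video lookup (demo values)
cluster_top_videos = {c: [f'v{c}_{k}' for k in range(3)] for c in range(10)}

# A cold-start user is scored from profile features alone:
x_new = rng.normal(size=(1, 87))
cluster = int(pipe.predict(x_new)[0])
print(f"cluster {cluster} -> recommend {cluster_top_videos[cluster]}")

Since the pipeline bundles the imputer and scaler with the clusterer, predict() applies the same preprocessing at inference time that fit() learned, which is exactly what loading cold_start_cluster_model.pkl back relies on.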
