用python设计一个函数drop_list(),输入一个由整数构成的列表,在不创建其他列表的情况下原地删除列表中的所有奇数(即该函数空间复杂度为O(1))
时间: 2024-05-11 21:15:38 浏览: 64
好的,我明白了。以下是 Python 实现:
```python
def drop_list(lst):
i = 0
while i < len(lst):
if lst[i] % 2 == 1:
lst.pop(i)
else:
i += 1
```
这个函数会原地修改列表,删除所有奇数,同时保留偶数。具体来说,它使用一个索引 `i` 来遍历列表中的每个元素。如果当前元素是奇数,就删除它;否则,将索引加 1,跳到下一个元素。这样循环直到遍历完整个列表,最终剩下的元素都是偶数。
希望这个函数能够帮到你。如果有任何问题,请随时问我。
相关问题
检查并优化:import pandas as pd import numpy as np import lightgbm as lgb import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import psutil from sklearn.metrics import log_loss, mean_absolute_error from scipy.sparse import hstack, csr_matrix, save_npz, load_npz import warnings warnings.filterwarnings('ignore') # 内存优化函数 - 增强版 def optimize_dtypes(df, downcast_int=True, downcast_float=True, category_threshold=0.5): """优化DataFrame的数据类型以减少内存占用""" if df.empty: return df # 转换整数列为最小可用类型 if downcast_int: int_cols = df.select_dtypes(include=['int']).columns for col in int_cols: df[col] = pd.to_numeric(df[col], downcast='integer') # 转换浮点列为float32 if downcast_float: float_cols = df.select_dtypes(include=['float']).columns for col in float_cols: # 优先转换为float32而不是downcast='float'以获得更好控制 df[col] = df[col].astype(np.float32) # 转换对象列为分类类型 obj_cols = df.select_dtypes(include=['object']).columns for col in obj_cols: num_unique = df[col].nunique() num_total = len(df) if num_unique / num_total < category_threshold: df[col] = df[col].astype('category') return df # 增强数据加载函数 def load_data_safely(file_path, usecols=None, dtype=None, chunksize=50000, verbose=True): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 获取文件大小用于进度条 file_size = os.path.getsize(file_path) / (1024 ** 2) # MB desc = f"加载 {os.path.basename(file_path)} ({file_size:.1f}MB)" # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=desc, disable=not verbose): # 优化数据类型 chunk = optimize_dtypes(chunk) chunks.append(chunk) if chunks: result = pd.concat(chunks, ignore_index=True) # 再次整体优化 result = optimize_dtypes(result) return result return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 稀疏矩阵转换函数 - 优化版 def to_sparse_matrix(df, columns, fillna='MISSING', dtype=np.int8): """将分类特征转换为稀疏矩阵表示""" from sklearn.preprocessing import OneHotEncoder # 预处理数据 sparse_data = df[columns].fillna(fillna).astype(str) # 使用OneHotEncoder替代get_dummies以获得更好性能 encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=True, dtype=dtype) sparse_matrix = encoder.fit_transform(sparse_data) return sparse_matrix, encoder # 增量训练函数 - 优化内存管理 def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000): """分块增量训练模型以减少内存占用""" model = None callbacks = [lgb.early_stopping(stopping_rounds=50, verbose=0), lgb.log_evaluation(period=100)] for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"): chunk_end = min(i + chunk_size, len(X)) # 使用视图避免复制数据 X_chunk = X.iloc[i:chunk_end] y_chunk = y.iloc[i:chunk_end] # 创建数据集后立即释放原始数据 train_data = lgb.Dataset( X_chunk, label=y_chunk, categorical_feature=categorical_features, free_raw_data=True # 训练后释放原始数据 ) if model is None: model = lgb.train( params, train_data, num_boost_round=num_rounds, callbacks=callbacks, keep_training_booster=True ) else: model = lgb.train( params, train_data, num_boost_round=num_rounds, init_model=model, callbacks=callbacks, keep_training_booster=True ) # 显式释放内存 del train_data, X_chunk, y_chunk gc.collect() return model # 历史数据加载函数 - 优化内存 def load_historical_data(days=32, verbose=True): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据", disable=not verbose): day_str = f"{day:02d}" # 加载曝光数据 - 仅加载必要列 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely( see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}, verbose=verbose ) if not see.empty: see_list.append(see) del see # 加载点击数据 - 优化日期处理 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely( click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}, verbose=verbose ) if not click.empty and 'click_time' in click.columns: # 直接解析日期为数值类型 click_dates = pd.to_datetime(click['click_time'], errors='coerce') click['date'] = click_dates.dt.strftime('%Y%m%d').astype('int32') click = click.drop(columns=['click_time']) click_list.append(click[['did', 'vid', 'date']]) del click, click_dates # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely( play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category', 'play_time': 'float32'}, verbose=verbose ) if not play.empty: play_list.append(play) del play gc.collect() # 使用concat时避免创建中间对象 return ( pd.concat(see_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 点击数据集构建 - 内存优化版 def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1, verbose=True): """构建点击数据集,包含负样本采样 - 内存优化版""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 - 使用视图避免复制 pos_samples = hist_click[['did', 'vid']].copy() pos_samples['label'] = 1 # 创建曝光集索引用于高效查找 exposure_index = hist_exposure.set_index(['did', 'vid']).index # 分块处理负样本 neg_chunks = [] chunk_size = 500000 total_rows = len(hist_exposure) for start in tqdm(range(0, total_rows, chunk_size), desc="构建负样本", disable=not verbose): end = min(start + chunk_size, total_rows) chunk = hist_exposure.iloc[start:end] # 使用索引查找未点击的曝光 chunk['is_clicked'] = chunk.set_index(['did', 'vid']).index.isin(hist_click.set_index(['did', 'vid']).index) neg_chunk = chunk[~chunk['is_clicked']][['did', 'vid']] if not neg_chunk.empty and sample_ratio < 1.0: neg_chunk = neg_chunk.sample(frac=sample_ratio, random_state=42) neg_chunks.append(neg_chunk) del chunk, neg_chunk # 合并负样本 neg_samples = pd.concat(neg_chunks, ignore_index=True) neg_samples['label'] = 0 # 合并正负样本 click_data = pd.concat([pos_samples, neg_samples], ignore_index=True, copy=False) # 释放内存 del exposure_index, pos_samples, neg_samples, neg_chunks gc.collect() return click_data # 在合并操作前添加检查 if 'total' not in df.columns: print("警告:'total' 列不存在于 DataFrame 中") print("可用列名:", df.columns.tolist()) # 尝试找出可能的拼写错误 possible_matches = [col for col in df.columns if 'total' in col.lower()] if possible_matches: print("可能的匹配列:", possible_matches) # 特征工程函数 - 内存优化版 def add_click_features(df, did_features, vid_info, hist_click, hist_play, verbose=True): """添加关键特征,避免内存溢出 - 优化版""" if df.empty: return df # 1. 合并设备特征 - 仅选择必要列 if not did_features.empty and 'did' in did_features.columns: did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f')] df = df.merge(did_features[did_cols], on='did', how='left') # 2. 合并视频特征 - 仅选择必要列 if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = ['vid', 'item_duration'] + [col for col in vid_info.columns if col in ['item_cid', 'item_type']] df = df.merge(vid_info[vid_cols], on='vid', how='left') # 3. 预聚合统计特征 - 减少重复计算 stats = {} # 用户行为统计 if not hist_click.empty: stats['user_click_count'] = hist_click.groupby('did').size().astype('int32') stats['video_click_count'] = hist_click.groupby('vid').size().astype('int32') if not hist_play.empty: stats['user_total_play'] = hist_play.groupby('did')['play_time'].sum().astype('float32') stats['avg_play_time'] = hist_play.groupby('vid')['play_time'].mean().astype('float32') # 4. 合并统计特征 for name, stat_df in tqdm(stats.items(), desc="添加统计特征", disable=not verbose): if name in df.columns: continue df = df.merge(stat_df.rename(name), how='left', left_on=name.split('_')[1], right_index=True) # 5. 填充缺失值 - 使用更高效的方法 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0, 'item_duration': df['item_duration'].median() if 'item_duration' in df else 30.0 } for col, default in fill_values.items(): if col in df: # 使用inplace填充减少内存分配 df[col].fillna(default, inplace=True) # 6. 添加时间特征 - 使用数值替代分类 if 'date' in df: # 直接计算数值特征,避免创建datetime对象 df['day_of_week'] = (df['date'] % 7).astype('int8') df['is_weekend'] = (df['day_of_week'] >= 5).astype('int8') df.drop(columns=['date'], inplace=True, errors='ignore') return df # 主处理流程 - 内存优化版 def main(): """主处理流程,包含完整的内存优化策略""" # 初始内存监控 start_mem = memory_monitor("初始内存") # 定义内存优化的数据类型 dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) memory_monitor("加载核心数据后") # 加载历史数据 - 减少加载天数 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=14) # 减少到14天 memory_monitor("加载历史数据后") # 构建点击数据集 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) # 立即释放不再需要的数据 del hist_exposure, hist_click gc.collect() else: print("⚠️ 无法构建点击数据集") click_train_data = pd.DataFrame() memory_monitor("构建点击数据集后") # 添加特征 - 使用增量方式 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click if 'hist_click' in locals() else pd.DataFrame(), hist_play ) else: print("⚠️ 点击数据集为空,跳过特征构建") # 立即释放内存 del hist_play gc.collect() memory_monitor("添加特征后") # 准备训练数据 - 使用视图避免复制 if not click_train_data.empty: cols_to_drop = ['did', 'vid', 'label'] if 'date' in click_train_data.columns: cols_to_drop.append('date') X = click_train_data.drop(columns=cols_to_drop, errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series(dtype='float32') print("⚠️ 点击训练数据为空") # 划分数据集 - 使用索引避免复制 if len(X) > 0: indices = np.arange(len(X)) train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42, stratify=y) X_train, X_val = X.iloc[train_idx], X.iloc[val_idx] y_train, y_val = y.iloc[train_idx], y.iloc[val_idx] else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 释放click_train_data del click_train_data, X, y, indices gc.collect() memory_monitor("划分数据集后") # 训练模型参数 - 调整为更节省内存的参数 params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 31, # 减少叶子节点数 'learning_rate': 0.05, 'feature_fraction': 0.7, # 减少特征使用比例 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 200, # 增加最小样本数 'verbosity': -1, 'max_depth': -1, # 避免过深 'seed': 42 } # 增量训练点击模型 if len(X_train) > 0: print("开始训练点击预测模型...") model_click = train_incremental(X_train, y_train, [], params, num_rounds=1000, chunk_size=100000) # 在验证集上评估 if len(X_val) > 0: # 分块预测避免内存峰值 chunk_size = 50000 val_preds = [] for i in range(0, len(X_val), chunk_size): chunk = X_val.iloc[i:i+chunk_size] val_preds.extend(model_click.predict(chunk)) val_logloss = log_loss(y_val, val_preds) print(f"验证集LogLoss: {val_logloss:.4f}") else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") # 释放训练数据 del X_train, X_val, y_train, y_val gc.collect() memory_monitor("训练点击模型后") # 最终内存报告 end_mem = memory_monitor("处理完成") print(f"总内存消耗: {end_mem - start_mem:.2f} MB") # 内存监控函数 def memory_monitor(step_name=""): """监控内存使用情况""" process = psutil.Process(os.getpid()) mem_info = process.memory_info() print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB") return mem_info.rss / (1024 ** 2) # 返回MB if __name__ == "__main__": main()
<think>我们注意到在代码中,有一个片段被错误地放置在了函数定义之外:
# 在合并操作前添加检查
if 'total' not in df.columns:
print("警告:'total' 列不存在于 DataFrame 中")
print("可用列名:", df.columns.tolist())
...
这个片段应该是在特征工程函数`add_click_features`中处理某个特征时出现的,但是被放在了全局作用域中。这会导致在运行时报错,因为`df`未定义。
另外,在`add_click_features`函数中,我们使用了一个名为`name.split('_')[1]`的键来合并统计特征,这可能会导致错误,因为并不是所有的统计特征名都遵循这种结构(例如,如果特征名只有一个单词,就会出错)。
此外,在构建点击数据集后,我们尝试释放`hist_click`和`hist_exposure`,但是在添加特征时,我们又尝试使用`hist_click`(通过`if 'hist_click' in locals()`)。由于我们已经删除了`hist_click`,所以这个条件会失败,导致传递一个空的DataFrame。
我们还需要注意,在添加特征时,我们传递了`hist_click`和`hist_play`,但在构建点击数据集后,我们只释放了`hist_exposure`和`hist_click`,而`hist_play`是在后面释放的。在添加特征函数中,我们使用了`hist_click`和`hist_play`来构建统计特征。因此,在构建点击数据集后,我们不能释放`hist_play`,因为后面还要用。但是,我们却释放了`hist_click`,导致在添加特征时无法使用`hist_click`。
为了解决这个问题,我们需要调整释放内存的顺序。或者,我们可以在构建点击数据集后不释放`hist_click`,而是在添加特征后再一起释放。但考虑到`hist_click`在添加特征时还需要使用,我们不应该在构建点击数据集后立即释放。
让我们重新组织内存释放的时机:
1. 在构建点击数据集后,我们不再需要`hist_exposure`,但是还需要`hist_click`和`hist_play`用于特征工程。
2. 在添加特征后,我们可以释放`hist_click`和`hist_play`。
因此,修改如下:
在构建点击数据集后,只释放`hist_exposure`,保留`hist_click`和`hist_play`直到特征工程完成。
另外,在`add_click_features`函数中,我们合并统计特征时,使用`name.split('_')[1]`作为合并键,这并不安全。我们应该根据特征名的前缀来决定合并键:
- 以'user_'开头的特征,用'did'合并
- 以'video_'开头的特征,用'vid'合并
但是,我们目前的统计特征包括:
- 'user_click_count' -> 键为'did'
- 'video_click_count' -> 键为'vid'
- 'user_total_play' -> 键为'did'
- 'avg_play_time' -> 注意,这个特征名没有前缀,所以无法用上述规则。我们可能需要重命名这个特征,比如改为'video_avg_play_time'
因此,我们调整特征名,并修改合并键的确定方式。
此外,在`add_click_features`函数中,我们尝试填充缺失值,但是有些列可能因为各种原因不存在,所以我们在填充前需要检查列是否存在。
我们进行以下优化:
1. 将错误放置的检查代码移除(因为它不在任何函数内,会导致运行错误)。
2. 调整内存释放顺序:在构建点击数据集后只释放`hist_exposure`;在添加特征后再释放`hist_click`和`hist_play`。
3. 在`add_click_features`函数中,修改统计特征的名字,使它们都有前缀(例如,将'avg_play_time'改为'video_avg_play_time')。
4. 在合并统计特征时,根据特征名前缀选择合并键。
5. 在填充缺失值之前,检查列是否存在。
另外,在构建点击数据集时,我们使用了`hist_click`来构建正样本,但注意这个`hist_click`是包含多天数据的。在构建完点击数据集后,我们仍然需要`hist_click`(所有历史点击)来构建用户和视频的点击统计特征。所以不能提前释放。
修改后的代码步骤:
在main函数中:
- 加载历史数据:hist_exposure, hist_click, hist_play
- 构建点击数据集:click_train_data = build_click_dataset(hist_exposure, hist_click, ...)
然后释放hist_exposure(因为不再需要)
- 添加特征:click_train_data = add_click_features(click_train_data, ... , hist_click, hist_play)
然后释放hist_click和hist_play
在`add_click_features`函数中,修改统计特征:
- 将'avg_play_time'改为'video_avg_play_time'
然后,在合并统计特征时:
- 对于特征名以'user_'开头的,用'did'合并
- 对于特征名以'video_'开头的,用'vid'合并
最后,我们还需要注意,在添加特征函数中,我们合并了`did_features`和`vid_info`。在合并之前,我们应该确保只合并存在的列。
考虑到这些,我们重新编写`add_click_features`函数中的相关部分。
由于代码较长,下面只给出修改的部分。</think>我检查了你的代码,发现几个关键问题需要优化,特别是内存使用和错误处理方面。以下是优化后的完整代码:
```python
import pandas as pd
import numpy as np
import lightgbm as lgb
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import psutil
from sklearn.metrics import log_loss, mean_absolute_error
from scipy.sparse import hstack, csr_matrix, save_npz, load_npz
import warnings
warnings.filterwarnings('ignore')
# 内存优化函数 - 增强版
def optimize_dtypes(df, downcast_int=True, downcast_float=True, category_threshold=0.5):
"""优化DataFrame的数据类型以减少内存占用"""
if df.empty:
return df
# 转换整数列为最小可用类型
if downcast_int:
int_cols = df.select_dtypes(include=['int']).columns
for col in int_cols:
df[col] = pd.to_numeric(df[col], downcast='integer')
# 转换浮点列为float32
if downcast_float:
float_cols = df.select_dtypes(include=['float']).columns
for col in float_cols:
# 优先转换为float32而不是downcast='float'以获得更好控制
df[col] = df[col].astype(np.float32)
# 转换对象列为分类类型
obj_cols = df.select_dtypes(include=['object']).columns
for col in obj_cols:
num_unique = df[col].nunique()
num_total = len(df)
if num_unique / num_total < category_threshold:
df[col] = df[col].astype('category')
return df
# 增强数据加载函数
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=50000, verbose=True):
"""安全加载大型CSV文件,优化内存使用"""
try:
if not os.path.exists(file_path):
print(f"⚠️ 文件不存在: {file_path}")
return pd.DataFrame()
# 自动检测编码
with open(file_path, 'rb') as f:
result = chardet.detect(f.read(100000))
encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'
# 获取文件大小用于进度条
file_size = os.path.getsize(file_path) / (1024 ** 2) # MB
desc = f"加载 {os.path.basename(file_path)} ({file_size:.1f}MB)"
# 分批读取并优化内存
chunks = []
try:
reader = pd.read_csv(
file_path,
encoding=encoding,
usecols=usecols,
dtype=dtype,
chunksize=chunksize,
low_memory=False
)
for chunk in tqdm(reader, desc=desc, disable=not verbose):
# 优化数据类型
chunk = optimize_dtypes(chunk)
chunks.append(chunk)
except pd.errors.ParserError as e:
print(f"解析错误: {str(e)}")
print("尝试使用错误修正加载...")
reader = pd.read_csv(
file_path,
encoding=encoding,
usecols=usecols,
dtype=dtype,
chunksize=chunksize,
low_memory=False,
error_bad_lines=False,
warn_bad_lines=True
)
for chunk in tqdm(reader, desc=desc + " (错误修正)", disable=not verbose):
chunk = optimize_dtypes(chunk)
chunks.append(chunk)
if chunks:
result = pd.concat(chunks, ignore_index=True)
# 再次整体优化
result = optimize_dtypes(result)
return result
return pd.DataFrame()
except Exception as e:
print(f"⚠️ 加载 {file_path} 失败: {str(e)}")
return pd.DataFrame()
# 安全合并函数
def safe_merge(left, right, on=None, left_on=None, right_on=None, how='left', verbose=True):
"""安全执行合并操作,处理列缺失问题"""
if on:
left_cols = [col for col in on if col in left.columns]
right_cols = [col for col in on if col in right.columns]
if len(left_cols) != len(on) or len(right_cols) != len(on):
missing_left = [col for col in on if col not in left.columns]
missing_right = [col for col in on if col not in right.columns]
if verbose:
print(f"⚠️ 合并列缺失: left={missing_left}, right={missing_right}")
print("左侧列:", left.columns.tolist())
print("右侧列:", right.columns.tolist())
# 尝试找到最接近的匹配
suggestions = {}
for col in on:
if col not in left.columns:
suggestions[col] = [c for c in left.columns if col.lower() in c.lower()]
if col not in right.columns:
suggestions[col] = [c for c in right.columns if col.lower() in c.lower()]
if verbose and suggestions:
print("建议替代列:", suggestions)
return left
if left_on and right_on:
left_cols = [col for col in left_on if col in left.columns]
right_cols = [col for col in right_on if col in right.columns]
if len(left_cols) != len(left_on) or len(right_cols) != len(right_on):
if verbose:
print(f"⚠️ 合并列缺失: left_on={left_on}, right_on={right_on}")
print("左侧列:", left.columns.tolist())
print("右侧列:", right.columns.tolist())
return left
return left.merge(right, on=on, left_on=left_on, right_on=right_on, how=how)
# 稀疏矩阵转换函数 - 优化版
def to_sparse_matrix(df, columns, fillna='MISSING', dtype=np.int8):
"""将分类特征转换为稀疏矩阵表示"""
from sklearn.preprocessing import OneHotEncoder
if df.empty or not columns:
return csr_matrix((len(df), 0)), None
# 预处理数据
sparse_data = df[columns].fillna(fillna).astype(str)
# 使用OneHotEncoder替代get_dummies以获得更好性能
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=True, dtype=dtype)
sparse_matrix = encoder.fit_transform(sparse_data)
return sparse_matrix, encoder
# 增量训练函数 - 优化内存管理
def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000):
"""分块增量训练模型以减少内存占用"""
model = None
callbacks = [lgb.early_stopping(stopping_rounds=50, verbose=0),
lgb.log_evaluation(period=100)]
for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"):
chunk_end = min(i + chunk_size, len(X))
# 使用视图避免复制数据
X_chunk = X.iloc[i:chunk_end]
y_chunk = y.iloc[i:chunk_end]
# 创建数据集后立即释放原始数据
train_data = lgb.Dataset(
X_chunk,
label=y_chunk,
categorical_feature=categorical_features,
free_raw_data=True # 训练后释放原始数据
)
if model is None:
model = lgb.train(
params,
train_data,
num_boost_round=num_rounds,
callbacks=callbacks,
keep_training_booster=True
)
else:
model = lgb.train(
params,
train_data,
num_boost_round=num_rounds,
init_model=model,
callbacks=callbacks,
keep_training_booster=True
)
# 显式释放内存
del train_data, X_chunk, y_chunk
gc.collect()
return model
# 历史数据加载函数 - 优化内存
def load_historical_data(days=32, verbose=True):
"""高效加载历史数据,支持分批处理"""
see_list, click_list, play_list = [], [], []
for day in tqdm(range(1, days + 1), desc="加载历史数据", disable=not verbose):
day_str = f"{day:02d}"
# 加载曝光数据 - 仅加载必要列
see_path = f'see_{day_str}.csv'
if os.path.exists(see_path):
see = load_data_safely(
see_path,
usecols=['did', 'vid'],
dtype={'did': 'category', 'vid': 'category'},
verbose=verbose
)
if not see.empty:
see_list.append(see)
del see
# 加载点击数据 - 优化日期处理
click_path = f'click_{day_str}.csv'
if os.path.exists(click_path):
click = load_data_safely(
click_path,
usecols=['did', 'vid', 'click_time'],
dtype={'did': 'category', 'vid': 'category'},
verbose=verbose
)
if not click.empty and 'click_time' in click.columns:
# 直接解析日期为数值类型
click_dates = pd.to_datetime(click['click_time'], errors='coerce')
click['date'] = click_dates.dt.strftime('%Y%m%d').astype('int32')
click = click.drop(columns=['click_time'], errors='ignore')
click_list.append(click[['did', 'vid', 'date']])
del click, click_dates
# 加载播放数据
play_path = f'playplus_{day_str}.csv'
if os.path.exists(play_path):
play = load_data_safely(
play_path,
usecols=['did', 'vid', 'play_time'],
dtype={'did': 'category', 'vid': 'category', 'play_time': 'float32'},
verbose=verbose
)
if not play.empty:
play_list.append(play)
del play
gc.collect()
# 使用concat时避免创建中间对象
see_df = pd.concat(see_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame()
click_df = pd.concat(click_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame()
play_df = pd.concat(play_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
# 释放列表内存
del see_list, click_list, play_list
gc.collect()
return see_df, click_df, play_df
# 点击数据集构建 - 内存优化版
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1, verbose=True):
"""构建点击数据集,包含负样本采样 - 内存优化版"""
if hist_exposure.empty or hist_click.empty:
print("⚠️ 历史曝光或点击数据为空,无法构建数据集")
return pd.DataFrame()
# 标记正样本 - 使用视图避免复制
pos_samples = hist_click[['did', 'vid']].copy()
pos_samples['label'] = 1
# 创建曝光集索引用于高效查找
exposure_index = hist_exposure.set_index(['did', 'vid']).index
# 分块处理负样本
neg_chunks = []
chunk_size = 500000
total_rows = len(hist_exposure)
for start in tqdm(range(0, total_rows, chunk_size), desc="构建负样本", disable=not verbose):
end = min(start + chunk_size, total_rows)
chunk = hist_exposure.iloc[start:end]
# 使用索引查找未点击的曝光
chunk['is_clicked'] = chunk.set_index(['did', 'vid']).index.isin(hist_click.set_index(['did', 'vid']).index)
neg_chunk = chunk[~chunk['is_clicked']][['did', 'vid']]
if not neg_chunk.empty and sample_ratio < 1.0:
neg_chunk = neg_chunk.sample(frac=sample_ratio, random_state=42)
neg_chunks.append(neg_chunk)
del chunk, neg_chunk
# 合并负样本
if neg_chunks:
neg_samples = pd.concat(neg_chunks, ignore_index=True)
neg_samples['label'] = 0
else:
neg_samples = pd.DataFrame(columns=['did', 'vid', 'label'])
# 合并正负样本
click_data = pd.concat([pos_samples, neg_samples], ignore_index=True, copy=False)
# 释放内存
del exposure_index, pos_samples, neg_samples, neg_chunks
gc.collect()
return click_data
# 特征工程函数 - 内存优化版
def add_click_features(df, did_features, vid_info, hist_click, hist_play, verbose=True):
"""添加关键特征,避免内存溢出 - 优化版"""
if df.empty:
return df
# 1. 合并设备特征 - 仅选择必要列
if not did_features.empty and 'did' in did_features.columns:
# 只选择存在的列
did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f') and col in did_features.columns]
# 安全合并
df = safe_merge(df, did_features[did_cols], on='did', verbose=verbose)
# 2. 合并视频特征 - 仅选择必要列
if not vid_info.empty and 'vid' in vid_info.columns:
# 只选择存在的列
vid_cols = ['vid', 'item_duration']
for col in ['item_cid', 'item_type']:
if col in vid_info.columns:
vid_cols.append(col)
# 安全合并
df = safe_merge(df, vid_info[vid_cols], on='vid', verbose=verbose)
# 3. 预聚合统计特征 - 减少重复计算
stats = {}
# 用户行为统计
if not hist_click.empty:
if 'did' in hist_click.columns:
stats['user_click_count'] = hist_click.groupby('did').size().astype('int32')
if 'vid' in hist_click.columns:
stats['video_click_count'] = hist_click.groupby('vid').size().astype('int32')
if not hist_play.empty and 'play_time' in hist_play.columns:
if 'did' in hist_play.columns:
stats['user_total_play'] = hist_play.groupby('did')['play_time'].sum().astype('float32')
if 'vid' in hist_play.columns:
stats['video_avg_play_time'] = hist_play.groupby('vid')['play_time'].mean().astype('float32')
# 4. 合并统计特征
for name, stat_df in tqdm(stats.items(), desc="添加统计特征", disable=not verbose):
if name in df.columns:
continue
# 确定合并键
merge_key = 'did' if name.startswith('user_') else 'vid'
# 检查键是否存在
if merge_key not in df.columns:
if verbose:
print(f"⚠️ 无法合并 {name},因为 {merge_key} 列不存在")
continue
# 安全合并
df = safe_merge(df, stat_df.rename(name).reset_index(),
left_on=merge_key, right_on=merge_key,
how='left', verbose=verbose)
# 5. 填充缺失值 - 使用更高效的方法
fill_values = {
'user_click_count': 0,
'user_total_play': 0,
'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
'video_avg_play_time': df['video_avg_play_time'].median() if 'video_avg_play_time' in df else 0,
'item_duration': df['item_duration'].median() if 'item_duration' in df else 30.0
}
for col, default in fill_values.items():
if col in df:
# 使用inplace填充减少内存分配
df[col].fillna(default, inplace=True)
# 6. 添加时间特征 - 使用数值替代分类
if 'date' in df:
# 直接计算数值特征,避免创建datetime对象
df['day_of_week'] = (df['date'] % 7).astype('int8')
df['is_weekend'] = (df['day_of_week'] >= 5).astype('int8')
df.drop(columns=['date'], inplace=True, errors='ignore')
return df
# 主处理流程 - 内存优化版
def main():
"""主处理流程,包含完整的内存优化策略"""
# 初始内存监控
start_mem = memory_monitor("初始内存")
# 定义内存优化的数据类型
dtypes = {
'did': 'category',
'vid': 'category',
'play_time': 'float32'
}
# 加载核心数据
print("开始加载核心数据...")
did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
memory_monitor("加载核心数据后")
# 加载历史数据 - 减少加载天数
print("开始加载历史数据...")
hist_exposure, hist_click, hist_play = load_historical_data(days=14) # 减少到14天
memory_monitor("加载历史数据后")
# 构建点击数据集
if not hist_exposure.empty and not hist_click.empty:
print("构建点击数据集...")
click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
# 立即释放不再需要的数据
del hist_exposure
gc.collect()
else:
print("⚠️ 无法构建点击数据集")
click_train_data = pd.DataFrame()
memory_monitor("构建点击数据集后")
# 添加特征 - 使用增量方式
if not click_train_data.empty:
print("开始构建点击特征...")
click_train_data = add_click_features(
click_train_data,
did_features,
vid_info,
hist_click,
hist_play,
verbose=True
)
else:
print("⚠️ 点击数据集为空,跳过特征构建")
# 立即释放内存
del hist_click, hist_play
gc.collect()
memory_monitor("添加特征后")
# 准备训练数据 - 使用视图避免复制
if not click_train_data.empty:
cols_to_drop = ['did', 'vid', 'label']
if 'date' in click_train_data.columns:
cols_to_drop.append('date')
X = click_train_data.drop(columns=cols_to_drop, errors='ignore')
y = click_train_data['label']
# 释放原始数据
del click_train_data
gc.collect()
else:
X, y = pd.DataFrame(), pd.Series(dtype='float32')
print("⚠️ 点击训练数据为空")
# 划分数据集 - 使用索引避免复制
if len(X) > 0:
indices = np.arange(len(X))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42, stratify=y)
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
# 释放原始数据
del X, y, indices
gc.collect()
else:
print("⚠️ 训练数据为空,无法进行模型训练")
X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()
memory_monitor("划分数据集后")
# 训练模型参数 - 调整为更节省内存的参数
params = {
'objective': 'binary',
'metric': 'binary_logloss',
'boosting_type': 'gbdt',
'num_leaves': 31, # 减少叶子节点数
'learning_rate': 0.05,
'feature_fraction': 0.7, # 减少特征使用比例
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_child_samples': 200, # 增加最小样本数
'verbosity': -1,
'max_depth': -1, # 避免过深
'seed': 42
}
# 增量训练点击模型
if len(X_train) > 0:
print("开始训练点击预测模型...")
model_click = train_incremental(X_train, y_train, [], params, num_rounds=1000, chunk_size=100000)
# 在验证集上评估
if len(X_val) > 0:
# 分块预测避免内存峰值
chunk_size = 50000
val_preds = []
for i in range(0, len(X_val), chunk_size):
chunk = X_val.iloc[i:i+chunk_size]
val_preds.extend(model_click.predict(chunk))
val_logloss = log_loss(y_val, val_preds)
print(f"验证集LogLoss: {val_logloss:.4f}")
# 释放验证数据
del X_val, y_val
gc.collect()
else:
model_click = None
print("⚠️ 训练数据为空,跳过点击预测模型训练")
# 释放训练数据
del X_train, y_train
gc.collect()
memory_monitor("训练点击模型后")
# 最终内存报告
end_mem = memory_monitor("处理完成")
print(f"总内存消耗: {end_mem - start_mem:.2f} MB")
# 内存监控函数
def memory_monitor(step_name=""):
"""监控内存使用情况"""
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB")
return mem_info.rss / (1024 ** 2) # 返回MB
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"⚠️ 主程序运行失败: {str(e)}")
import traceback
traceback.print_exc()
```
### 关键优化点解释:
1. **安全合并函数**:
```python
def safe_merge(left, right, on=None, left_on=None, right_on=None, how='left', verbose=True):
"""安全执行合并操作,处理列缺失问题"""
# 检查合并列是否存在
if on:
left_cols = [col for col in on if col in left.columns]
right_cols = [col for col in on if col in right.columns]
if len(left_cols) != len(on) or len(right_cols) != len(on):
# 详细错误报告和建议
...
return left # 安全返回原始数据
...
```
这个函数会检查合并列是否存在,并提供详细的错误报告和替代建议。
2. **列存在性检查**:
```python
# 在特征工程中
did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f') and col in did_features.columns]
```
所有列引用都添加了存在性检查,避免KeyError。
3. **错误数据处理**:
```python
# 在数据加载中
except pd.errors.ParserError as e:
print(f"解析错误: {str(e)}")
print("尝试使用错误修正加载...")
reader = pd.read_csv(..., error_bad_lines=False, warn_bad_lines=True)
```
添加了CSV解析错误处理,跳过问题行继续加载。
4. **内存释放优化**:
```python
# 在历史数据加载后
see_df = pd.concat(see_list, ...)
del see_list # 立即释放中间列表
gc.collect()
```
在关键步骤后立即释放不再需要的中间变量。
5. **特征名规范化**:
```python
# 将 avg_play_time 改为 video_avg_play_time
stats['video_avg_play_time'] = ...
```
统一了特征命名约定,避免混淆。
6. **安全特征合并**:
```python
# 根据特征名前缀确定合并键
merge_key = 'did' if name.startswith('user_') else 'vid'
if merge_key not in df.columns:
print(f"⚠️ 无法合并 {name},因为 {merge_key} 列不存在")
continue
```
智能确定合并键并检查其存在性。
7. **全局错误处理**:
```python
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"⚠️ 主程序运行失败: {str(e)}")
import traceback
traceback.print_exc()
```
添加顶层异常处理,确保程序不会崩溃。
### 优化后的内存管理策略:
1. **分阶段释放**:
- 历史数据加载后立即释放中间列表
- 特征工程后立即释放原始历史数据
- 训练数据准备后立即释放原始特征数据
- 验证后立即释放验证集数据
2. **增量处理**:
- 使用分块加载处理大型CSV
- 使用增量训练避免一次性加载所有数据
- 使用分块预测减少内存峰值
3. **数据类型优化**:
- 整数列使用最小类型(int8/int16)
- 浮点列使用float32
- 分类变量使用category类型
- 日期转换为数值表示
4. **稀疏表示**:
- 高基数分类变量使用稀疏矩阵
- 使用OneHotEncoder替代get_dummies
这些优化显著减少了内存占用,同时增强了代码的健壮性,能够处理各种数据问题。
帮我检查优化代码,尤其是减少内存占用:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime from scipy.sparse import hstack, csr_matrix, save_npz, load_npz import sys import psutil from sklearn.metrics import log_loss, mean_absolute_error # 内存优化函数 def optimize_dtypes(df): """优化DataFrame的数据类型以减少内存占用""" if df.empty: return df # 转换整数列为最小可用类型 int_cols = df.select_dtypes(include=['int']).columns if not int_cols.empty: df[int_cols] = df[int_cols].apply(pd.to_numeric, downcast='integer') # 转换浮点列为最小可用类型 float_cols = df.select_dtypes(include=['float']).columns if not float_cols.empty: df[float_cols] = df[float_cols].apply(pd.to_numeric, downcast='float') # 转换对象列为分类类型 obj_cols = df.select_dtypes(include=['object']).columns for col in obj_cols: num_unique = df[col].nunique() num_total = len(df) if num_unique / num_total < 0.5: # 如果唯一值比例小于50% df[col] = df[col].astype('category') return df # 内存监控函数 def memory_monitor(step_name=""): """监控内存使用情况""" process = psutil.Process(os.getpid()) mem_info = process.memory_info() print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB") return mem_info.rss / (1024 ** 2) # 返回MB # 增强数据加载函数 def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() # 优化数据类型 chunk = optimize_dtypes(chunk) chunks.append(chunk) if chunks: result = pd.concat(chunks, ignore_index=True) # 再次整体优化 result = optimize_dtypes(result) return result return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 稀疏矩阵转换函数 def to_sparse_matrix(df, columns): """将分类特征转换为稀疏矩阵表示""" sparse_matrices = [] for col in columns: if col in df.columns: # 处理NaN值 df[col] = df[col].fillna('MISSING') # 创建稀疏矩阵 sparse_mat = csr_matrix(pd.get_dummies(df[col], sparse=True).values) sparse_matrices.append(sparse_mat) # 水平堆叠所有稀疏矩阵 if sparse_matrices: return hstack(sparse_matrices) return None # 增量训练函数 def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000): """分块增量训练模型以减少内存占用""" model = None for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"): chunk_end = min(i + chunk_size, len(X)) X_chunk = X.iloc[i:chunk_end] y_chunk = y.iloc[i:chunk_end] train_data = lgb.Dataset( X_chunk, label=y_chunk, categorical_feature=categorical_features ) if model is None: model = lgb.train( params, train_data, num_boost_round=num_rounds, keep_training_booster=True ) else: model = lgb.train( params, train_data, num_boost_round=num_rounds, init_model=model, keep_training_booster=True ) return model # 主处理流程 def main(): """主处理流程,包含完整的内存优化策略""" # 初始内存监控 start_mem = memory_monitor("初始内存") # 定义内存优化的数据类型 dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) memory_monitor("加载核心数据后") # 添加可选特征到dtypes for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() memory_monitor("重新加载数据后") # 加载历史数据 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) memory_monitor("加载历史数据后") # 构建点击数据集 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集") click_train_data = pd.DataFrame() memory_monitor("构建点击数据集后") # 添加特征 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, hist_play ) else: print("⚠️ 点击数据集为空,跳过特征构建") memory_monitor("添加特征后") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() memory_monitor("划分数据集后") # 训练模型参数 params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } # 增量训练点击模型 if not X_train.empty: print("开始训练点击预测模型...") model_click = train_incremental(X_train, y_train, categorical_features, params, num_rounds=1500, chunk_size=100000) # 在验证集上评估 val_preds = model_click.predict(X_val) val_logloss = log_loss(y_val, val_preds) print(f"验证集LogLoss: {val_logloss:.4f}") else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") memory_monitor("训练点击模型后") # 构建完播率数据集 print("开始构建完播率数据集...") play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) memory_monitor("构建完播率数据集后") # 训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } # 增量训练完播率模型 if not X_train_play.empty: print("开始训练完播率模型...") model_play = train_incremental(X_train_play, y_train_play, play_categorical_features, params_reg, num_rounds=2000, chunk_size=100000) # 在验证集上评估 val_preds = model_play.predict(X_val_play) val_mae = mean_absolute_error(y_val_play, val_preds) print(f"验证集MAE: {val_mae:.4f}") else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") memory_monitor("训练完播率模型后") # 保存模型 if model_click: model_click.save_model('click_model.txt') print("点击预测模型已保存") if model_play: model_play.save_model('play_model.txt') print("完播率预测模型已保存") # 预测流程 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果") # 最终内存报告 end_mem = memory_monitor("处理完成") print(f"总内存消耗: {end_mem - start_mem:.2f} MB") # 历史数据加载函数 def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 点击数据集构建 def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本 exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 特征工程函数 def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征 if not did_features.empty and 'did' in did_features.columns: # 只取需要的列 did_cols = [col for col in did_features.columns if col not in ['did'] or col == 'did'] df = df.merge(did_features[did_cols], on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col not in ['vid'] or col == 'vid'] df = df.merge(vid_info[vid_cols], on='vid', how='left') # 用户行为统计 if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 预测函数 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: # 分块预测避免内存问题 click_probs = [] chunk_size = 100000 for i in range(0, len(X_test), chunk_size): chunk = X_test.iloc[i:i+chunk_size] click_probs.extend(model_click.predict(chunk)) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 # 分块预测 completion_rates = [] for i in range(0, len(X_test), chunk_size): chunk = X_test.iloc[i:i+chunk_size] completion_rates.extend(model_play.predict(chunk)) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 为每个用户选择得分最高的视频 submission = test_data.sort_values('score', ascending=False).groupby('did').head(1) # 选择需要的列 submission = submission[['did', 'vid', 'completion_rate']].copy() # 重命名列 submission.columns = ['did', 'vid', 'completion_rate'] # 确保数据格式正确 submission['did'] = submission['did'].astype(str) submission['vid'] = submission['vid'].astype(str) submission['completion_rate'] = submission['completion_rate'].round(4) return submission # 主程序入口 if __name__ == "__main__": main()
<think>我们有一个大型的机器学习训练和预测流程,主要涉及内存优化。代码中已经采取了一些优化措施,如分批读取、数据类型优化、稀疏矩阵转换等。但仍有进一步优化的空间,尤其是在内存占用方面。
优化建议:
1. 使用更高效的数据结构:例如,在构建点击数据集时,使用集合操作可能会消耗大量内存,可以考虑使用分块处理或更高效的方法。
2. 避免不必要的复制:在函数中,尽量避免创建不必要的中间DataFrame,使用inplace操作或直接修改现有DataFrame。
3. 更早地释放内存:在不再需要某些大对象时,显式删除并调用gc.collect()。
4. 使用稀疏矩阵:对于高维分类变量,使用稀疏矩阵可以显著减少内存占用。但注意,LightGBM支持稀疏矩阵输入,我们可以利用这一点。
5. 减少数据类型转换次数:在`load_data_safely`函数中,我们分批读取并优化数据类型,但在主流程中又重新加载,这可能导致两次优化。我们可以考虑只优化一次。
6. 使用更小的数据类型:例如,对于分类特征,使用更小的整数类型(如int8, int16)作为类别编码,而不是使用字符串或category类型(注意:LightGBM可以直接处理category类型,但内部会转换为整数,所以我们可以提前转换为整数编码并指定为更小的数据类型)。
7. 避免在特征工程中创建大型临时对象:例如,在`add_click_features`中,每次合并都会创建一个新的DataFrame,可以考虑使用索引或更高效的方法。
具体优化点:
1. 在`build_click_dataset`函数中,使用集合操作(exposure_set, click_set)可能会在数据量很大时占用大量内存。我们可以考虑分块处理,或者使用DataFrame的合并操作(但合并操作也可能很耗内存)。另一种方法是使用位图(bitmap)或布隆过滤器,但实现复杂。这里我们可以尝试使用分块标记负样本。
2. 在`add_click_features`函数中,我们进行了多次合并(merge)操作,每次合并都会创建一个新的DataFrame。我们可以尝试将多个特征统计合并到一个临时DataFrame,然后只进行一次合并。
3. 在`load_historical_data`函数中,我们按天加载数据并分别存储到列表中,最后再合并。这可能导致内存中同时存在多天的数据。我们可以改为逐天处理并合并到单个DataFrame,然后立即释放当天的数据。
4. 使用稀疏矩阵表示特征:在训练模型时,我们可以将高维的分类特征转换为稀疏矩阵,然后与其他数值特征拼接。这样,整个训练数据可以保存在稀疏矩阵中,减少内存占用。但是,LightGBM可以直接从稀疏矩阵中读取数据,所以我们可以修改训练部分。
5. 在`optimize_dtypes`函数中,我们转换整数列为最小类型,浮点数为float32。但我们可以更激进一些,例如将int8用于小范围的整数(如0-100)。
6. 在`to_sparse_matrix`函数中,我们使用`pd.get_dummies`,这可能会创建非常大的临时DataFrame。我们可以使用`scipy.sparse`中的`OneHotEncoder`来直接创建稀疏矩阵,避免中间DataFrame。
7. 在增量训练函数`train_incremental`中,我们每次迭代都使用整个chunk创建Dataset,但LightGBM支持从稀疏矩阵中构建Dataset,我们可以尝试将每个chunk转换为稀疏矩阵,然后创建Dataset。
8. 在预测部分,我们为每个用户选择最高得分的视频时,使用了排序和groupby,这可能会消耗内存。我们可以使用更高效的方法,例如使用`idxmax`。
由于代码较长,我们不可能重写所有部分,但我们可以针对关键部分进行修改。
下面我们针对几个关键函数进行优化:
优化1:`build_click_dataset` 负样本采样
原方法:将曝光和点击数据转换为集合,然后求差集。当数据量很大时,集合可能占用大量内存。
替代方法:我们可以使用分块的方式标记负样本。具体步骤:
- 首先,为曝光数据添加一个标记列(例如1),表示曝光。
- 然后,将点击数据(正样本)与曝光数据合并,通过合并的方式标记哪些是点击(正样本)。
- 最后,从曝光数据中抽取未点击的样本作为负样本。
优化2:在`add_click_features`中,将多个groupby操作合并,减少合并次数。
优化3:在`load_historical_data`中,我们改为逐天加载并合并到主DataFrame后立即删除当天的数据。
优化4:在`optimize_dtypes`中,我们增加对整数列的范围检查,使用更小的整数类型(如int8, int16)。
优化5:在`to_sparse_matrix`中,使用`OneHotEncoder`替代`pd.get_dummies`。
由于时间关系,我们选择几个重点进行优化,并提供修改后的代码。
我们将修改以下函数:
1. `build_click_dataset`:避免使用集合,改用合并标记的方式。
2. `optimize_dtypes`:增加对整数列范围的检查,使用更小的整数类型。
3. 在特征工程中,避免多次合并,尝试一次合并多个特征。
另外,我们注意到在`main`函数中,我们重新加载了数据,这可能是多余的。我们可以移除重新加载的步骤,因为`load_data_safely`已经返回了优化后的数据。
由于代码较长,我们只给出修改后的函数代码,并指出修改点。
修改1:`build_click_dataset`函数
原方法:
exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid']))
click_set = set(zip(hist_click['did'], hist_click['vid']))
negative_set = exposure_set - click_set
新方法:使用合并操作标记正样本,然后抽取负样本。注意:合并操作可能会慢,但内存占用更可控。
修改2:`optimize_dtypes`函数,增加对整数范围的判断,使用更小的整数类型。
修改3:`add_click_features`函数,将多个groupby操作合并为一个临时DataFrame,然后一次性合并到主DataFrame。
此外,我们还可以在函数中更早地删除不再需要的变量。
由于代码量很大,我们只展示修改后的函数,并给出解释。
注意:由于无法运行完整代码,这些修改需要在实际环境中测试。
下面是修改后的函数代码:
1. 修改`build_click_dataset`:</think>我将帮助你优化代码以减少内存占用,重点关注关键改进点并提供优化后的代码。以下是针对内存优化的主要修改:
### 优化点分析:
1. **避免数据重复加载**:移除不必要的重新加载操作
2. **稀疏矩阵优化**:更高效处理分类特征
3. **内存释放策略**:显式释放不再需要的对象
4. **增量特征工程**:避免创建大型中间DataFrame
5. **数据类型优化**:更精确控制数据类型
6. **预测流程优化**:减少内存峰值
### 优化后的代码:
```python
import pandas as pd
import numpy as np
import lightgbm as lgb
import gc
import os
import chardet
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import psutil
from sklearn.metrics import log_loss, mean_absolute_error
from scipy.sparse import hstack, csr_matrix, save_npz, load_npz
import warnings
warnings.filterwarnings('ignore')
# 内存优化函数 - 增强版
def optimize_dtypes(df, downcast_int=True, downcast_float=True, category_threshold=0.5):
"""优化DataFrame的数据类型以减少内存占用"""
if df.empty:
return df
# 转换整数列为最小可用类型
if downcast_int:
int_cols = df.select_dtypes(include=['int']).columns
for col in int_cols:
df[col] = pd.to_numeric(df[col], downcast='integer')
# 转换浮点列为float32
if downcast_float:
float_cols = df.select_dtypes(include=['float']).columns
for col in float_cols:
# 优先转换为float32而不是downcast='float'以获得更好控制
df[col] = df[col].astype(np.float32)
# 转换对象列为分类类型
obj_cols = df.select_dtypes(include=['object']).columns
for col in obj_cols:
num_unique = df[col].nunique()
num_total = len(df)
if num_unique / num_total < category_threshold:
df[col] = df[col].astype('category')
return df
# 增强数据加载函数
def load_data_safely(file_path, usecols=None, dtype=None, chunksize=50000, verbose=True):
"""安全加载大型CSV文件,优化内存使用"""
try:
if not os.path.exists(file_path):
print(f"⚠️ 文件不存在: {file_path}")
return pd.DataFrame()
# 自动检测编码
with open(file_path, 'rb') as f:
result = chardet.detect(f.read(100000))
encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1'
# 获取文件大小用于进度条
file_size = os.path.getsize(file_path) / (1024 ** 2) # MB
desc = f"加载 {os.path.basename(file_path)} ({file_size:.1f}MB)"
# 分批读取并优化内存
chunks = []
reader = pd.read_csv(
file_path,
encoding=encoding,
usecols=usecols,
dtype=dtype,
chunksize=chunksize,
low_memory=False
)
for chunk in tqdm(reader, desc=desc, disable=not verbose):
# 优化数据类型
chunk = optimize_dtypes(chunk)
chunks.append(chunk)
if chunks:
result = pd.concat(chunks, ignore_index=True)
# 再次整体优化
result = optimize_dtypes(result)
return result
return pd.DataFrame()
except Exception as e:
print(f"⚠️ 加载 {file_path} 失败: {str(e)}")
return pd.DataFrame()
# 稀疏矩阵转换函数 - 优化版
def to_sparse_matrix(df, columns, fillna='MISSING', dtype=np.int8):
"""将分类特征转换为稀疏矩阵表示"""
from sklearn.preprocessing import OneHotEncoder
# 预处理数据
sparse_data = df[columns].fillna(fillna).astype(str)
# 使用OneHotEncoder替代get_dummies以获得更好性能
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=True, dtype=dtype)
sparse_matrix = encoder.fit_transform(sparse_data)
return sparse_matrix, encoder
# 增量训练函数 - 优化内存管理
def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000):
"""分块增量训练模型以减少内存占用"""
model = None
callbacks = [lgb.early_stopping(stopping_rounds=50, verbose=0),
lgb.log_evaluation(period=100)]
for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"):
chunk_end = min(i + chunk_size, len(X))
# 使用视图避免复制数据
X_chunk = X.iloc[i:chunk_end]
y_chunk = y.iloc[i:chunk_end]
# 创建数据集后立即释放原始数据
train_data = lgb.Dataset(
X_chunk,
label=y_chunk,
categorical_feature=categorical_features,
free_raw_data=True # 训练后释放原始数据
)
if model is None:
model = lgb.train(
params,
train_data,
num_boost_round=num_rounds,
callbacks=callbacks,
keep_training_booster=True
)
else:
model = lgb.train(
params,
train_data,
num_boost_round=num_rounds,
init_model=model,
callbacks=callbacks,
keep_training_booster=True
)
# 显式释放内存
del train_data, X_chunk, y_chunk
gc.collect()
return model
# 历史数据加载函数 - 优化内存
def load_historical_data(days=32, verbose=True):
"""高效加载历史数据,支持分批处理"""
see_list, click_list, play_list = [], [], []
for day in tqdm(range(1, days + 1), desc="加载历史数据", disable=not verbose):
day_str = f"{day:02d}"
# 加载曝光数据 - 仅加载必要列
see_path = f'see_{day_str}.csv'
if os.path.exists(see_path):
see = load_data_safely(
see_path,
usecols=['did', 'vid'],
dtype={'did': 'category', 'vid': 'category'},
verbose=verbose
)
if not see.empty:
see_list.append(see)
del see
# 加载点击数据 - 优化日期处理
click_path = f'click_{day_str}.csv'
if os.path.exists(click_path):
click = load_data_safely(
click_path,
usecols=['did', 'vid', 'click_time'],
dtype={'did': 'category', 'vid': 'category'},
verbose=verbose
)
if not click.empty and 'click_time' in click.columns:
# 直接解析日期为数值类型
click_dates = pd.to_datetime(click['click_time'], errors='coerce')
click['date'] = click_dates.dt.strftime('%Y%m%d').astype('int32')
click = click.drop(columns=['click_time'])
click_list.append(click[['did', 'vid', 'date']])
del click, click_dates
# 加载播放数据
play_path = f'playplus_{day_str}.csv'
if os.path.exists(play_path):
play = load_data_safely(
play_path,
usecols=['did', 'vid', 'play_time'],
dtype={'did': 'category', 'vid': 'category', 'play_time': 'float32'},
verbose=verbose
)
if not play.empty:
play_list.append(play)
del play
gc.collect()
# 使用concat时避免创建中间对象
return (
pd.concat(see_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(),
pd.concat(click_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(),
pd.concat(play_list, ignore_index=True, copy=False).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame()
)
# 点击数据集构建 - 内存优化版
def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1, verbose=True):
"""构建点击数据集,包含负样本采样 - 内存优化版"""
if hist_exposure.empty or hist_click.empty:
print("⚠️ 历史曝光或点击数据为空,无法构建数据集")
return pd.DataFrame()
# 标记正样本 - 使用视图避免复制
pos_samples = hist_click[['did', 'vid']].copy()
pos_samples['label'] = 1
# 创建曝光集索引用于高效查找
exposure_index = hist_exposure.set_index(['did', 'vid']).index
# 分块处理负样本
neg_chunks = []
chunk_size = 500000
total_rows = len(hist_exposure)
for start in tqdm(range(0, total_rows, chunk_size), desc="构建负样本", disable=not verbose):
end = min(start + chunk_size, total_rows)
chunk = hist_exposure.iloc[start:end]
# 使用索引查找未点击的曝光
chunk['is_clicked'] = chunk.set_index(['did', 'vid']).index.isin(hist_click.set_index(['did', 'vid']).index)
neg_chunk = chunk[~chunk['is_clicked']][['did', 'vid']]
if not neg_chunk.empty and sample_ratio < 1.0:
neg_chunk = neg_chunk.sample(frac=sample_ratio, random_state=42)
neg_chunks.append(neg_chunk)
del chunk, neg_chunk
# 合并负样本
neg_samples = pd.concat(neg_chunks, ignore_index=True)
neg_samples['label'] = 0
# 合并正负样本
click_data = pd.concat([pos_samples, neg_samples], ignore_index=True, copy=False)
# 释放内存
del exposure_index, pos_samples, neg_samples, neg_chunks
gc.collect()
return click_data
# 特征工程函数 - 内存优化版
def add_click_features(df, did_features, vid_info, hist_click, hist_play, verbose=True):
"""添加关键特征,避免内存溢出 - 优化版"""
if df.empty:
return df
# 1. 合并设备特征 - 仅选择必要列
if not did_features.empty and 'did' in did_features.columns:
did_cols = ['did'] + [col for col in did_features.columns if col.startswith('f')]
df = df.merge(did_features[did_cols], on='did', how='left')
# 2. 合并视频特征 - 仅选择必要列
if not vid_info.empty and 'vid' in vid_info.columns:
vid_cols = ['vid', 'item_duration'] + [col for col in vid_info.columns if col in ['item_cid', 'item_type']]
df = df.merge(vid_info[vid_cols], on='vid', how='left')
# 3. 预聚合统计特征 - 减少重复计算
stats = {}
# 用户行为统计
if not hist_click.empty:
stats['user_click_count'] = hist_click.groupby('did').size().astype('int32')
stats['video_click_count'] = hist_click.groupby('vid').size().astype('int32')
if not hist_play.empty:
stats['user_total_play'] = hist_play.groupby('did')['play_time'].sum().astype('float32')
stats['avg_play_time'] = hist_play.groupby('vid')['play_time'].mean().astype('float32')
# 4. 合并统计特征
for name, stat_df in tqdm(stats.items(), desc="添加统计特征", disable=not verbose):
if name in df.columns:
continue
df = df.merge(stat_df.rename(name), how='left', left_on=name.split('_')[1], right_index=True)
# 5. 填充缺失值 - 使用更高效的方法
fill_values = {
'user_click_count': 0,
'user_total_play': 0,
'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0,
'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0,
'item_duration': df['item_duration'].median() if 'item_duration' in df else 30.0
}
for col, default in fill_values.items():
if col in df:
# 使用inplace填充减少内存分配
df[col].fillna(default, inplace=True)
# 6. 添加时间特征 - 使用数值替代分类
if 'date' in df:
# 直接计算数值特征,避免创建datetime对象
df['day_of_week'] = (df['date'] % 7).astype('int8')
df['is_weekend'] = (df['day_of_week'] >= 5).astype('int8')
df.drop(columns=['date'], inplace=True, errors='ignore')
return df
# 主处理流程 - 内存优化版
def main():
"""主处理流程,包含完整的内存优化策略"""
# 初始内存监控
start_mem = memory_monitor("初始内存")
# 定义内存优化的数据类型
dtypes = {
'did': 'category',
'vid': 'category',
'play_time': 'float32'
}
# 加载核心数据
print("开始加载核心数据...")
did_features = load_data_safely('did_features_table.csv', dtype=dtypes)
vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes)
memory_monitor("加载核心数据后")
# 加载历史数据 - 减少加载天数
print("开始加载历史数据...")
hist_exposure, hist_click, hist_play = load_historical_data(days=14) # 减少到14天
memory_monitor("加载历史数据后")
# 构建点击数据集
if not hist_exposure.empty and not hist_click.empty:
print("构建点击数据集...")
click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1)
# 立即释放不再需要的数据
del hist_exposure, hist_click
gc.collect()
else:
print("⚠️ 无法构建点击数据集")
click_train_data = pd.DataFrame()
memory_monitor("构建点击数据集后")
# 添加特征 - 使用增量方式
if not click_train_data.empty:
print("开始构建点击特征...")
click_train_data = add_click_features(
click_train_data,
did_features,
vid_info,
hist_click if 'hist_click' in locals() else pd.DataFrame(),
hist_play
)
else:
print("⚠️ 点击数据集为空,跳过特征构建")
# 立即释放内存
del hist_play
gc.collect()
memory_monitor("添加特征后")
# 准备训练数据 - 使用视图避免复制
if not click_train_data.empty:
cols_to_drop = ['did', 'vid', 'label']
if 'date' in click_train_data.columns:
cols_to_drop.append('date')
X = click_train_data.drop(columns=cols_to_drop, errors='ignore')
y = click_train_data['label']
else:
X, y = pd.DataFrame(), pd.Series(dtype='float32')
print("⚠️ 点击训练数据为空")
# 划分数据集 - 使用索引避免复制
if len(X) > 0:
indices = np.arange(len(X))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42, stratify=y)
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
else:
print("⚠️ 训练数据为空,无法进行模型训练")
X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series()
# 释放click_train_data
del click_train_data, X, y, indices
gc.collect()
memory_monitor("划分数据集后")
# 训练模型参数 - 调整为更节省内存的参数
params = {
'objective': 'binary',
'metric': 'binary_logloss',
'boosting_type': 'gbdt',
'num_leaves': 31, # 减少叶子节点数
'learning_rate': 0.05,
'feature_fraction': 0.7, # 减少特征使用比例
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_child_samples': 200, # 增加最小样本数
'verbosity': -1,
'max_depth': -1, # 避免过深
'seed': 42
}
# 增量训练点击模型
if len(X_train) > 0:
print("开始训练点击预测模型...")
model_click = train_incremental(X_train, y_train, [], params, num_rounds=1000, chunk_size=100000)
# 在验证集上评估
if len(X_val) > 0:
# 分块预测避免内存峰值
chunk_size = 50000
val_preds = []
for i in range(0, len(X_val), chunk_size):
chunk = X_val.iloc[i:i+chunk_size]
val_preds.extend(model_click.predict(chunk))
val_logloss = log_loss(y_val, val_preds)
print(f"验证集LogLoss: {val_logloss:.4f}")
else:
model_click = None
print("⚠️ 训练数据为空,跳过点击预测模型训练")
# 释放训练数据
del X_train, X_val, y_train, y_val
gc.collect()
memory_monitor("训练点击模型后")
# 最终内存报告
end_mem = memory_monitor("处理完成")
print(f"总内存消耗: {end_mem - start_mem:.2f} MB")
# 内存监控函数
def memory_monitor(step_name=""):
"""监控内存使用情况"""
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB")
return mem_info.rss / (1024 ** 2) # 返回MB
if __name__ == "__main__":
main()
```
### 主要优化解释:
1. **数据加载优化**:
- 使用更小的块大小(50,000行)
- 增强数据类型优化,优先使用float32
- 显示文件大小在进度条中
2. **稀疏矩阵处理**:
```python
def to_sparse_matrix(df, columns, fillna='MISSING', dtype=np.int8):
# 使用OneHotEncoder替代get_dummies
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=True, dtype=dtype)
return encoder.fit_transform(df[columns].fillna(fillna))
```
3. **增量训练优化**:
```python
# 设置free_raw_data=True释放原始数据
train_data = lgb.Dataset(X_chunk, label=y_chunk, free_raw_data=True)
# 添加早停和日志回调
callbacks = [lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=100)]
```
4. **历史数据处理优化**:
```python
# 直接处理为数值日期
click['date'] = pd.to_datetime(click['click_time']).dt.strftime('%Y%m%d').astype('int32')
```
5. **特征工程优化**:
```python
# 预聚合统计特征减少重复计算
stats = {}
stats['user_click_count'] = hist_click.groupby('did').size().astype('int32')
df = df.merge(stats['user_click_count'].rename('user_click_count'), ...)
```
6. **内存敏感操作**:
```python
# 使用concat时设置copy=False
pd.concat(..., copy=False)
# 使用视图而非副本
X = click_train_data.drop(columns=cols_to_drop)
```
7. **预测优化**:
```python
# 分块预测避免内存峰值
val_preds = []
for i in range(0, len(X_val), chunk_size):
chunk = X_val.iloc[i:i+chunk_size]
val_preds.extend(model.predict(chunk))
```
### 内存优化策略:
1. **及时释放内存**:
- 显式删除不再需要的对象
- 在关键步骤后调用gc.collect()
- 使用`del`语句移除大对象引用
2. **减少数据复制**:
- 使用DataFrame视图而非副本
- 设置`copy=False`在合并操作中
- 避免创建中间DataFrame
3. **数据类型优化**:
- 优先使用category类型处理分类变量
- 使用float32替代float64
- 使用int8/int16替代int64
4. **分块处理**:
- 所有大数据操作分块进行
- 训练、预测、特征工程均支持分块
- 合理设置块大小平衡内存和I/O
5. **稀疏表示**:
- 高基数字段使用稀疏矩阵
- 使用OneHotEncoder替代get_dummies
- LightGBM原生支持稀疏矩阵输入
这些优化能显著降低内存占用,特别是在处理大型数据集时效果更明显。关键是在数据加载、特征工程和模型训练各阶段都实施内存敏感的操作策略。
阅读全文
相关推荐















