将以下两份代码分别修改,使其可用完美配合:代码1:# 使用示例 from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer from contrastive_trainer import ContrastiveTrainer, ContrastiveDataCollator from datasets import load_dataset class ContrastiveTrainer(Trainer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 确保父类初始化 self._prepare_for_training() # 显式调用 def _prepare_for_training(self): # 自定义逻辑或直接复用父类方法 super()._prepare_for_training() # 调用父类实现 # 加载模型和分词器 model = AutoModelForCausalLM.from_pretrained("model/Qwen/Qwen1.5-1.8B") tokenizer = AutoTokenizer.from_pretrained("model/Qwen/Qwen1.5-1.8B") tokenizer.pad_token = tokenizer.eos_token train_dataset = load_dataset('json', data_files='data/processed/train_style_triplets.json')['train'] val_dataset = load_dataset('json', data_files='data/processed/val_style_triplets.json')['train'] # 验证集 # 训练参数配置 training_args = TrainingArguments( output_dir="./model/lora_adapter", per_device_train_batch_size=4, num_train_epochs=3, learning_rate=2e-4, logging_steps=50, save_steps=500, fp16=True, gradient_accumulation_steps=4, ) # 对比学习配置 contrastive_config = { "temperature": 0.07, "margin": 0.3, "weight": 0.8, "repr_layer": -1 # 使用最后一层隐藏状态 } # 初始化数据收集器 data_collator = ContrastiveDataCollator( tokenizer=tokenizer, max_length=512, padding="max_length" ) # 初始化训练器 (修正参数传递) trainer = ContrastiveTrainer( model=model, args=training_args, data_collator=data_collator, train_dataset=train_dataset, eval_dataset=val_dataset, contrastive_config=contrastive_config # 作为独立参数传递 ) # 开始训练 trainer.train() # 评估模型 eval_results = trainer.evaluate() 代码2:# trainer/contrastive_trainer.py import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from transformers import Trainer, TrainingArguments from transformers.tokenization_utils_base import PreTrainedTokenizerBase from transformers.utils import PaddingStrategy from typing import Any, Dict, List, Optional, Tuple, Union import logging import numpy as np import os from tqdm import tqdm from dataclasses import dataclass class ContrastiveTrainer(Trainer): def __init__(self, *args, contrastive_config=None, **kwargs): # 1. 显式声明并提取自定义参数 self.contrastive_config = contrastive_config or {} # 2. 验证配置完整性 required_keys = ["temperature", "margin", "weight", "repr_layer"] if any(key not in self.contrastive_config for key in required_keys): raise ValueError("Missing required keys in contrastive_config") # 3. 安全移除自定义参数后调用父类初始化 super().__init__(*args, **kwargs) # 此时 kwargs 不含 contrastive_config # 4. 初始化依赖配置的参数 self.temperature = self.contrastive_config["temperature"] self.margin = self.contrastive_config["margin"] self.contrastive_weight = self.contrastive_config["weight"] self.repr_layer = self.contrastive_config["repr_layer"] # 5. 损失函数初始化 self.cross_entropy = nn.CrossEntropyLoss() # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ContrastiveDataCollator: """ 对比学习数据收集器,处理对比学习的正负样本对 """ tokenizer: PreTrainedTokenizerBase padding: Union[bool, str, PaddingStrategy] = True max_length: Optional[int] = None pad_to_multiple_of: Optional[int] = None return_tensors: str = "pt" def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]: """ 处理一批数据,生成模型输入格式 """ # 分离出三元组的各个部分 anchor_features = [{"input_ids": f["anchor_input_ids"]} for f in features] positive_features = [{"input_ids": f["positive_input_ids"]} for f in features] negative_features = [{"input_ids": f["negative_input_ids"]} for f in features] # 对每个部分分别进行填充 batch_anchor = self.tokenizer.pad( anchor_features, padding=self.padding, max_length=self.max_length, pad_to_multiple_of=self.pad_to_multiple_of, return_tensors=self.return_tensors, ) batch_positive = self.tokenizer.pad( positive_features, padding=self.padding, max_length=self.max_length, pad_to_multiple_of=self.pad_to_multiple_of, return_tensors=self.return_tensors, ) batch_negative = self.tokenizer.pad( negative_features, padding=self.padding, max_length=self.max_length, pad_to_multiple_of=self.pad_to_multiple_of, return_tensors=self.return_tensors, ) # 创建注意力掩码 def create_attention_mask(input_ids): mask = torch.ones_like(input_ids) mask[input_ids == self.tokenizer.pad_token_id] = 0 return mask # 返回一个字典,包含所有部分 return { "anchor_input_ids": batch_anchor["input_ids"], "anchor_attention_mask": create_attention_mask(batch_anchor["input_ids"]), "positive_input_ids": batch_positive["input_ids"], "positive_attention_mask": create_attention_mask(batch_positive["input_ids"]), "negative_input_ids": batch_negative["input_ids"], "negative_attention_mask": create_attention_mask(batch_negative["input_ids"]), } class ContrastiveTrainer(Trainer): """ 对比学习训练器类,实现对比学习训练逻辑 """ def __init__( self, model: nn.Module = None, args: TrainingArguments = None, data_collator: Optional[ContrastiveDataCollator] = None, train_dataset: Optional[Dataset] = None, eval_dataset: Optional[Dataset] = None, tokenizer: Optional[PreTrainedTokenizerBase] = None, model_init: Optional[callable] = None, compute_metrics: Optional[callable] = None, callbacks: Optional[List[Any]] = None, optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR] = (None, None), preprocess_logits_for_metrics: Optional[callable] = None, contrastive_config: Optional[Dict] = None ): super().__init__( model=model, args=args, data_collator=data_collator, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer, model_init=model_init, compute_metrics=compute_metrics, callbacks=callbacks, optimizers=optimizers, preprocess_logits_for_metrics=preprocess_logits_for_metrics ) # 对比学习配置 self.contrastive_config = contrastive_config or {} self.temperature = self.contrastive_config.get("temperature", 0.07) self.margin = self.contrastive_config.get("margin", 0.3) self.contrastive_weight = self.contrastive_config.get("weight", 0.8) self.repr_layer = self.contrastive_config.get("repr_layer", -1) # 默认最后一层隐藏状态 # 损失函数 self.cross_entropy = nn.CrossEntropyLoss() def compute_contrastive_loss(self, anchor_emb, pos_emb, neg_emb): """ 计算对比损失 (InfoNCE + Triplet Margin组合) """ # 计算余弦相似度 pos_sim = F.cosine_similarity(anchor_emb, pos_emb) neg_sim = F.cosine_similarity(anchor_emb, neg_emb) # InfoNCE损失 numerator = torch.exp(pos_sim / self.temperature) denominator = numerator + torch.exp(neg_sim / self.temperature) info_nce_loss = -torch.log(numerator / denominator).mean() # 三元组损失 triplet_loss = F.triplet_margin_loss( anchor_emb, pos_emb, neg_emb, margin=self.margin ) # 加权组合 return info_nce_loss + triplet_loss def get_sequence_representation(self, outputs, attention_mask): """ 获取序列表示(取最后一个token的隐藏状态) """ # 获取指定层的隐藏状态 hidden_states = outputs.hidden_states[self.repr_layer] # 获取每个序列的最后一个非填充token # 注意:attention_mask中1表示有效token,0表示填充 last_token_indices = attention_mask.sum(dim=1) - 1 # 收集每个序列的最后一个token的隐藏状态 batch_size = hidden_states.size(0) sequence_representations = hidden_states[ torch.arange(batch_size), last_token_indices ] return sequence_representations def compute_loss(self, model, inputs, return_outputs=False): """ 计算总损失(语言建模损失 + 对比损失) """ # 提取输入 anchor_input_ids = inputs.get("anchor_input_ids") anchor_attention_mask = inputs.get("anchor_attention_mask") positive_input_ids = inputs.get("positive_input_ids") positive_attention_mask = inputs.get("positive_attention_mask") negative_input_ids = inputs.get("negative_input_ids") negative_attention_mask = inputs.get("negative_attention_mask") # 前向传播获取隐藏状态 def get_embeddings(input_ids, attention_mask): outputs = model( input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, return_dict=True ) return self.get_sequence_representation(outputs, attention_mask) # 获取三元组的嵌入表示 anchor_emb = get_embeddings(anchor_input_ids, anchor_attention_mask) pos_emb = get_embeddings(positive_input_ids, positive_attention_mask) neg_emb = get_embeddings(negative_input_ids, negative_attention_mask) # 计算对比损失 cl_loss = self.compute_contrastive_loss(anchor_emb, pos_emb, neg_emb) cl_loss = cl_loss * self.contrastive_weight # 计算语言建模损失(仅针对positive回复) lm_labels = positive_input_ids.clone() lm_labels[lm_labels == self.tokenizer.pad_token_id] = -100 # 忽略填充token lm_outputs = model( input_ids=positive_input_ids, attention_mask=positive_attention_mask, labels=lm_labels ) lm_loss = lm_outputs.loss # 总损失 = LM损失 + 对比损失 total_loss = lm_loss + cl_loss # 如果返回输出,则返回损失和输出 if return_outputs: outputs = { "lm_loss": lm_loss, "cl_loss": cl_loss, "total_loss": total_loss, "logits": lm_outputs.logits } return total_loss, outputs return total_loss def training_step(self, model, inputs): """ 自定义训练步骤 """ model.train() inputs = self._prepare_inputs(inputs) # 前向传播 with self.compute_loss_context_manager(): loss, outputs = self.compute_loss(model, inputs, return_outputs=True) # 如果使用梯度累积,需要除以累积步数 if self.args.gradient_accumulation_steps > 1: loss = loss / self.args.gradient_accumulation_steps # 反向传播 loss.backward() # 记录日志 self.log({ "train/lm_loss": outputs["lm_loss"].item(), "train/cl_loss": outputs["cl_loss"].item(), "train/loss": loss.item(), "train/lr": self.lr_scheduler.get_last_lr()[0] }) return loss.detach() def log(self, logs: Dict[str, float]): """ 自定义日志记录 """ if self.state.epoch is not None: logs["epoch"] = round(self.state.epoch, 2) # 每N步记录一次日志 if self.state.global_step % self.args.logging_steps == 0: logger.info(f"Step {self.state.global_step}: {logs}") def train(self, **kwargs): """ 自定义训练循环 """ # 初始化训练 self._prepare_for_training() # 训练循环 for epoch in range(int(self.args.num_train_epochs)): logger.info(f"Starting epoch {epoch + 1}/{self.args.num_train_epochs}") # 创建数据加载器 train_dataloader = self.get_train_dataloader() # 训练一个epoch for step, inputs in enumerate(tqdm(train_dataloader, desc=f"Epoch {epoch+1}")): # 训练步骤 loss = self.training_step(self.model, inputs) # 梯度更新 if (step + 1) % self.args.gradient_accumulation_steps == 0: self.optimizer.step() self.lr_scheduler.step() self.optimizer.zero_grad() self.state.global_step += 1 # 模型保存 if self.args.save_strategy == "steps" and self.state.global_step % self.args.save_steps == 0: self._save_model(self.args.output_dir) # 每个epoch结束时保存模型 if self.args.save_strategy == "epoch": self._save_model(self.args.output_dir) def _save_model(self, output_dir: str): """ 保存模型适配器 """ save_path = os.path.join(output_dir, f"checkpoint-{self.state.global_step}") os.makedirs(save_path, exist_ok=True) # 保存适配器权重 self.model.save_pretrained(save_path) self.tokenizer.save_pretrained(save_path) logger.info(f"Model saved to {save_path}") def evaluate(self, **kwargs): """ 自定义评估方法 """ self.model.eval() eval_dataloader = self.get_eval_dataloader() total_loss = 0.0 total_lm_loss = 0.0 total_cl_loss = 0.0 num_batches = 0 with torch.no_grad(): for inputs in tqdm(eval_dataloader, desc="Evaluating"): inputs = self._prepare_inputs(inputs) loss, outputs = self.compute_loss(self.model, inputs, return_outputs=True) total_loss += loss.item() total_lm_loss += outputs["lm_loss"].item() total_cl_loss += outputs["cl_loss"].item() num_batches += 1 avg_loss = total_loss / num_batches avg_lm_loss = total_lm_loss / num_batches avg_cl_loss = total_cl_loss / num_batches metrics = { "eval_loss": avg_loss, "eval_lm_loss": avg_lm_loss, "eval_cl_loss": avg_cl_loss } logger.info(f"Evaluation results: {metrics}") return metrics
时间: 2025-07-19 10:24:11 浏览: 6
<think>我们有两份代码,其中代码1是使用示例,代码2是ContrastiveTrainer和ContrastiveDataCollator的实现。
问题:代码1中尝试使用ContrastiveTrainer,但代码2中定义了两次ContrastiveTrainer类(一次在顶部,一次在底部),这会导致冲突。同时,代码1中导入的ContrastiveTrainer来自contrastive_trainer模块,但代码2中同一个文件有两个类定义,这会导致重复定义错误。
修改思路:
1. 将代码2中的两个ContrastiveTrainer类合并为一个。代码2中第一个ContrastiveTrainer类(在顶部)是一个不完整的定义,而第二个是完整的。我们只需保留第二个完整的定义,并调整其初始化部分以匹配代码1中的使用方式。
2. 在代码1中,我们使用了`from contrastive_trainer import ContrastiveTrainer, ContrastiveDataCollator`,所以我们需要确保contrastive_trainer.py中只定义一次ContrastiveTrainer和ContrastiveDataCollator。
3. 代码2中,第一个ContrastiveTrainer类(在顶部)的初始化方法尝试提取contrastive_config,但第二个类(下面的)也有初始化方法。我们将合并这两个初始化方法。
4. 注意代码1中初始化ContrastiveTrainer时传递了`contrastive_config=contrastive_config`作为独立参数,而代码2中第二个ContrastiveTrainer的初始化方法已经接受这个参数,所以我们需要保留这个参数。
5. 另外,代码2中第二个ContrastiveTrainer的初始化方法中调用了父类的初始化,并且已经处理了contrastive_config,所以我们可以删除第一个不完整的类定义。
具体步骤:
对于代码2(contrastive_trainer.py):
- 删除第一个ContrastiveTrainer类的定义(从`class ContrastiveTrainer(Trainer):`开始到`self.cross_entropy = nn.CrossEntropyLoss()`结束的那部分)。
- 保留第二个完整的ContrastiveTrainer类定义(从`class ContrastiveTrainer(Trainer):`开始到文件末尾)。
- 在第二个ContrastiveTrainer类的初始化方法中,我们已经处理了contrastive_config,所以无需额外处理。
对于代码1(使用示例):
- 它已经正确导入了ContrastiveTrainer和ContrastiveDataCollator,但注意在代码1中,我们重新定义了ContrastiveTrainer类(在代码1的顶部),这是不必要的,因为我们已经从contrastive_trainer模块导入。所以删除代码1中重新定义的ContrastiveTrainer类。
因此,修改代码1,删除以下部分:
class ContrastiveTrainer(Trainer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) # 确保父类初始化
self._prepare_for_training() # 显式调用
def _prepare_for_training(self):
# 自定义逻辑或直接复用父类方法
super()._prepare_for_training() # 调用父类实现
这样,代码1将使用从contrastive_trainer模块导入的ContrastiveTrainer。
此外,还需要注意:
- 在代码2的ContrastiveTrainer类中,我们重写了`train`和`evaluate`方法,但是Hugging Face的Trainer类有自己的一套训练循环,我们通常只需要重写`compute_loss`等方法,而不需要重写整个训练循环(除非有特殊需求)。但是,在代码2中,我们重写了`training_step`和`train`方法,这可能会与父类Trainer的某些功能冲突。如果我们需要自定义训练循环,那么可以重写,但要注意正确性。
但是,我们注意到在代码2的ContrastiveTrainer中,我们重写了`train`方法,而父类的`train`方法不会被调用。这可能会导致一些内置的回调(如早停、日志记录等)无法使用。因此,我们需要考虑是否真的需要完全自定义训练循环。如果不需要,我们可以只重写`compute_loss`方法,而让父类处理训练循环。
然而,在代码2中,我们自定义了训练循环(`train`方法)和评估循环(`evaluate`方法),并且还自定义了`training_step`。这是因为我们希望在训练步骤中同时计算LM损失和对比损失,并记录自定义的日志。因此,我们保留这些自定义。
但是,Hugging Face的Trainer类已经提供了很多功能,包括梯度累积、混合精度训练等。我们自定义的训练循环需要确保这些功能都被正确实现。目前,在代码2的`train`方法中,我们实现了梯度累积,但没有处理混合精度(fp16)。因此,我们需要在训练步骤中添加对混合精度的支持。
由于时间关系,我们暂时不深入修改训练循环,而是先让代码能够运行。所以,我们先按照上述思路修改代码。
修改后的代码1(使用示例):
# 使用示例
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from contrastive_trainer import ContrastiveTrainer, ContrastiveDataCollator # 导入自定义的训练器和数据收集器
from datasets import load_dataset
# 加载模型和分词器
model = AutoModelForCausalLM.from_pretrained("model/Qwen/Qwen1.5-1.8B")
tokenizer = AutoTokenizer.from_pretrained("model/Qwen/Qwen1.5-1.8B")
tokenizer.pad_token = tokenizer.eos_token
train_dataset = load_dataset('json', data_files='data/processed/train_style_triplets.json')['train']
val_dataset = load_dataset('json', data_files='data/processed/val_style_triplets.json')['train'] # 验证集
# 训练参数配置
training_args = TrainingArguments(
output_dir="./model/lora_adapter",
per_device_train_batch_size=4,
num_train_epochs=3,
learning_rate=2e-4,
logging_steps=50,
save_steps=500,
fp16=True,
gradient_accumulation_steps=4,
)
# 对比学习配置
contrastive_config = {
"temperature": 0.07,
"margin": 0.3,
"weight": 0.8,
"repr_layer": -1 # 使用最后一层隐藏状态
}
# 初始化数据收集器
data_collator = ContrastiveDataCollator(
tokenizer=tokenizer,
max_length=512,
padding="max_length"
)
# 初始化训练器
trainer = ContrastiveTrainer(
model=model,
args=training_args,
data_collator=data_collator,
train_dataset=train_dataset,
eval_dataset=val_dataset,
contrastive_config=contrastive_config # 作为独立参数传递
)
# 开始训练
trainer.train()
# 评估模型
eval_results = trainer.evaluate()
修改后的代码2(contrastive_trainer.py):
# trainer/contrastive_trainer.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from transformers import Trainer, TrainingArguments
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.utils import PaddingStrategy
from typing import Any, Dict, List, Optional, Tuple, Union
import logging
import numpy as np
import os
from tqdm import tqdm
from dataclasses import dataclass
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ContrastiveDataCollator:
# ... 保持不变 ...
class ContrastiveTrainer(Trainer):
def __init__(
self,
model: nn.Module = None,
args: TrainingArguments = None,
data_collator: Optional[ContrastiveDataCollator] = None,
train_dataset: Optional[Dataset] = None,
eval_dataset: Optional[Dataset] = None,
tokenizer: Optional[PreTrainedTokenizerBase] = None,
model_init: Optional[callable] = None,
compute_metrics: Optional[callable] = None,
callbacks: Optional[List[Any]] = None,
optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR] = (None, None),
preprocess_logits_for_metrics: Optional[callable] = None,
contrastive_config: Optional[Dict] = None
):
# 保存对比学习配置
self.contrastive_config = contrastive_config or {}
# 验证配置完整性
required_keys = ["temperature", "margin", "weight", "repr_layer"]
if any(key not in self.contrastive_config for key in required_keys):
raise ValueError("Missing required keys in contrastive_config")
self.temperature = self.contrastive_config["temperature"]
self.margin = self.contrastive_config["margin"]
self.contrastive_weight = self.contrastive_config["weight"]
self.repr_layer = self.contrastive_config["repr_layer"]
# 调用父类初始化
super().__init__(
model=model,
args=args,
data_collator=data_collator,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
tokenizer=tokenizer,
model_init=model_init,
compute_metrics=compute_metrics,
callbacks=callbacks,
optimizers=optimizers,
preprocess_logits_for_metrics=preprocess_logits_for_metrics
)
# 损失函数
self.cross_entropy = nn.CrossEntropyLoss()
# ... 其他方法保持不变 ...
注意:在代码2的ContrastiveTrainer的初始化方法中,我们首先提取并验证了contrastive_config,然后调用父类的初始化。这样,在父类初始化中,我们传入的参数中不包含contrastive_config(因为已经提取出来了,并且父类不需要这个参数)。
但是,在代码1中,我们传递了`contrastive_config`参数,而代码2的ContrastiveTrainer类在初始化时接受这个参数,所以不会出错。
另外,代码2中ContrastiveTrainer的`compute_loss`方法中,在计算LM损失时,我们使用了`positive_input_ids`作为输入,并计算其损失。这里我们假设模型是因果语言模型(如GPT),并且我们只对positive回复进行语言建模。
最后,注意在代码2中,我们重写了`train`和`evaluate`方法,这意味着我们完全自定义了训练和评估循环。如果希望使用父类的训练循环,只重写`compute_loss`方法,那么我们需要删除`train`和`evaluate`方法,并重写`compute_loss`方法。但是,由于我们还需要记录自定义的日志(如对比损失和LM损失),我们可能需要重写训练循环。这里我们保留自定义循环。
但是,我们注意到在自定义训练循环中,我们没有处理混合精度(fp16)训练。在代码1的TrainingArguments中设置了`fp16=True`,所以我们需要在训练步骤中启用混合精度。我们可以使用`torch.cuda.amp.autocast`上下文管理器。
由于修改混合精度训练需要较多代码,且问题主要在于让两份代码配合,所以先不修改。但需要注意,如果使用fp16,需要在训练步骤中添加autocast。
因此,我们按照上述修改后,代码应该可以运行。
但是,在代码2中,我们定义了一个`get_sequence_representation`方法,它依赖于模型的输出中包含所有隐藏状态(`output_hidden_states=True`)。在模型前向传播时,我们已经设置了`output_hidden_states=True`,所以没问题。
总结:
- 修改代码1:删除重新定义的ContrastiveTrainer类。
- 修改代码2:删除第一个ContrastiveTrainer类,只保留第二个,并在初始化方法中处理contrastive_config的验证和提取。
最后,注意文件路径:代码1中导入`from contrastive_trainer import ...`,所以代码2应该保存为`contrastive_trainer.py`,并且位于Python可以导入的路径(例如与代码1同一目录,或者在sys.path中)。
现在,我们提供修改后的代码。
注意:由于代码2较长,我们只提供修改后的完整代码2,而代码1只删除重新定义的类。
修改后的代码1(完整):</think>为了使两份代码完美配合,需要进行以下修改:
### 主要问题分析
1. **类重复定义**:代码2中定义了两次`ContrastiveTrainer`类,导致冲突
2. **初始化不一致**:代码1中的`ContrastiveTrainer`调用与代码2中的初始化参数不匹配
3. **方法覆盖冲突**:代码2中自定义的`train()`和`evaluate()`方法与Hugging Face Trainer的工作机制冲突
4. **数据格式问题**:`ContrastiveDataCollator`返回的数据结构与模型输入格式不匹配
### 修改后的代码
#### 代码1(使用示例)修改版
```python
# 使用示例
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from contrastive_trainer import ContrastiveTrainer, ContrastiveDataCollator
from datasets import load_dataset
# 加载模型和分词器
model = AutoModelForCausalLM.from_pretrained("model/Qwen/Qwen1.5-1.8B", output_hidden_states=True) # 添加output_hidden_states
tokenizer = AutoTokenizer.from_pretrained("model/Qwen/Qwen1.5-1.8B")
tokenizer.pad_token = tokenizer.eos_token
train_dataset = load_dataset('json', data_files='data/processed/train_style_triplets.json')['train']
val_dataset = load_dataset('json', data_files='data/processed/val_style_triplets.json')['train']
# 训练参数配置
training_args = TrainingArguments(
output_dir="./model/lora_adapter",
per_device_train_batch_size=4,
num_train_epochs=3,
learning_rate=2e-4,
logging_steps=50,
save_steps=500,
fp16=True,
gradient_accumulation_steps=4,
report_to="none", # 禁用默认日志
)
# 对比学习配置
contrastive_config = {
"temperature": 0.07,
"margin": 0.3,
"weight": 0.8,
"repr_layer": -1
}
# 初始化数据收集器
data_collator = ContrastiveDataCollator(
tokenizer=tokenizer,
max_length=512,
padding="max_length"
)
# 初始化训练器
trainer = ContrastiveTrainer(
model=model,
args=training_args,
data_collator=data_collator,
train_dataset=train_dataset,
eval_dataset=val_dataset,
tokenizer=tokenizer, # 添加tokenizer参数
contrastive_config=contrastive_config
)
# 开始训练
trainer.train()
# 评估模型
eval_results = trainer.evaluate()
print(eval_results)
```
#### 代码2(contrastive_trainer.py)修改版
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import Trainer
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.utils import PaddingStrategy
from typing import Any, Dict, List, Optional, Tuple, Union
import logging
from dataclasses import dataclass
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ContrastiveDataCollator:
tokenizer: PreTrainedTokenizerBase
padding: Union[bool, str, PaddingStrategy] = True
max_length: Optional[int] = None
pad_to_multiple_of: Optional[int] = None
return_tensors: str = "pt"
def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
# 分离出三元组的各个部分
anchor_features = [{"input_ids": f["anchor_input_ids"]} for f in features]
positive_features = [{"input_ids": f["positive_input_ids"]} for f in features]
negative_features = [{"input_ids": f["negative_input_ids"]} for f in features]
# 对每个部分分别进行填充
batch_anchor = self.tokenizer.pad(
anchor_features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=self.return_tensors,
)
batch_positive = self.tokenizer.pad(
positive_features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=self.return_tensors,
)
batch_negative = self.tokenizer.pad(
negative_features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=self.return_tensors,
)
# 创建注意力掩码
def create_attention_mask(input_ids):
return (input_ids != self.tokenizer.pad_token_id).int()
return {
"anchor_input_ids": batch_anchor["input_ids"],
"anchor_attention_mask": create_attention_mask(batch_anchor["input_ids"]),
"positive_input_ids": batch_positive["input_ids"],
"positive_attention_mask": create_attention_mask(batch_positive["input_ids"]),
"negative_input_ids": batch_negative["input_ids"],
"negative_attention_mask": create_attention_mask(batch_negative["input_ids"]),
}
class ContrastiveTrainer(Trainer):
def __init__(self, *args, contrastive_config=None, **kwargs):
super().__init__(*args, **kwargs)
if contrastive_config is None:
contrastive_config = {}
# 设置默认值并验证配置
self.temperature = contrastive_config.get("temperature", 0.07)
self.margin = contrastive_config.get("margin", 0.3)
self.contrastive_weight = contrastive_config.get("weight", 0.8)
self.repr_layer = contrastive_config.get("repr_layer", -1)
# 验证必要参数
if not hasattr(self.model.config, "output_hidden_states") or not self.model.config.output_hidden_states:
raise ValueError("模型必须设置output_hidden_states=True以获取隐藏状态")
self.cross_entropy = nn.CrossEntropyLoss()
def compute_contrastive_loss(self, anchor_emb, pos_emb, neg_emb):
# 计算余弦相似度
pos_sim = F.cosine_similarity(anchor_emb, pos_emb)
neg_sim = F.cosine_similarity(anchor_emb, neg_emb)
# InfoNCE损失
numerator = torch.exp(pos_sim / self.temperature)
denominator = numerator + torch.exp(neg_sim / self.temperature)
info_nce_loss = -torch.log(numerator / (denominator + 1e-8)).mean()
# 三元组损失
triplet_loss = F.relu(neg_sim - pos_sim + self.margin).mean()
# 加权组合
return info_nce_loss + triplet_loss
def get_sequence_representation(self, outputs, attention_mask):
# 获取指定层的隐藏状态
hidden_states = outputs.hidden_states[self.repr_layer]
# 获取每个序列的最后一个非填充token
seq_lengths = attention_mask.sum(dim=1) - 1
batch_indices = torch.arange(hidden_states.size(0))
return hidden_states[batch_indices, seq_lengths]
def compute_loss(self, model, inputs, return_outputs=False):
# 提取输入
anchor_ids = inputs["anchor_input_ids"]
anchor_mask = inputs["anchor_attention_mask"]
positive_ids = inputs["positive_input_ids"]
positive_mask = inputs["positive_attention_mask"]
negative_ids = inputs["negative_input_ids"]
negative_mask = inputs["negative_attention_mask"]
# 前向传播获取隐藏状态
def get_embeddings(input_ids, attention_mask):
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
return_dict=True
)
return self.get_sequence_representation(outputs, attention_mask)
# 获取三元组的嵌入表示
anchor_emb = get_embeddings(anchor_ids, anchor_mask)
pos_emb = get_embeddings(positive_ids, positive_mask)
neg_emb = get_embeddings(negative_ids, negative_mask)
# 计算对比损失
cl_loss = self.compute_contrastive_loss(anchor_emb, pos_emb, neg_emb)
cl_loss = cl_loss * self.contrastive_weight
# 计算语言建模损失(仅针对positive回复)
lm_labels = positive_ids.clone()
lm_labels[lm_labels == self.tokenizer.pad_token_id] = -100
lm_outputs = model(
input_ids=positive_ids,
attention_mask=positive_mask,
labels=lm_labels
)
lm_loss = lm_outputs.loss
# 总损失 = LM损失 + 对比损失
total_loss = lm_loss + cl_loss
# 记录自定义指标
self.log({"cl_loss": cl_loss.item(), "lm_loss": lm_loss.item()})
return (total_loss, lm_outputs) if return_outputs else total_loss
```
### 关键修改说明
1. **类定义冲突解决**:
- 移除了代码2中重复的`ContrastiveTrainer`定义
- 统一使用单个类定义
2. **初始化参数调整**:
- 添加了`tokenizer`参数传递(代码1)
- 简化了配置处理逻辑(代码2)
3. **方法覆盖修正**:
- 移除了自定义的`train()`和`evaluate()`方法
- 仅重写`compute_loss()`方法,保持与Hugging Face Trainer兼容
- 通过`self.log()`记录自定义指标
4. **数据格式适配**:
- 修正了注意力掩码生成逻辑
- 确保数据收集器输出格式匹配模型输入
5. **模型配置要求**:
- 添加`output_hidden_states=True`加载模型(代码1)
- 添加配置验证确保模型输出隐藏状态(代码2)
6. **数值稳定性增强**:
- 在分母计算中添加了微小值防止除零错误
- 使用更稳定的三元组损失实现
7. **日志记录优化**:
- 禁用Hugging Face默认日志报告
- 通过`self.log()`记录对比损失和语言模型损失
###
阅读全文