Skip to content

第55天:模型训练与评估

学习目标

  • 掌握训练配置方法
  • 学习超参数调优技术
  • 理解训练监控
  • 掌握模型评估方法
  • 了解性能优化策略

训练配置

基础配置

python
import math

from dataclasses import asdict, dataclass
from typing import Optional

@dataclass
class TrainingConfig:
    """Hyperparameters and runtime options for a fine-tuning run.

    Field names follow the HuggingFace TrainingArguments naming convention,
    so the dict from to_dict() can be forwarded with minimal translation.
    """

    model_name: str                       # base model identifier (required)
    output_dir: str = "./output"
    num_train_epochs: int = 3
    per_device_train_batch_size: int = 4
    per_device_eval_batch_size: int = 4
    gradient_accumulation_steps: int = 4  # effective batch = batch * accum * devices
    learning_rate: float = 2e-4
    weight_decay: float = 0.01
    warmup_steps: int = 500
    logging_steps: int = 10
    save_steps: int = 500
    eval_steps: int = 500
    save_total_limit: int = 3             # keep at most this many checkpoints
    fp16: bool = True                     # fp16 and bf16 should not both be enabled
    bf16: bool = False
    gradient_checkpointing: bool = False
    dataloader_num_workers: int = 4
    seed: int = 42

    def to_dict(self) -> dict:
        """Return every field as a plain dict (e.g. for logging).

        Replaces the hand-written field-by-field copy, which silently drifts
        out of sync whenever a field is added or renamed.
        """
        return asdict(self)

LoRA配置

python
@dataclass
class LoRAConfig:
    """LoRA adapter hyperparameters (mirrors the PEFT LoraConfig fields)."""

    r: int = 8                     # rank of the low-rank update matrices
    lora_alpha: int = 32           # scaling factor (update is scaled by alpha / r)
    # Annotation fixed: the field genuinely defaults to None and is filled in
    # by __post_init__ (a mutable list cannot be a dataclass field default).
    target_modules: Optional[list] = None
    lora_dropout: float = 0.05
    bias: str = "none"             # which bias params to train: none / all / lora_only
    task_type: str = "CAUSAL_LM"
    inference_mode: bool = False

    def __post_init__(self):
        # Default to adapting the attention query/value projections.
        if self.target_modules is None:
            self.target_modules = ["q_proj", "v_proj"]

    def to_dict(self) -> dict:
        """Return all fields as a plain dict."""
        return asdict(self)

QLoRA配置

python
@dataclass
class QLoRAConfig:
    """BitsAndBytes 4-bit quantization settings for QLoRA fine-tuning."""

    load_in_4bit: bool = True                 # load base weights in 4-bit
    bnb_4bit_use_double_quant: bool = True    # also quantize the quantization constants
    bnb_4bit_quant_type: str = "nf4"          # NormalFloat4 quantization
    bnb_4bit_compute_dtype: str = "bfloat16"  # dtype used for compute (matmuls)
    device_map: str = "auto"                  # automatic layer placement

    def to_dict(self) -> dict:
        """Return all fields as a plain dict (e.g. to build a BitsAndBytesConfig)."""
        return asdict(self)

超参数调优

网格搜索

python
from itertools import product

class GridSearchOptimizer:
    """Exhaustive hyperparameter search over the cartesian product of a grid."""

    def __init__(self, param_grid: dict):
        # param_grid maps parameter name -> list of candidate values.
        self.param_grid = param_grid

    def generate_configs(self) -> list:
        """Return every combination in the grid as a list of config dicts."""
        keys = list(self.param_grid.keys())
        return [dict(zip(keys, combo))
                for combo in product(*self.param_grid.values())]

    def optimize(self, train_func, eval_func,
                 n_trials: int = None) -> dict:
        """Train and evaluate each grid config; return the best one.

        train_func(config) -> model; eval_func(model) -> float, higher is better.
        n_trials, when given, caps how many configs are tried.
        Returns {"best_config": ..., "best_score": ...} (best_config is None
        when no trials ran).
        """
        configs = self.generate_configs()

        # `is not None` so an explicit n_trials=0 is honored instead of being
        # silently treated the same as "no limit".
        if n_trials is not None:
            configs = configs[:n_trials]

        best_config = None
        best_score = float('-inf')

        for i, config in enumerate(configs):
            print(f"Trial {i+1}/{len(configs)}: {config}")

            model = train_func(config)
            score = eval_func(model)

            print(f"Score: {score}")

            if score > best_score:
                best_score = score
                best_config = config

        return {
            "best_config": best_config,
            "best_score": best_score
        }

随机搜索

python
import random

class RandomSearchOptimizer:
    """Random hyperparameter search over a mixed discrete/continuous space.

    param_space value conventions:
      list                          -> categorical choice
      (lo, hi) tuple                -> uniform float
      {"type": "log", "range": (lo, hi)} -> log-uniform float
    """

    def __init__(self, param_space: dict):
        self.param_space = param_space

    def sample_config(self) -> dict:
        """Draw one random configuration from the search space.

        Bug fix: the log-uniform branch used math.log10 but `math` was never
        imported, so it raised NameError whenever a log-scaled parameter
        (e.g. a learning rate) was present.
        """
        config = {}

        for param_name, param_range in self.param_space.items():
            if isinstance(param_range, list):
                config[param_name] = random.choice(param_range)
            elif isinstance(param_range, tuple):
                min_val, max_val = param_range
                config[param_name] = random.uniform(min_val, max_val)
            elif isinstance(param_range, dict):
                if param_range.get("type") == "log":
                    min_val, max_val = param_range["range"]
                    # Sample uniformly in log10 space so each decade is
                    # equally likely.
                    log_min = math.log10(min_val)
                    log_max = math.log10(max_val)
                    config[param_name] = 10 ** random.uniform(log_min, log_max)

        return config

    def optimize(self, train_func, eval_func,
                 n_trials: int = 10) -> dict:
        """Run n_trials random draws; return the best config and its score."""
        best_config = None
        best_score = float('-inf')

        for i in range(n_trials):
            config = self.sample_config()

            print(f"Trial {i+1}/{n_trials}: {config}")

            model = train_func(config)
            score = eval_func(model)

            print(f"Score: {score}")

            if score > best_score:
                best_score = score
                best_config = config

        return {
            "best_config": best_config,
            "best_score": best_score
        }

贝叶斯优化

python
class BayesianOptimizer:
    """Sequential optimizer skeleton: records (config, score) trials and
    proposes the next configuration to evaluate.

    NOTE: the acquisition step below is a placeholder that falls back to
    uniform random sampling, so this currently behaves like random search.
    """

    def __init__(self, param_space: dict):
        self.param_space = param_space
        self.trials = []  # history of {"config": ..., "score": ...} entries

    def suggest_config(self) -> dict:
        """Propose the next configuration to try."""
        if not self.trials:
            # Nothing observed yet: start with a random draw.
            return self._random_config()
        return self._acquisition_function()

    def _random_config(self) -> dict:
        """Sample one configuration uniformly from the space."""
        sampled = {}
        for name, spec in self.param_space.items():
            if isinstance(spec, list):
                sampled[name] = random.choice(spec)
            elif isinstance(spec, tuple):
                low, high = spec
                sampled[name] = random.uniform(low, high)
        return sampled

    def _acquisition_function(self) -> dict:
        # Placeholder: a real implementation would fit a surrogate model to
        # self.trials and maximize an acquisition criterion (e.g. EI).
        return self._random_config()

    def observe(self, config: dict, score: float):
        """Record the outcome of one completed trial."""
        self.trials.append({
            "config": config,
            "score": score
        })

    def optimize(self, train_func, eval_func,
                 n_trials: int = 10) -> dict:
        """Run n_trials suggest/train/eval/observe cycles; return the best."""
        best_config, best_score = None, float('-inf')

        for trial_idx in range(n_trials):
            candidate = self.suggest_config()

            print(f"Trial {trial_idx+1}/{n_trials}: {candidate}")

            score = eval_func(train_func(candidate))
            self.observe(candidate, score)

            print(f"Score: {score}")

            if score > best_score:
                best_config, best_score = candidate, score

        return {
            "best_config": best_config,
            "best_score": best_score
        }

训练监控

指标追踪

python
import time
from typing import Dict, List

class MetricsTracker:
    """In-memory logger for scalar training metrics, keyed by metric name."""

    def __init__(self):
        # Common series are pre-created; unknown metric names are added lazily
        # on first log() call.
        self.metrics = {
            "train_loss": [],
            "eval_loss": [],
            "learning_rate": [],
            "epoch": [],
            "step": [],
            "time": []
        }

    def log(self, metrics: Dict, step: int, epoch: int):
        """Append one observation per metric, plus step/epoch/wall-clock time.

        Generalized: previously unseen metric names are now tracked too,
        where the old code silently dropped anything not pre-declared.
        """
        for key, value in metrics.items():
            self.metrics.setdefault(key, []).append(value)

        self.metrics["epoch"].append(epoch)
        self.metrics["step"].append(step)
        self.metrics["time"].append(time.time())

    def get_metrics(self) -> Dict:
        """Return the raw metric series (name -> list of values)."""
        return self.metrics

    def get_summary(self) -> Dict:
        """Return mean/min/max/last for every non-empty series."""
        summary = {}

        for key, values in self.metrics.items():
            if values:
                summary[key] = {
                    "mean": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                    "last": values[-1]
                }

        return summary

WandB集成

python
class WandBTracker:
    """Thin wrapper around Weights & Biases experiment tracking.

    Starts a wandb run on construction; all methods forward to the wandb SDK.
    """

    def __init__(self, project: str, config: Dict):
        # Import lazily so wandb is only required when this tracker is used.
        try:
            import wandb
            self.wandb = wandb
            self.wandb.init(project=project, config=config)
        except ImportError:
            raise ImportError("Install wandb: pip install wandb")

    def log(self, metrics: Dict, step: int):
        """Log a dict of scalar metrics at the given global step."""
        self.wandb.log(metrics, step=step)

    def log_model(self, model_path: str, name: str):
        """Upload the model file and record it as an artifact.

        NOTE(review): wandb.log_artifact normally expects an artifact path or
        Artifact object as its first argument — passing `name` here looks
        suspect; verify against the wandb API before relying on this.
        """
        self.wandb.save(model_path)
        self.wandb.log_artifact(name, type="model")

    def finish(self):
        """Close the current wandb run."""
        self.wandb.finish()

MLflow集成

python
class MLflowTracker:
    """Thin wrapper around MLflow experiment tracking.

    Selects (creating if needed) the experiment on construction; all methods
    forward to the mlflow SDK.
    """

    def __init__(self, experiment_name: str):
        # Import lazily so mlflow is only required when this tracker is used.
        try:
            import mlflow
            self.mlflow = mlflow
            self.mlflow.set_experiment(experiment_name)
        except ImportError:
            raise ImportError("Install mlflow: pip install mlflow")

    def log_params(self, params: Dict):
        """Log each hyperparameter as an individual mlflow param."""
        for key, value in params.items():
            self.mlflow.log_param(key, value)

    def log_metrics(self, metrics: Dict, step: int):
        """Log each scalar metric at the given step."""
        for key, value in metrics.items():
            self.mlflow.log_metric(key, value, step=step)

    def log_model(self, model, name: str):
        """Log a PyTorch model artifact under `name`."""
        self.mlflow.pytorch.log_model(model, name)

    def end_run(self):
        """Close the active mlflow run."""
        self.mlflow.end_run()

模型评估

基础评估

python
import numpy as np
from typing import List, Dict

class ModelEvaluator:
    """Computes average loss (and, when logits/labels are available,
    classification metrics) for a model over an evaluation dataset.

    NOTE(review): relies on `torch` being imported at module level elsewhere;
    assumes each `batch` is a dict of tensors already on the model's device
    and that iterating eval_dataset yields batches — confirm against callers.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def evaluate(self, eval_dataset) -> Dict:
        """Run a no-grad pass over eval_dataset and return averaged metrics.

        Returns a dict containing "eval_loss" (mean per-batch loss) plus
        accuracy/precision/recall/f1 when predictions and labels were
        collected.
        """
        self.model.eval()
        
        total_loss = 0
        predictions = []
        labels = []
        
        with torch.no_grad():
            for batch in eval_dataset:
                outputs = self.model(**batch)
                
                total_loss += outputs.loss.item()
                
                if hasattr(outputs, "logits"):
                    # Greedy class prediction over the last logits dimension.
                    # NOTE(review): for causal-LM outputs this compares
                    # per-position argmax against unshifted labels — confirm
                    # this is the intended metric for your task.
                    preds = torch.argmax(outputs.logits, dim=-1)
                    predictions.extend(preds.cpu().numpy())
                    
                    if "labels" in batch:
                        labels.extend(batch["labels"].cpu().numpy())
        
        # NOTE(review): raises ZeroDivisionError if eval_dataset is empty.
        metrics = {
            "eval_loss": total_loss / len(eval_dataset)
        }
        
        if predictions and labels:
            metrics.update(self._calculate_classification_metrics(predictions, labels))
        
        return metrics
    
    def _calculate_classification_metrics(self, 
                                        predictions: List, 
                                        labels: List) -> Dict:
        """Weighted-average classification metrics via scikit-learn."""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        
        return {
            "accuracy": accuracy_score(labels, predictions),
            "precision": precision_score(labels, predictions, average="weighted"),
            "recall": recall_score(labels, predictions, average="weighted"),
            "f1": f1_score(labels, predictions, average="weighted")
        }

生成评估

python
class GenerationEvaluator:
    """Evaluates free-form text generation with simple surface-level metrics
    (average length, lexical diversity, within-text repetition)."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def evaluate_generation(self, test_prompts: List[str],
                           max_length: int = 100) -> Dict:
        """Generate one completion per prompt and compute aggregate metrics.

        Returns {"generations": [{"prompt", "generation"}, ...],
                 "metrics": {"avg_length", "diversity", "repetition"}}.
        """
        generations = []

        for prompt in test_prompts:
            generation = self._generate(prompt, max_length)
            generations.append({
                "prompt": prompt,
                "generation": generation
            })

        metrics = {
            "avg_length": self._calculate_avg_length(generations),
            "diversity": self._calculate_diversity(generations),
            "repetition": self._calculate_repetition(generations)
        }

        return {
            "generations": generations,
            "metrics": metrics
        }

    def _generate(self, prompt: str, max_length: int) -> str:
        """Sample one completion for the prompt (nucleus sampling, T=0.7)."""
        inputs = self.tokenizer(prompt, return_tensors="pt")

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=max_length,
                do_sample=True,
                temperature=0.7,
                top_p=0.9
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _calculate_avg_length(self, generations: List[Dict]) -> float:
        """Mean whitespace-token length across generations.

        Bug fix: returns 0.0 for an empty batch instead of dividing by zero.
        """
        if not generations:
            return 0.0
        lengths = [len(gen["generation"].split()) for gen in generations]
        return sum(lengths) / len(lengths)

    def _calculate_diversity(self, generations: List[Dict]) -> float:
        """Distinct-word ratio (unique words / total words) across the batch."""
        all_words = set()

        for gen in generations:
            all_words.update(gen["generation"].split())

        total_words = sum(len(gen["generation"].split()) for gen in generations)

        return len(all_words) / total_words if total_words > 0 else 0

    def _calculate_repetition(self, generations: List[Dict]) -> float:
        """Average repeated-word fraction per generation (0 = no repeats).

        Bug fix: returns 0.0 for an empty batch instead of dividing by zero.
        """
        if not generations:
            return 0.0

        total_repetition = 0
        for gen in generations:
            words = gen["generation"].split()
            if words:
                total_repetition += 1 - (len(set(words)) / len(words))

        return total_repetition / len(generations)

BLEU评估

python
class BLEUEvaluator:
    """Computes sentence- and corpus-level BLEU between references and
    hypotheses (whitespace-tokenized)."""

    def __init__(self):
        pass

    def evaluate(self, references: List[str],
                 hypotheses: List[str]) -> Dict:
        """Return corpus BLEU, mean sentence BLEU, and per-sentence scores.

        Raises:
            ValueError: if either input is empty or the lengths differ
                (the old code divided by zero on empty input and silently
                truncated on mismatched lengths via zip).
            ImportError: if nltk is not installed.
        """
        if not references or not hypotheses:
            raise ValueError("references and hypotheses must be non-empty")
        if len(references) != len(hypotheses):
            raise ValueError(
                f"length mismatch: {len(references)} references vs "
                f"{len(hypotheses)} hypotheses"
            )

        try:
            from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
        except ImportError:
            raise ImportError("Install nltk: pip install nltk")

        sentence_scores = []
        for ref, hyp in zip(references, hypotheses):
            ref_tokens = ref.split()
            hyp_tokens = hyp.split()

            score = sentence_bleu([ref_tokens], hyp_tokens)
            sentence_scores.append(score)

        corpus_score = corpus_bleu(
            [[ref.split()] for ref in references],
            [hyp.split() for hyp in hypotheses]
        )

        return {
            "corpus_bleu": corpus_score,
            "avg_sentence_bleu": sum(sentence_scores) / len(sentence_scores),
            "sentence_bleu_scores": sentence_scores
        }

ROUGE评估

python
class ROUGEEvaluator:
    """Thin wrapper around the `rouge` package for ROUGE-1/2/L scoring."""

    def __init__(self):
        pass

    def evaluate(self, references: List[str],
                 hypotheses: List[str]) -> Dict:
        """Return averaged ROUGE-1/2/L scores of hypotheses vs references."""
        # Import lazily so the dependency is only needed when scoring.
        try:
            from rouge import Rouge
        except ImportError:
            raise ImportError("Install rouge: pip install rouge")

        averaged = Rouge().get_scores(hypotheses, references, avg=True)

        return {key: averaged[key] for key in ("rouge-1", "rouge-2", "rouge-l")}

性能优化

混合精度训练

python
class MixedPrecisionTrainer:
    """fp16 training step using torch.cuda.amp autocast + GradScaler.

    The scale -> backward -> step -> update call order below is the contract
    required by GradScaler; do not reorder these statements.
    """

    def __init__(self, model, config):
        self.model = model
        self.config = config
        # Dynamically scales the loss to avoid fp16 gradient underflow.
        self.scaler = torch.cuda.amp.GradScaler()

    def train_step(self, batch, optimizer):
        """One mixed-precision forward/backward/step; returns the loss value."""
        optimizer.zero_grad()

        # Run the forward pass with autocast so eligible ops use fp16.
        with torch.cuda.amp.autocast():
            outputs = self.model(**batch)
            loss = outputs.loss

        # Backward on the scaled loss; scaler.step unscales gradients first
        # and skips the optimizer step if inf/nan gradients are detected.
        self.scaler.scale(loss).backward()
        self.scaler.step(optimizer)
        self.scaler.update()

        return loss.item()

梯度累积

python
class GradientAccumulationTrainer:
    """Accumulates gradients over several micro-batches before each optimizer
    step, simulating a larger effective batch size."""

    def __init__(self, model, accumulation_steps: int = 4):
        self.model = model
        self.accumulation_steps = accumulation_steps
        self.accumulated_batches = 0  # micro-batches since the last optimizer step

    def train_step(self, batch, optimizer):
        """Backprop one micro-batch; step the optimizer every
        `accumulation_steps` calls. Returns the unscaled loss for logging."""
        outputs = self.model(**batch)
        # Scale the loss so the accumulated gradient is an average over the window.
        scaled_loss = outputs.loss / self.accumulation_steps
        scaled_loss.backward()

        self.accumulated_batches += 1
        if self.accumulated_batches >= self.accumulation_steps:
            optimizer.step()
            optimizer.zero_grad()
            self.accumulated_batches = 0

        return scaled_loss.item() * self.accumulation_steps

梯度检查点

python
class GradientCheckpointingTrainer:
    """Enables activation (gradient) checkpointing on a model, trading extra
    compute for lower activation memory."""

    def __init__(self, model):
        self.model = model
        self._enable_gradient_checkpointing()

    def _enable_gradient_checkpointing(self):
        """Turn on gradient checkpointing, preferring the model-level API.

        Falls back to enabling it submodule-by-submodule when the model does
        not expose the method itself.
        """
        if hasattr(self.model, "gradient_checkpointing_enable"):
            self.model.gradient_checkpointing_enable()
            # Needed so inputs require grad when embeddings are frozen (e.g.
            # PEFT/LoRA). Bug fix: guard with hasattr — the old code called it
            # unconditionally and raised AttributeError on models that provide
            # gradient_checkpointing_enable but not enable_input_require_grads.
            if hasattr(self.model, "enable_input_require_grads"):
                self.model.enable_input_require_grads()
        else:
            for module in self.model.modules():
                if hasattr(module, "gradient_checkpointing_enable"):
                    module.gradient_checkpointing_enable()

实践练习

练习1:实现完整的训练流程

python
class CompleteTrainer:
    """End-to-end training loop: optimizer/scheduler setup, per-epoch training
    with metric logging, and evaluation after each epoch.

    Relies on MetricsTracker and ModelEvaluator defined earlier in this file.
    """

    def __init__(self, model, tokenizer, config):
        self.model = model
        self.tokenizer = tokenizer
        self.config = config  # expects TrainingConfig-like attributes
        self.metrics_tracker = MetricsTracker()

    def train(self, train_dataset, eval_dataset):
        """Train for config.num_train_epochs epochs; returns the model."""
        optimizer = self._create_optimizer()
        # Bug fix: the schedule length is computed here from the actual
        # dataset (the old _create_scheduler referenced an undefined
        # `train_dataset` name and raised NameError).
        num_training_steps = len(train_dataset) * self.config.num_train_epochs
        scheduler = self._create_scheduler(optimizer, num_training_steps)

        for epoch in range(self.config.num_train_epochs):
            train_loss = self._train_epoch(train_dataset, optimizer, scheduler, epoch)
            eval_metrics = self._evaluate(eval_dataset)

            print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}")
            print(f"Eval: {eval_metrics}")

        return self.model

    def _train_epoch(self, train_dataset, optimizer, scheduler, epoch):
        """Run one epoch and return the mean training loss.

        Bug fixes: `epoch` is now a parameter (it was an undefined name
        inside this method), and the warmup scheduler is stepped once per
        optimizer step — linear-warmup schedules count steps, not epochs.
        """
        self.model.train()
        total_loss = 0

        for step, batch in enumerate(train_dataset):
            loss = self._train_step(batch, optimizer)
            scheduler.step()
            total_loss += loss

            if step % self.config.logging_steps == 0:
                self.metrics_tracker.log({
                    "train_loss": loss,
                    "learning_rate": optimizer.param_groups[0]["lr"]
                }, step, epoch)

        return total_loss / len(train_dataset)

    def _train_step(self, batch, optimizer):
        """Single forward/backward/optimizer step; returns the loss value."""
        optimizer.zero_grad()
        outputs = self.model(**batch)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        return loss.item()

    def _evaluate(self, eval_dataset):
        """Evaluate the current model state on eval_dataset."""
        evaluator = ModelEvaluator(self.model, self.tokenizer)
        return evaluator.evaluate(eval_dataset)

    def _create_optimizer(self):
        """AdamW over all parameters with the configured LR and weight decay."""
        return torch.optim.AdamW(
            self.model.parameters(),
            lr=self.config.learning_rate,
            weight_decay=self.config.weight_decay
        )

    def _create_scheduler(self, optimizer, num_training_steps):
        """Linear warmup/decay schedule sized to the full training run."""
        from transformers import get_linear_schedule_with_warmup

        return get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.config.warmup_steps,
            num_training_steps=num_training_steps
        )

总结

本节我们学习了模型训练与评估:

  1. 训练配置方法(基础配置、LoRA、QLoRA)
  2. 超参数调优技术(网格搜索、随机搜索、贝叶斯优化)
  3. 训练监控(指标追踪、WandB、MLflow)
  4. 模型评估方法(基础评估、生成评估、BLEU、ROUGE)
  5. 性能优化策略(混合精度、梯度累积、梯度检查点)

掌握这些技术可以显著提升微调效果和效率。

参考资源