model-training
Total installs: 1
Weekly installs: 1
Site-wide rank: #49195
Install command
npx skills add https://github.com/gitwalter/cursor-agent-factory --skill model-training
Agent install distribution
kilo: 1
amp: 1
opencode: 1
kimi-cli: 1
droid: 1
codex: 1
Skill Documentation
Model Training Skill
Implement production-ready PyTorch training loops with modern patterns including Accelerate, distributed training (DeepSpeed/FSDP), hyperparameter optimization with Optuna, experiment tracking, mixed precision training, and robust checkpointing strategies.
When to Use
- Training deep learning models from scratch
- Implementing custom training loops
- Setting up distributed training for large models
- Optimizing hyperparameters systematically
- Tracking experiments and metrics
- Training with mixed precision for efficiency
- Implementing checkpointing and resume logic
Prerequisites
pip install torch torchvision torchaudio
pip install accelerate transformers
pip install deepspeed # For DeepSpeed
pip install optuna # For hyperparameter tuning
pip install mlflow wandb # For experiment tracking
pip install tensorboard # For visualization
Process
Step 1: Modern PyTorch Training Loop with Accelerate
Use Accelerate for simplified distributed training:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from accelerate import Accelerator
from accelerate.utils import set_seed
import os
class SimpleDataset(Dataset):
"""Example dataset."""
def __init__(self, size=1000):
self.data = torch.randn(size, 10)
self.labels = torch.randint(0, 2, (size,))
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.labels[idx]
class SimpleModel(nn.Module):
"""Example model."""
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(10, 64)
self.fc2 = nn.Linear(64, 32)
self.fc3 = nn.Linear(32, 2)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
def train_with_accelerate(
model: nn.Module,
train_loader: DataLoader,
val_loader: DataLoader,
num_epochs: int = 10,
learning_rate: float = 1e-3,
checkpoint_dir: str = "./checkpoints"
):
"""Training loop with Accelerate."""
# Initialize accelerator
accelerator = Accelerator(
gradient_accumulation_steps=4,
mixed_precision="fp16", # or "bf16" or None
log_with="tensorboard", # or "wandb", "mlflow"
project_dir="./logs"
)
# Set seed for reproducibility
set_seed(42)
# Initialize trackers so accelerator.log() actually records metrics
accelerator.init_trackers("model_training")
# Make sure the checkpoint directory exists before saving into it
os.makedirs(checkpoint_dir, exist_ok=True)
# Prepare model, optimizer, and dataloader
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
model, optimizer, train_loader, val_loader = accelerator.prepare(
model, optimizer, train_loader, val_loader
)
# Training loop
for epoch in range(num_epochs):
model.train()
total_loss = 0
for batch_idx, (data, target) in enumerate(train_loader):
# Forward pass
outputs = model(data)
loss = criterion(outputs, target)
# Backward pass with gradient accumulation
accelerator.backward(loss)
# Optimizer step (only after accumulation)
if (batch_idx + 1) % accelerator.gradient_accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
total_loss += loss.item()
# Logging
if batch_idx % 100 == 0:
accelerator.log(
{"train_loss": loss.item()},
step=epoch * len(train_loader) + batch_idx
)
# Validation
model.eval()
val_loss = 0
correct = 0
total = 0
with torch.no_grad():
for data, target in val_loader:
outputs = model(data)
loss = criterion(outputs, target)
val_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
avg_train_loss = total_loss / len(train_loader)
avg_val_loss = val_loss / len(val_loader)
accuracy = 100 * correct / total
accelerator.log({
"epoch": epoch,
"train_loss": avg_train_loss,
"val_loss": avg_val_loss,
"val_accuracy": accuracy
}, step=epoch)
# Save checkpoint (save_state handles main-process-only writes itself,
# so call it on every process)
checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}")
accelerator.save_state(checkpoint_path)
# Track and save the best model (save_model expects a directory)
if epoch == 0 or accuracy > best_accuracy:
best_accuracy = accuracy
accelerator.save_model(model, os.path.join(checkpoint_dir, "best_model"))
accelerator.print(f"Epoch {epoch}: Train Loss={avg_train_loss:.4f}, "
f"Val Loss={avg_val_loss:.4f}, Accuracy={accuracy:.2f}%")
accelerator.end_training()
# Usage
model = SimpleModel()
train_dataset = SimpleDataset(size=1000)
val_dataset = SimpleDataset(size=200)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
train_with_accelerate(
model=model,
train_loader=train_loader,
val_loader=val_loader,
num_epochs=10,
learning_rate=1e-3
)
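Note that in a multi-process run each worker only sees its shard of the validation set, so the accuracy computed above is per-process. A minimal sketch of a distributed-aware validation pass using accelerator.gather_for_metrics, reusing the model, criterion, and val_loader prepared above:
def evaluate_distributed(accelerator, model, val_loader, criterion):
    """Validation that aggregates predictions across all processes."""
    model.eval()
    correct, total, val_loss = 0, 0, 0.0
    with torch.no_grad():
        for data, target in val_loader:
            outputs = model(data)
            val_loss += criterion(outputs, target).item()
            preds = outputs.argmax(dim=1)
            # Gather from every process; handles duplicated samples in the last batch
            preds, target = accelerator.gather_for_metrics((preds, target))
            correct += (preds == target).sum().item()
            total += target.size(0)
    return val_loss / len(val_loader), 100 * correct / total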
Step 2: Distributed Training with DeepSpeed
Implement DeepSpeed ZeRO for large model training:
import deepspeed
from deepspeed.ops.adam import FusedAdam
import json
def create_deepspeed_config(
zero_stage: int = 2,
offload_optimizer: bool = False,
offload_param: bool = False,
gradient_accumulation_steps: int = 1,
train_batch_size: int = 32,
learning_rate: float = 1e-3
) -> dict:
"""Create DeepSpeed configuration."""
config = {
"train_batch_size": train_batch_size,
"train_micro_batch_size_per_gpu": train_batch_size,
"gradient_accumulation_steps": gradient_accumulation_steps,
"optimizer": {
"type": "AdamW",
"params": {
"lr": learning_rate,
"betas": [0.9, 0.999],
"eps": 1e-8,
"weight_decay": 0.01
}
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": learning_rate,
"warmup_num_steps": 1000
}
},
"zero_optimization": {
"stage": zero_stage,
"offload_optimizer": {
"device": "cpu" if offload_optimizer else "none"
},
"offload_param": {
"device": "cpu" if offload_param else "none"
},
"overlap_comm": True,
"contiguous_gradients": True,
"sub_group_size": 1e9,
"reduce_bucket_size": "auto",
"stage3_prefetch_bucket_size": "auto",
"stage3_param_persistence_threshold": "auto",
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_16bit_weights_on_model_save": True
},
"gradient_clipping": 1.0,
"fp16": {
"enabled": True,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"wall_clock_breakdown": False
}
return config
def train_with_deepspeed(
model: nn.Module,
train_dataset: Dataset,
num_epochs: int = 10,
config_path: str = "deepspeed_config.json"
):
"""Training with DeepSpeed."""
criterion = nn.CrossEntropyLoss()
# Save config
config = create_deepspeed_config(zero_stage=2, offload_optimizer=False)
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
# Initialize DeepSpeed (it builds a distributed dataloader from the dataset)
model_engine, optimizer, train_loader, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
training_data=train_dataset,
config=config_path
)
# Training loop
for epoch in range(num_epochs):
model_engine.train()
for batch_idx, (data, target) in enumerate(train_loader):
# Move the batch to the engine's device; float inputs need .half() because fp16 is enabled
data = data.to(model_engine.device).half()
target = target.to(model_engine.device)
# Forward pass
outputs = model_engine(data)
loss = criterion(outputs, target)
# Backward pass
model_engine.backward(loss)
# Optimizer step
model_engine.step()
# Logging
if batch_idx % 100 == 0:
print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")
# Save checkpoint
model_engine.save_checkpoint(
save_dir=f"./checkpoints/epoch_{epoch}",
tag=f"epoch_{epoch}"
)
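Scripts like this are normally started with the deepspeed launcher (for example: deepspeed train.py). To resume, checkpoints written by model_engine.save_checkpoint are reloaded through the engine rather than torch.load. A minimal sketch, assuming the save_dir and tag values used in the loop above:
# Resume from a DeepSpeed checkpoint directory written by save_checkpoint
load_path, client_state = model_engine.load_checkpoint(
    load_dir="./checkpoints/epoch_9",  # hypothetical directory from the loop above
    tag="epoch_9",
)
if load_path is None:
    print("No checkpoint found; starting from scratch")
else:
    print(f"Resumed from {load_path}")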
Step 3: Distributed Training with FSDP (Fully Sharded Data Parallel)
Use PyTorch FSDP for efficient distributed training:
import functools
import os
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy, MixedPrecision, StateDictType, FullStateDictConfig
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
def train_with_fsdp(
model: nn.Module,
train_loader: DataLoader,
num_epochs: int = 10
):
"""Training with FSDP."""
# Initialize distributed (torchrun sets RANK/WORLD_SIZE/LOCAL_RANK)
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
criterion = nn.CrossEntropyLoss()
# Auto-wrap policy: transformer_auto_wrap_policy must be bound to the
# transformer block classes of your model via functools.partial
auto_wrap_policy = functools.partial(
transformer_auto_wrap_policy,
transformer_layer_cls={nn.TransformerEncoderLayer},  # replace with your model's block class(es)
)
# Create FSDP model
model = FSDP(
model,
sharding_strategy=ShardingStrategy.FULL_SHARD,
auto_wrap_policy=auto_wrap_policy,
mixed_precision=MixedPrecision(
param_dtype=torch.float16,
reduce_dtype=torch.float16,
buffer_dtype=torch.float16
),
device_id=local_rank
)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(num_epochs):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data = data.cuda()
target = target.cuda()
optimizer.zero_grad()
outputs = model(data)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")
# Save checkpoint: gathering the full state dict is a collective operation,
# so every rank must enter the context; only rank 0 writes the file
save_policy = FullStateDictConfig(offload_to_cpu=True, rank0_only=True)
with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT, save_policy):
state_dict = model.state_dict()
if dist.get_rank() == 0:
torch.save(state_dict, f"./checkpoints/epoch_{epoch}.pt")
dist.destroy_process_group()
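If the model has no clear transformer block class to wrap, a size-based policy is a common alternative; a sketch, where min_num_params is an arbitrary example threshold:
import functools
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy

# Wrap any submodule whose parameter count exceeds the threshold
auto_wrap_policy = functools.partial(
    size_based_auto_wrap_policy,
    min_num_params=1_000_000,
)
FSDP scripts are usually launched with torchrun (for example: torchrun --nproc_per_node=4 train_fsdp.py), which sets the RANK, LOCAL_RANK, and WORLD_SIZE environment variables read during initialization.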
Step 4: Hyperparameter Tuning with Optuna
Systematic hyperparameter optimization:
import optuna
from optuna.trial import TrialState
import joblib
def objective(trial: optuna.Trial, train_loader: DataLoader, val_loader: DataLoader):
"""Optuna objective function."""
# Suggest hyperparameters (suggest_float replaces the deprecated
# suggest_loguniform/suggest_uniform APIs)
learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)
batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])  # rebuild the DataLoaders with this value for it to take effect
weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
# Create model with suggested dropout
model = SimpleModel()
# Add dropout layers based on dropout_rate
# Create optimizer and loss
optimizer = torch.optim.AdamW(
model.parameters(),
lr=learning_rate,
weight_decay=weight_decay
)
criterion = nn.CrossEntropyLoss()
# Training loop (simplified)
num_epochs = 5 # Fewer epochs for hyperparameter search
best_val_loss = float("inf")
for epoch in range(num_epochs):
model.train()
for data, target in train_loader:
optimizer.zero_grad()
outputs = model(data)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
# Validation
model.eval()
val_loss = 0
with torch.no_grad():
for data, target in val_loader:
outputs = model(data)
loss = criterion(outputs, target)
val_loss += loss.item()
avg_val_loss = val_loss / len(val_loader)
# Report intermediate value
trial.report(avg_val_loss, epoch)
# Pruning
if trial.should_prune():
raise optuna.TrialPruned()
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
return best_val_loss
def run_hyperparameter_search(
train_loader: DataLoader,
val_loader: DataLoader,
n_trials: int = 50,
study_name: str = "hyperparameter_study"
):
"""Run hyperparameter optimization."""
# Create study
study = optuna.create_study(
direction="minimize",
study_name=study_name,
pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=3)
)
# Optimize
study.optimize(
lambda trial: objective(trial, train_loader, val_loader),
n_trials=n_trials,
timeout=3600 # 1 hour timeout
)
# Print results
print("Number of finished trials: ", len(study.trials))
print("Number of pruned trials: ", len([t for t in study.trials if t.state == TrialState.PRUNED]))
print("Number of complete trials: ", len([t for t in study.trials if t.state == TrialState.COMPLETE]))
print("\nBest trial:")
trial = study.best_trial
print(f" Value: {trial.value}")
print(" Params: ")
for key, value in trial.params.items():
print(f" {key}: {value}")
# Save study
joblib.dump(study, f"{study_name}.pkl")
# Visualize
try:
import optuna.visualization as vis
fig = vis.plot_optimization_history(study)
fig.show()
except ImportError:
pass  # plotly (required by optuna.visualization) is optional
return study
# Usage
study = run_hyperparameter_search(train_loader, val_loader, n_trials=50)
best_params = study.best_params
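For longer searches it helps to persist trials so a study can be interrupted, resumed, or run from several workers; a sketch using Optuna's SQLite storage backend (the optuna.db filename is arbitrary):
# Persist trials so the search can be stopped and resumed later
study = optuna.create_study(
    study_name="hyperparameter_study",
    storage="sqlite:///optuna.db",
    load_if_exists=True,
    direction="minimize",
    pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=3),
)
study.optimize(lambda trial: objective(trial, train_loader, val_loader), n_trials=20)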
Step 5: Experiment Tracking with MLflow
Track experiments and metrics:
import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient
def train_with_mlflow(
model: nn.Module,
train_loader: DataLoader,
val_loader: DataLoader,
experiment_name: str = "model_training",
run_name: str = None
):
"""Training with MLflow tracking."""
# Set experiment
mlflow.set_experiment(experiment_name)
# Start run
with mlflow.start_run(run_name=run_name):
# Log hyperparameters
mlflow.log_params({
"learning_rate": 1e-3,
"batch_size": 32,
"num_epochs": 10,
"optimizer": "AdamW"
})
# Training loop
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
model.train()
train_loss = 0
for data, target in train_loader:
optimizer.zero_grad()
outputs = model(data)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation
model.eval()
val_loss = 0
correct = 0
total = 0
with torch.no_grad():
for data, target in val_loader:
outputs = model(data)
loss = criterion(outputs, target)
val_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
# Log metrics
mlflow.log_metrics({
"train_loss": train_loss / len(train_loader),
"val_loss": val_loss / len(val_loader),
"val_accuracy": 100 * correct / total
}, step=epoch)
# Log the trained model once; registered_model_name optionally registers it
mlflow.pytorch.log_model(
model,
"model",
registered_model_name="SimpleModel"
)
# Log extra artifacts such as saved checkpoints
mlflow.log_artifact("./checkpoints/best_model.pt")
print(f"MLflow run completed. View at: {mlflow.get_tracking_uri()}")
# Usage
train_with_mlflow(model, train_loader, val_loader, experiment_name="my_experiment")
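A logged model can be reloaded later for evaluation or serving; a minimal sketch (the run ID is a placeholder you would copy from the MLflow UI or mlflow.active_run()):
# Reload a model logged by a previous run
run_id = "<run-id-from-the-mlflow-ui>"  # placeholder
loaded_model = mlflow.pytorch.load_model(f"runs:/{run_id}/model")
loaded_model.eval()
with torch.no_grad():
    sample = torch.randn(1, 10)  # matches SimpleModel's 10-feature input
    print(loaded_model(sample))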
Step 6: Mixed Precision Training
Implement automatic mixed precision (AMP):
from torch.cuda.amp import autocast, GradScaler
def train_with_amp(
model: nn.Module,
train_loader: DataLoader,
num_epochs: int = 10
):
"""Training with automatic mixed precision."""
model = model.cuda()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
scaler = GradScaler() # For gradient scaling
for epoch in range(num_epochs):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# Forward pass with autocast
with autocast():
outputs = model(data)
loss = criterion(outputs, target)
# Backward pass with scaler
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
if batch_idx % 100 == 0:
print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")
Step 7: Checkpointing and Resume Strategies
Robust checkpointing with resume capability:
import os
from pathlib import Path
class CheckpointManager:
"""Manage model checkpoints and resume training."""
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.best_loss = float("inf")
def save_checkpoint(
self,
model: nn.Module,
optimizer: torch.optim.Optimizer,
epoch: int,
loss: float,
is_best: bool = False,
metadata: dict = None
):
"""Save checkpoint."""
checkpoint = {
"epoch": epoch,
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"loss": loss,
"best_loss": self.best_loss,
"metadata": metadata or {}
}
# Save regular checkpoint
checkpoint_path = self.checkpoint_dir / f"checkpoint_epoch_{epoch}.pt"
torch.save(checkpoint, checkpoint_path)
# Save best model
if is_best or loss < self.best_loss:
self.best_loss = loss
best_path = self.checkpoint_dir / "best_model.pt"
torch.save(checkpoint, best_path)
print(f"Saved best model with loss: {loss:.4f}")
# Save latest checkpoint
latest_path = self.checkpoint_dir / "latest_checkpoint.pt"
torch.save(checkpoint, latest_path)
def load_checkpoint(
self,
model: nn.Module,
optimizer: torch.optim.Optimizer = None,
checkpoint_path: str = None,
resume: bool = True
):
"""Load checkpoint."""
if checkpoint_path is None:
checkpoint_path = self.checkpoint_dir / "latest_checkpoint.pt"
if not os.path.exists(checkpoint_path):
print(f"No checkpoint found at {checkpoint_path}")
return 0
checkpoint = torch.load(checkpoint_path, map_location="cpu")
model.load_state_dict(checkpoint["model_state_dict"])
if optimizer and resume:
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
start_epoch = checkpoint["epoch"] + 1 if resume else 0
self.best_loss = checkpoint.get("best_loss", float("inf"))
print(f"Loaded checkpoint from epoch {checkpoint['epoch']}")
print(f"Resume training from epoch {start_epoch}")
return start_epoch
# Usage in training loop
checkpoint_manager = CheckpointManager()
# Resume training
start_epoch = checkpoint_manager.load_checkpoint(model, optimizer, resume=True)
for epoch in range(start_epoch, num_epochs):
# Training...
val_loss = validate(model, val_loader)  # validate() is your own evaluation helper
# Save checkpoint
is_best = val_loss < checkpoint_manager.best_loss
checkpoint_manager.save_checkpoint(
model=model,
optimizer=optimizer,
epoch=epoch,
loss=val_loss,
is_best=is_best,
metadata={"learning_rate": optimizer.param_groups[0]["lr"]}
)
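For bit-exact resumption you can also store RNG states alongside the model and optimizer; a sketch that reuses the metadata dict from the checkpoint above (single-process, single-GPU case only):
# When saving: capture RNG states in the metadata
rng_state = {
    "torch": torch.get_rng_state(),
    "cuda": torch.cuda.get_rng_state() if torch.cuda.is_available() else None,
}
checkpoint_manager.save_checkpoint(
    model=model, optimizer=optimizer, epoch=epoch, loss=val_loss,
    metadata={"rng_state": rng_state},
)

# When resuming: restore RNG states after loading the checkpoint
ckpt = torch.load("./checkpoints/latest_checkpoint.pt", map_location="cpu")
saved_rng = ckpt["metadata"].get("rng_state")
if saved_rng is not None:
    torch.set_rng_state(saved_rng["torch"])
    if saved_rng["cuda"] is not None and torch.cuda.is_available():
        torch.cuda.set_rng_state(saved_rng["cuda"])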
Output
After training, you’ll have:
- Trained Model – Optimized model weights
- Checkpoints – Regular and best model checkpoints
- Training Logs – Metrics, losses, and training history
- Hyperparameter Results – Optimal hyperparameters from tuning
- Experiment Tracking – MLflow/W&B runs with all metrics
- Resume Capability – Ability to resume training from checkpoints
Best Practices
- Always use Accelerate for simplified distributed training
- Implement proper checkpointing and resume logic
- Track experiments with MLflow or W&B
- Use mixed precision (AMP) for faster training
- Optimize hyperparameters systematically with Optuna
- Validate data before training
- Monitor for overfitting with validation metrics (see the early-stopping sketch after this list)
- Use gradient accumulation for effective larger batch sizes
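Monitoring for overfitting typically ends in early stopping; a minimal patience-based sketch on validation loss (patience and min_delta are arbitrary example values):
class EarlyStopping:
    """Stop training once validation loss stops improving."""
    def __init__(self, patience: int = 5, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.counter = 0

    def step(self, val_loss: float) -> bool:
        """Return True when training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
        return self.counter >= self.patience

# Usage: create once before the epoch loop, then check each epoch:
# early_stopping = EarlyStopping(patience=5)
# if early_stopping.step(avg_val_loss):
#     break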
Anti-Patterns
| Anti-Pattern | Fix |
|---|---|
| No checkpointing | Save checkpoints regularly |
| No experiment tracking | Use MLflow/W&B |
| Hardcoded hyperparameters | Use Optuna for optimization |
| No validation | Always validate during training |
| Frequent host-device syncs in the training loop (e.g. calling .item() every step) | Log less often and keep per-step work on the GPU |
| No gradient scaling with AMP | Always use GradScaler |
Related
- Knowledge: {directories.knowledge}/pytorch-patterns.json, {directories.knowledge}/model-training-patterns.json
- Skill: model-fine-tuning
- Skill: llm-evaluation