PyTorch FSDP2

PyTorch FSDP2 patterns for large-scale distributed model training across multiple GPUs

PyTorch FSDP2 is a community skill for distributed training using Fully Sharded Data Parallelism version 2, covering model sharding configuration, mixed precision training, activation checkpointing, and multi-GPU training optimization.

What Is This?

Overview

PyTorch FSDP2 provides patterns for training large models across multiple GPUs using the second generation of Fully Sharded Data Parallelism. It covers per-parameter sharding configuration, composable API design for mixing sharding strategies, mixed precision training with dtype management, activation checkpointing for memory savings, and communication optimization. The skill enables practitioners to train models that exceed single GPU memory by distributing parameters across devices.
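
As a rough illustration of why sharding helps, the sketch below estimates per-GPU parameter memory under full sharding. The model size, dtype width, and GPU count are made-up example values, not measurements.

def sharded_param_memory_gb(num_params: int, bytes_per_param: int,
                            num_gpus: int) -> float:
    # Under full sharding each GPU holds roughly 1/num_gpus of the
    # parameters (ignoring gradients, optimizer state, and activations).
    return num_params * bytes_per_param / num_gpus / 1e9

# Example: a 7B-parameter model in bf16 (2 bytes per parameter).
print(sharded_param_memory_gb(7_000_000_000, 2, 1))   # ~14 GB on a single GPU
print(sharded_param_memory_gb(7_000_000_000, 2, 8))   # ~1.75 GB per GPU across 8 GPUs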

Who Should Use This

This skill serves ML engineers training models too large for single GPU memory, teams scaling training across multi-GPU servers, and researchers who need fine-grained control over distributed training strategies for custom model architectures.

Why Use It?

Problems It Solves

Large models exceed single GPU memory, preventing training without distribution strategies. Data parallelism alone duplicates the full model on each GPU, wasting memory. The original FSDP API lacks composability for mixing different sharding strategies within a model. Communication overhead between GPUs reduces training throughput without proper optimization.

Core Highlights

Per-parameter sharding distributes individual parameters across GPUs rather than requiring uniform sharding. Composable APIs allow mixing full sharding, no sharding, and partial sharding within the same model. Mixed precision support manages parameter, gradient, and communication dtypes independently. Backward prefetching overlaps communication with computation for improved throughput.
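
A minimal sketch of how these highlights map onto the real FSDP2 API: fully_shard is applied per transformer block and then to the root module, with a MixedPrecisionPolicy controlling parameter and reduction dtypes. This assumes a recent PyTorch release where fully_shard and MixedPrecisionPolicy are importable (the exact import path has moved between versions), that distributed initialization has already run, and that the model and its list of blocks are defined elsewhere.

import torch
# Import path varies by PyTorch version; shown here as exposed in recent releases.
from torch.distributed.fsdp import fully_shard, MixedPrecisionPolicy

def apply_fsdp2(model, blocks):
    # Keep parameters in bf16 but reduce gradients in fp32 for numerical stability.
    mp_policy = MixedPrecisionPolicy(param_dtype=torch.bfloat16,
                                     reduce_dtype=torch.float32)
    # Shard each transformer block as its own unit, then the root module,
    # so all-gathers happen per block rather than for the whole model at once.
    for block in blocks:
        fully_shard(block, mp_policy=mp_policy)
    fully_shard(model, mp_policy=mp_policy)
    return model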

How to Use It?

Basic Usage

import math
from dataclasses import dataclass

@dataclass
class FSDP2Config:
    sharding_strategy: str = "full_shard"
    mixed_precision: str = "bf16"
    activation_checkpointing: bool = True
    backward_prefetch: bool = True
    num_gpus: int = 1
    grad_accumulation_steps: int = 1

@dataclass
class ShardedParameter:
    name: str
    full_shape: tuple
    shard_shape: tuple
    device: str
    dtype: str = "bf16"

class FSDP2Wrapper:
    def __init__(self, config: FSDP2Config):
        self.config = config
        self.sharded_params: list[ShardedParameter] = []

    def shard_parameter(self, name: str, shape: tuple,
                        device_id: int) -> ShardedParameter:
        # Split along dim 0; assumes the leading dimension divides
        # evenly across the configured number of GPUs.
        shard_dim = shape[0] // self.config.num_gpus
        shard_shape = (shard_dim,) + shape[1:]
        param = ShardedParameter(
            name=name, full_shape=shape,
            shard_shape=shard_shape,
            device=f"cuda:{device_id}",
            dtype=self.config.mixed_precision
        )
        self.sharded_params.append(param)
        return param

    def get_memory_stats(self) -> dict:
        # Element counts before and after sharding; uses math.prod
        # rather than eval for safety and clarity.
        total_full = sum(
            math.prod(p.full_shape) for p in self.sharded_params)
        total_shard = sum(
            math.prod(p.shard_shape) for p in self.sharded_params)
        return {"full_params": total_full,
                "sharded_params": total_shard,
                "reduction": round(1 - total_shard / max(total_full, 1), 3)}

Real-World Examples

from dataclasses import dataclass, field

@dataclass
class TrainingState:
    step: int = 0
    loss_history: list[float] = field(default_factory=list)
    grad_norms: list[float] = field(default_factory=list)

class DistributedTrainer:
    def __init__(self, config: FSDP2Config):
        self.config = config
        self.state = TrainingState()
        self.wrapper = FSDP2Wrapper(config)

    def train_step(self, batch: dict,
                   forward_fn, loss_fn) -> float:
        output = forward_fn(batch)
        loss = loss_fn(output, batch["labels"])
        # Scale by the accumulation factor so gradients average correctly
        # across micro-batches; in a real loop this scaled value is what
        # gets passed to backward().
        scaled_loss = loss / self.config.grad_accumulation_steps
        self.state.loss_history.append(float(loss))
        self.state.step += 1
        return scaled_loss

    def should_sync_gradients(self) -> bool:
        return (self.state.step %
                self.config.grad_accumulation_steps == 0)

    def get_training_summary(self) -> dict:
        recent = self.state.loss_history[-10:]
        return {
            "step": self.state.step,
            "avg_loss": round(sum(recent) / max(len(recent), 1), 4),
            "memory": self.wrapper.get_memory_stats()
        }
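
A usage sketch for the trainer above; the forward and loss functions are stand-in stubs rather than real model code.

def fake_forward(batch: dict) -> float:
    return sum(batch["inputs"])          # stand-in for a model forward pass

def fake_loss(output: float, labels: list) -> float:
    return abs(output - sum(labels))     # stand-in for a real loss function

trainer = DistributedTrainer(FSDP2Config(grad_accumulation_steps=2))
for _ in range(4):
    trainer.train_step({"inputs": [1.0, 2.0], "labels": [2.5]},
                       fake_forward, fake_loss)
    if trainer.should_sync_gradients():
        pass  # optimizer step and gradient zeroing would happen here
print(trainer.get_training_summary())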

Advanced Tips

Apply different sharding strategies to different layers based on their parameter sizes and access patterns. Use activation checkpointing on transformer blocks to trade recomputation time for significant memory savings. Enable backward prefetching to overlap parameter all-gather communication with gradient computation.
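
One way to express the per-layer decision is a simple heuristic keyed off parameter count; the threshold below is an arbitrary illustration, not a recommended value.

def pick_sharding_strategy(num_params: int,
                           threshold: int = 10_000_000) -> str:
    # Small layers: keep replicated so all-gather latency does not dominate.
    # Large layers: fully shard to reclaim the most memory.
    return "full_shard" if num_params >= threshold else "no_shard"

layer_sizes = {"embedding": 50_000_000, "layer_norm": 8_192,
               "attention": 16_000_000}
strategies = {name: pick_sharding_strategy(n) for name, n in layer_sizes.items()}
print(strategies)
# {'embedding': 'full_shard', 'layer_norm': 'no_shard', 'attention': 'full_shard'}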

When to Use It?

Use Cases

Train a multi-billion parameter model across a cluster of GPUs that individually lack sufficient memory. Fine-tune large foundation models with LoRA adapters while using FSDP to shard the base model. Scale training from single-GPU prototypes to multi-node clusters with minimal code changes.

Related Topics

Distributed training strategies, model parallelism, DeepSpeed ZeRO, gradient checkpointing, and mixed precision training techniques.

Important Notes

Requirements

PyTorch 2.0 or later with FSDP2 API support. Multiple GPUs connected via high-bandwidth interconnects for efficient communication. NCCL backend for GPU-to-GPU collective operations.

Usage Recommendations

Do: profile communication versus computation time to identify bottlenecks in the training loop. Start with full sharding and selectively switch to hybrid sharding for layers that benefit from reduced communication. Use gradient accumulation to increase effective batch size without additional memory.

Don't: enable full sharding on very small models where the communication overhead exceeds the memory savings. Mix incompatible dtype configurations between parameters and optimizer states. Ignore the impact of sharding on checkpoint saving and loading.
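
The gradient accumulation recommendation above reduces to simple arithmetic, sketched below with illustrative numbers: the effective batch size is the per-GPU micro-batch times the accumulation steps times the number of GPUs.

def effective_batch_size(micro_batch: int, accumulation_steps: int,
                         num_gpus: int) -> int:
    # Gradients are averaged over this many samples before each optimizer step.
    return micro_batch * accumulation_steps * num_gpus

# Example: micro-batch of 4 per GPU, 8 accumulation steps, 16 GPUs -> 512.
print(effective_batch_size(4, 8, 16))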

Limitations

Communication overhead can reduce throughput compared to data parallelism for smaller models. Debugging distributed training issues requires understanding the sharding state across all devices. Checkpoint compatibility may change between PyTorch versions as the FSDP2 API evolves.