Data Parallelism from Associativity
Data parallelism works because gradient accumulation is associative. This isn't an implementation detail—it's the mathematical foundation that makes the entire approach valid. Understanding this foundation reveals both the power and the limits of data parallelism.
The Question: We compute gradients on different batches on different GPUs, then sum them. Why does this give us the same result as computing on the full batch? When does it fail?
Building On: Part III Collectives
This part assumes mastery of collective operations—especially AllReduce (Chapter 11), the ring and tree algorithms (Chapter 12), and cost modeling (Chapter 13). Each parallelism strategy we derive here will rely on specific collectives. Data parallelism, our first strategy, uses AllReduce to synchronize gradients.
The Mathematical Foundation¶
The Loss Function Structure¶
Consider a loss function over a dataset \(\mathcal{D}\):

\[L_{\mathcal{D}}(\theta) = \frac{1}{|\mathcal{D}|} \sum_{x \in \mathcal{D}} \ell(x, \theta)\]

where \(\ell(x, \theta)\) is the per-sample loss.

In minibatch SGD, we approximate this with a batch \(B \subseteq \mathcal{D}\):

\[L_B(\theta) = \frac{1}{|B|} \sum_{x \in B} \ell(x, \theta)\]

The gradient is:

\[\nabla_\theta L_B(\theta) = \frac{1}{|B|} \sum_{x \in B} \nabla_\theta \ell(x, \theta)\]
The Partitioning Theorem¶
Intuition check
We compute gradients on different data subsets on different GPUs, then average them. Will this give exactly the same gradient as computing on the full batch? What mathematical property makes this work?
Theorem: For a batch \(B\) partitioned into \(P\) disjoint subsets \(B = B_1 \cup B_2 \cup \cdots \cup B_P\) with \(|B_i| = |B|/P\) for all \(i\):

\[\nabla_\theta L_B(\theta) = \frac{1}{P} \sum_{i=1}^{P} \nabla_\theta L_{B_i}(\theta)\]
Proof:
Starting from the definition:

\[\nabla_\theta L_B(\theta) = \frac{1}{|B|} \sum_{x \in B} \nabla_\theta \ell(x, \theta)\]

Since \(B = \bigcup_{i=1}^{P} B_i\) with disjoint \(B_i\):

\[= \frac{1}{|B|} \sum_{i=1}^{P} \sum_{x \in B_i} \nabla_\theta \ell(x, \theta)\]

With \(|B_i| = |B|/P\):

\[= \frac{1}{P} \sum_{i=1}^{P} \frac{1}{|B_i|} \sum_{x \in B_i} \nabla_\theta \ell(x, \theta) = \frac{1}{P} \sum_{i=1}^{P} \nabla_\theta L_{B_i}(\theta) \qquad \square\]
This partitioning is valid because:
- Addition is associative: \((a + b) + c = a + (b + c)\)
- Addition is commutative: \(a + b = b + a\)
- Gradients are elements of a vector space: They inherit these properties from real numbers
The Key Insight¶
The gradient of a sum equals the sum of gradients:

\[\nabla_\theta \sum_{x} \ell(x, \theta) = \sum_{x} \nabla_\theta \ell(x, \theta)\]
This linearity of the gradient operator combined with associativity of addition is what makes data parallelism mathematically sound.
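The theorem is easy to check numerically. A minimal sketch using a toy quadratic per-sample loss \(\ell(x, \theta) = (\theta - x)^2\) (an illustrative choice, not from the chapter):

```python
# Toy per-sample loss l(x, theta) = (theta - x)^2, so grad = 2 * (theta - x).
def grad_batch(batch, theta):
    """Gradient of the mean loss over a batch."""
    return sum(2 * (theta - x) for x in batch) / len(batch)

theta = 0.5
batch = [0.1, 0.9, 0.3, 0.7, 0.2, 0.8, 0.4, 0.6]

# Full-batch gradient.
g_full = grad_batch(batch, theta)

# Partition into P = 4 equal shards and average the shard gradients.
P = 4
shards = [batch[i::P] for i in range(P)]
g_partitioned = sum(grad_batch(s, theta) for s in shards) / P

# Identical up to floating-point rounding.
assert abs(g_full - g_partitioned) < 1e-12
```

The two results agree to within rounding error, which is exactly the caveat explored in "When Associativity Fails" below.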
The Synchronous Data Parallel Algorithm¶
Basic Algorithm¶
Input: Model θ, Dataset D, P GPUs, learning rate η
Output: Trained model θ
1. Broadcast θ to all P GPUs
2. For each training step:
a. Sample batch B of size |B| = P × b
b. Partition: Bi = samples [(i-1)b : ib] for GPU i
c. On each GPU i in parallel:
- Forward: yi = f(Bi; θ)
- Loss: Li = L(yi, targets)
- Backward: gi = ∇θLi
d. AllReduce: g = (1/P) Σi gi
e. Update: θ ← θ - η·g
f. (Implicit) All GPUs now have identical θ
3. Return θ
Correctness Invariant¶
Invariant: At the start of each step, all GPUs hold identical model parameters.
Proof by induction:
Base case: Step 0 broadcasts θ, so all GPUs start identical.
Inductive step: Assume all GPUs have identical \(\theta^{(t)}\) at step \(t\). After AllReduce, all GPUs have identical gradient \(g\). Applying the same update rule with the same learning rate:

\[\theta^{(t+1)} = \theta^{(t)} - \eta \cdot g\]

produces identical \(\theta^{(t+1)}\) on all GPUs. \(\square\)
This invariant is why we don't need to broadcast parameters every step—the identical update maintains synchronization.
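The induction can be watched in a toy single-process simulation, where plain averaging stands in for AllReduce (the quadratic per-sample loss is illustrative, not from the chapter):

```python
# Simulate P replicas in one process: each computes a gradient on its shard,
# an "AllReduce" (here: plain averaging) synchronizes, and every replica
# applies the same update.
def local_grad(shard, theta):
    # Toy per-sample gradient: d/dtheta (theta - x)^2 = 2 * (theta - x)
    return sum(2 * (theta - x) for x in shard) / len(shard)

P, eta = 4, 0.1
batch = [0.1, 0.9, 0.3, 0.7, 0.2, 0.8, 0.4, 0.6]
shards = [batch[i::P] for i in range(P)]

thetas = [0.5] * P                          # invariant: identical at step start
grads = [local_grad(s, t) for s, t in zip(shards, thetas)]
g = sum(grads) / P                          # the AllReduce average
thetas = [t - eta * g for t in thetas]      # identical update on every replica

# The invariant holds, and the result matches one full-batch SGD step.
theta_full = 0.5 - eta * local_grad(batch, 0.5)
assert all(t == thetas[0] for t in thetas)
assert abs(thetas[0] - theta_full) < 1e-12
```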
Pseudocode Implementation¶
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def train_data_parallel(model, dataloader, optimizer, epochs):
    """Synchronous data parallel training."""
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Wrap model in DDP (handles gradient synchronization)
    model = DDP(model, device_ids=[rank])

    for epoch in range(epochs):
        # Distributed sampler ensures non-overlapping batches
        dataloader.sampler.set_epoch(epoch)

        for batch in dataloader:
            # Forward pass (identical model on each GPU, different data)
            loss = model(batch)

            # Backward pass (computes local gradients)
            loss.backward()
            # DDP automatically performs AllReduce during backward;
            # gradients are averaged across all ranks

            # Update (identical update on all GPUs)
            optimizer.step()
            optimizer.zero_grad()
Communication Analysis¶
Per-Step Communication Volume¶
Let \(\Psi\) be the number of model parameters (in elements, not bytes).
AllReduce volume: Each parameter's gradient must be synchronized.

Using ring AllReduce, each GPU sends and receives:

\[V = 2 \cdot \frac{P-1}{P} \cdot \Psi \text{ elements}\]

For 32-bit floats:

\[V_{\text{bytes}} = 4 \cdot 2 \cdot \frac{P-1}{P} \cdot \Psi = 8 \cdot \frac{P-1}{P} \cdot \Psi\]

As \(P \to \infty\):

\[V \to 2\Psi \text{ elements} = 8\Psi \text{ bytes}\]
Communication Time¶
Using the α-β model with ring AllReduce (element size \(s\) bytes):

\[T_{\text{comm}} = 2(P-1)\alpha + 2 \cdot \frac{P-1}{P} \cdot \frac{\Psi s}{\beta}\]

For large \(P\) and large \(\Psi\) (bandwidth-dominated):

\[T_{\text{comm}} \approx \frac{2\Psi s}{\beta}\]
Compute Time¶
Per GPU, with local batch size \(b\) and approximately \(6\Psi\) FLOPs per sample (forward plus backward):

\[T_{\text{compute}} = \frac{6 \Psi b}{F}\]

where \(F\) is the GPU's FLOP/s.
Compute-Communication Ratio¶
The critical ratio:

\[R = \frac{T_{\text{compute}}}{T_{\text{comm}}} = \frac{6\Psi b / F}{2\Psi s / \beta}\]

Simplifying (\(\Psi\) cancels):

\[R = \frac{3 b \beta}{F s}\]

For FP16 training (\(s = 2\) bytes):

\[R = \frac{3 b \beta}{2 F}\]
Practice
If scaling stalls, compute \(R\) with your actual \(b\), \(F\), and network \(\beta\). When \(R < 1\), you're communication-bound—increase local batch, accumulate gradients, or move to hierarchical AllReduce.
Example: H100 with \(F = 989 \times 10^{12}\) FLOP/s (dense BF16, see Hardware Assumptions), NVLink \(\beta = 900\) GB/s:

\[R = \frac{3 b \cdot 900 \times 10^9}{2 \cdot 989 \times 10^{12}} \approx \frac{b}{733}\]

For \(R > 1\) (compute-bound), need:

\[b \gtrsim 733\]
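This back-of-envelope calculation can be sketched as a helper (function names are mine; assumes the \(6\Psi b\) FLOP estimate and the large-\(P\) AllReduce volume used above):

```python
def compute_comm_ratio(b, F, beta, bytes_per_elem=2):
    """R = T_compute / T_comm in the large-P limit.

    (6*Psi*b / F) / (2*Psi*bytes / beta): Psi cancels, leaving
    R = 3*b*beta / (F*bytes_per_elem).
    """
    return 3 * b * beta / (F * bytes_per_elem)

F = 989e12    # H100 dense BF16, FLOP/s
beta = 900e9  # NVLink, bytes/s

# Minimum local batch for R > 1 (compute-bound):
b_min = 2 * F / (3 * beta)
print(round(b_min))  # 733
```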
Scaling Efficiency¶
Define scaling efficiency:

\[E(P) = \frac{T_1}{P \cdot T_P}\]

where \(T_1\) is the single-GPU time for the same global batch, and \(T_P\) is the \(P\)-GPU time per step (strong scaling).
With perfect overlap:

\[T_P = \max(T_{\text{compute}}, T_{\text{comm}})\]

Compute-bound regime (\(T_{\text{compute}} > T_{\text{comm}}\)), where per-GPU compute is \(T_1/P\):

\[E(P) = \frac{T_1}{P \cdot T_1/P} = 1\]

Perfect scaling! But this assumes:

1. Perfect overlap of communication and computation
2. No additional overhead

Communication-bound regime (\(T_{\text{comm}} > T_{\text{compute}}\)):

\[E(P) = \frac{T_1}{P \cdot T_{\text{comm}}}\]

Efficiency degrades as \(P\) increases.
Overlapping Communication and Computation¶
The Overlap Opportunity¶
Backward pass computes gradients layer by layer, from output to input. Once a layer's gradient is computed, it can be communicated while computing earlier layers' gradients.
Time →
Layer 4: [backward][ AllReduce ]
Layer 3: [backward][ AllReduce ]
Layer 2: [backward][ AllReduce ]
Layer 1: [backward][ AllReduce ]
↑
Overlap region
Bucket-Based Overlap¶
PyTorch DDP groups gradients into buckets:
# DDP bucket configuration
model = DDP(
    model,
    device_ids=[rank],
    bucket_cap_mb=25,  # Bucket size in MB
)
Bucketing algorithm:

1. Register gradients in reverse computation order
2. When a bucket reaches capacity, trigger AllReduce
3. Continue computing while the AllReduce proceeds
Optimal bucket size:
- Too small: Many small AllReduces (latency overhead)
- Too large: Less overlap opportunity
Empirically: 10-50 MB buckets work well.
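A rough sizing helper makes the trade-off concrete (an illustrative sketch, assuming FP32 gradient buckets; the function name is mine):

```python
import math

def num_buckets(num_params, bytes_per_grad=4, bucket_cap_mb=25):
    """Roughly how many AllReduce launches a bucketed backward pass issues."""
    total_mb = num_params * bytes_per_grad / (1024 ** 2)
    return max(1, math.ceil(total_mb / bucket_cap_mb))

# 1B parameters with FP32 gradients and a 25 MB cap:
print(num_buckets(1_000_000_000))  # 153
```

Smaller caps multiply the launch count (and its latency cost); larger caps shrink it but delay the first AllReduce.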
Gradient Ready Order¶
DDP uses hooks to detect when gradients are ready:
class DDPGradientHook:
    def __init__(self, bucket_manager):
        self.bucket_manager = bucket_manager

    def __call__(self, grad):
        # Called when a gradient is computed
        self.bucket_manager.add_gradient(grad)
        if self.bucket_manager.bucket_ready():
            # Launch async AllReduce
            self.bucket_manager.flush_bucket()
        return grad
The communication for layer \(L\) overlaps with backward computation of layers \(1, ..., L-1\).
Theoretical Overlap Efficiency¶
Let \(f_i\) be the fraction of compute time for layer \(i\)'s backward pass.
Achievable overlap (each layer's AllReduce can hide behind the backward compute that runs after its gradient is ready):

\[T_{\text{overlap}} = \sum_{i} \min\left(T_{\text{AR},i},\ T_{\text{compute},<i}\right)\]

where \(T_{\text{compute},<i}\) is compute time for layers before \(i\).
In practice, 70-90% overlap is achievable.
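The overlap bound can be checked with a small critical-path simulation (an illustrative sketch, assuming one gradient bucket per layer and one in-flight collective at a time):

```python
def step_time(bwd_ms, ar_ms):
    """Critical-path step time with one gradient bucket per layer.

    Layers are given in backward (output-to-input) order. Each layer's
    AllReduce starts once its gradient is ready AND the previous
    AllReduce has finished.
    """
    bwd_done = 0.0
    ar_done = 0.0
    for bwd, ar in zip(bwd_ms, ar_ms):
        bwd_done += bwd
        ar_done = max(ar_done, bwd_done) + ar
    return ar_done

# 100 equal layers: 80 ms total backward, 100 ms total AllReduce.
# Unoverlapped this would be 180 ms; with overlap it collapses to ~100.8 ms.
print(step_time([0.8] * 100, [1.0] * 100))
```

This matches the overlap exercise at the end of the chapter: when AllReduce is the slower chain, step time approaches first-layer backward plus total AllReduce.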
When Associativity Fails¶
Floating-Point Non-Associativity¶
IEEE 754 floating-point addition is not associative:

\[(a \oplus b) \oplus c \neq a \oplus (b \oplus c) \text{ in general}\]

Example:
import numpy as np
a = np.float32(1e20)
b = np.float32(-1e20)
c = np.float32(1.0)
print((a + b) + c) # = 1.0
print(a + (b + c)) # = 0.0 (c absorbed into b)
Reduction Order Matters¶
Different AllReduce implementations use different reduction orders:
Ring AllReduce accumulates sequentially:

\[(((g_0 + g_1) + g_2) + g_3)\]

Tree AllReduce (binary tree) pairs first:

\[(g_0 + g_1) + (g_2 + g_3)\]
These give slightly different results for the same inputs!
Sources of Non-Determinism¶
| Source | Cause | Mitigation |
|---|---|---|
| AllReduce order | Ring direction varies | Fix ring order |
| Tree reduction | Different groupings | Use consistent tree |
| Async completion | Race conditions | Synchronous reduction |
| Fused operations | Different algorithms | Disable fusion |
| Hardware variations | Different GPUs | Homogeneous cluster |
Achieving Determinism¶
import os
import torch

def enable_deterministic_training():
    """Enable deterministic training for reproducibility."""
    # Set seeds
    torch.manual_seed(42)
    torch.cuda.manual_seed_all(42)

    # Enable deterministic algorithms
    torch.use_deterministic_algorithms(True)

    # Disable auto-tuning (it may choose different algorithms per run)
    torch.backends.cudnn.benchmark = False

    # Pin the NCCL algorithm and protocol (still not a strict bitwise guarantee)
    os.environ["NCCL_ALGO"] = "Ring"
    os.environ["NCCL_PROTO"] = "Simple"
Even with these settings, NCCL can still vary channel selection or topology; strict bitwise determinism often requires fixed topology and limited channels.
Cost of determinism: Often 10-20% slower due to disabled optimizations.
Higher Precision Accumulation¶
For gradient accumulation, use higher precision:
import torch

class HighPrecisionAccumulator:
    """Accumulate gradients in FP32 even if the model is FP16."""

    def __init__(self, model):
        self.fp32_grads = {
            name: torch.zeros_like(p, dtype=torch.float32)
            for name, p in model.named_parameters()
        }

    def accumulate(self, model):
        for name, p in model.named_parameters():
            if p.grad is not None:
                self.fp32_grads[name] += p.grad.float()

    def get_and_reset(self, model):
        for name, p in model.named_parameters():
            p.grad = self.fp32_grads[name].to(p.dtype)
            self.fp32_grads[name].zero_()
Gradient Compression (Preview)¶
Gradients are high-dimensional but often compressible. We can trade computation for communication bandwidth. Key approaches include:
- Sparsification (Top-K, random): Send only the largest gradient elements
- Quantization (1-bit SGD, QSGD): Reduce precision of communicated gradients
- Low-rank (PowerSGD): Approximate gradient matrices with low-rank factors
These techniques can compress communication by 10-100× while preserving convergence when combined with error feedback.
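For sparsification specifically, index overhead eats into the nominal ratio, since each surviving element must ship with its position. A quick sketch (assumptions: dense FP16 gradients, FP16 values, int32 indices; the function name is mine):

```python
def topk_effective_compression(k_frac, dense_bytes=2, val_bytes=2, idx_bytes=4):
    """Effective wire compression of Top-K sparsification, per element.

    Dense: dense_bytes per element. Sparse: a k_frac fraction of elements,
    each shipped as a value plus an index.
    """
    return dense_bytes / (k_frac * (val_bytes + idx_bytes))

# Top-1% sparsification: nominally 100x, but with indices counted:
print(round(topk_effective_compression(0.01)))  # 33
```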
Full treatment in Part VII
Chapter 28 (Gradient Compression) covers these methods in depth with convergence proofs, unbiasedness analysis, error feedback theory, and practical implementation guidance.
Gradient Accumulation¶
When Memory Limits Batch Size¶
If local batch \(b\) doesn't fit in memory, accumulate over micro-batches:
def train_with_accumulation(model, batch, accumulation_steps, optimizer):
    """Gradient accumulation for effective large batches."""
    optimizer.zero_grad()

    # Split batch into micro-batches
    micro_batches = batch.chunk(accumulation_steps)

    for micro_batch in micro_batches:
        # Forward and backward (gradients accumulate in .grad)
        loss = model(micro_batch) / accumulation_steps
        loss.backward()

    # Now perform AllReduce and update.
    # (Under DDP, wrap all but the last micro-batch in model.no_sync()
    # so the AllReduce fires only once per effective step.)
    optimizer.step()
Mathematical Equivalence¶
With accumulation over \(A\) micro-batches \(B_1, \ldots, B_A\) of size \(b/A\):

\[g = \frac{1}{A} \sum_{a=1}^{A} \nabla_\theta L_{B_a}(\theta)\]

This equals \(\nabla_\theta L_B(\theta)\) by the partitioning theorem.
Communication Pattern¶
Without accumulation: AllReduce every micro-batch.
With accumulation: AllReduce every \(A\) micro-batches.

Speedup from reduced communication:

\[\text{Speedup} = \frac{A(T_c + T_r)}{A T_c + T_r}\]

where \(T_c\) is compute time per micro-batch and \(T_r\) is the AllReduce time for one gradient synchronization.

For \(T_r = T_c\): speedup \(= \frac{2A}{A+1} \to 2\times\) as \(A \to \infty\).
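The speedup formula as a one-liner (function name is mine):

```python
def accumulation_speedup(A, t_c, t_r):
    """Speedup from AllReducing once per A micro-batches instead of every one."""
    return (A * (t_c + t_r)) / (A * t_c + t_r)

# With T_r = T_c the formula reduces to 2A / (A + 1), approaching 2x:
print(accumulation_speedup(4, 1.0, 1.0))  # 1.6
```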
Scaling Behavior¶
Weak Scaling¶
Keep local batch size \(b\) constant, add more GPUs:
- Total batch size: \(B = P \cdot b\)
- Compute per GPU: constant
- Communication: increases with \(P\) (latency term)
Efficiency:

\[E(P) = \frac{T_c}{T_c + T_r(P)}\]

where \(T_r(P) = 2(P-1)\alpha + 2\frac{P-1}{P} \cdot \Psi s/\beta\) (with \(s\) bytes per element).
Strong Scaling¶
Keep total batch size \(B\) constant, add more GPUs:
- Local batch size: \(b = B/P\)
- Compute per GPU: decreases as \(1/P\)
- Communication: approximately constant (bandwidth term dominates)
Efficiency:

\[E(P) = \frac{T_c/P}{T_c/P + T_r} = \frac{T_c}{T_c + P \cdot T_r}\]

As \(P\) increases, \(T_c/P\) becomes small compared to \(T_r\), and efficiency drops.
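Both regimes can be sketched as simple functions (illustrative helpers; the function names are mine):

```python
def weak_scaling_eff(t_c, t_r_of_p):
    """Constant per-GPU compute; pass T_r already evaluated at the target P."""
    return t_c / (t_c + t_r_of_p)

def strong_scaling_eff(P, t_c, t_r):
    """Fixed global batch: per-GPU compute shrinks as 1/P, AllReduce does not."""
    return (t_c / P) / (t_c / P + t_r)

# Strong scaling with 1 s of single-GPU compute and a 10 ms AllReduce:
for P in (8, 64, 512):
    print(P, round(strong_scaling_eff(P, 1.0, 0.010), 3))
```

Efficiency falls from roughly 93% at 8 GPUs to under 20% at 512 for these numbers, illustrating why strong scaling hits a wall.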
Practical Limits¶
These limits are order-of-magnitude and depend strongly on hardware, model architecture, and batch size.
| Model Size | Typical P Limit (90% efficiency) |
|---|---|
| 1B params | ~256 GPUs |
| 10B params | ~128 GPUs |
| 100B params | ~64 GPUs |
| 1T params | ~16 GPUs |
Beyond these limits, data parallelism alone becomes communication-bound.
Asynchronous Data Parallelism¶
The Synchronization Bottleneck¶
Synchronous DP waits for the slowest worker (the "straggler problem"):

\[T_{\text{step}} = \max_{i} T_i\]

Variance in \(T_i\) reduces efficiency.
Asynchronous SGD¶
Workers don't wait for each other:
Worker 1: [compute][push grad][pull params][compute]...
Worker 2: [compute][push][pull][compute][push]...
Worker 3: [compute][push][pull][compute]...
← Staleness →
Staleness: Worker applies update to params \(\theta^{(t-\tau)}\) where \(\tau\) is delay.
Staleness-Adjusted Updates¶
To compensate for staleness, scale the update:

\[\theta^{(t+1)} = \theta^{(t)} - \eta \cdot f(\tau) \cdot \nabla L\left(\theta^{(t-\tau)}\right)\]

where \(f(\tau)\) is a staleness penalty:
- \(f(\tau) = 1\): ignore staleness (often diverges)
- \(f(\tau) = 1/\tau\): inverse scaling
- \(f(\tau) = e^{-\lambda\tau}\): exponential decay
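A minimal sketch of the exponential penalty (\(\lambda = 0.1\) is an illustrative choice; the function name is mine):

```python
import math

def effective_lr(eta, tau, lam=0.1):
    """Exponential staleness penalty: eta_eff = eta * exp(-lam * tau)."""
    return eta * math.exp(-lam * tau)

print(effective_lr(1.0, 0))   # fresh gradient: full learning rate, 1.0
print(effective_lr(1.0, 15))  # maximally stale with P = 16 workers: ~0.22
```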
Trade-offs¶
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Correctness | Exact | Approximate |
| Staleness | 0 | Variable |
| Straggler handling | Poor | Good |
| Convergence | Faster per step | More steps needed |
| Implementation | Simple | Complex |
| Debugging | Easy | Hard |
Trend: Synchronous is dominant in modern large-scale training due to simpler convergence guarantees.
Implementation: PyTorch DDP¶
Architecture¶
DDP Module
├── Forward Hook: Sync forward (no-op for most cases)
├── Backward Hooks: Register gradient ready callbacks
├── Bucket Manager: Group gradients for AllReduce
├── Reducer: Execute AllReduce operations
└── Comm Hook: Customizable communication (compression, etc.)
Custom Communication Hooks¶
import torch.distributed as dist

def compression_hook(state, bucket):
    """Custom DDP comm hook for gradient compression."""
    tensor = bucket.buffer()

    # Compress (my_compressor is a placeholder for your compressor)
    compressed, ctx = my_compressor.compress(tensor)

    # AllReduce the compressed gradient asynchronously
    fut = dist.all_reduce(compressed, async_op=True).get_future()

    def decompress(fut):
        synced = fut.value()[0]
        return my_compressor.decompress(synced, ctx)

    return fut.then(decompress)

# Register hook
model = DDP(model)
model.register_comm_hook(state=None, hook=compression_hook)
Gradient Synchronization Points¶
DDP synchronizes at these points:
1. First forward (if find_unused_parameters=True)
2. After backward (main gradient sync)
# For models with unused parameters
model = DDP(
    model,
    device_ids=[rank],
    find_unused_parameters=True,  # Extra overhead
)
Exercises¶
- Partitioning proof: Extend the partitioning theorem to unequal partition sizes. If \(|B_i| = w_i \cdot |B|\) where \(\sum_i w_i = 1\), what is the correct averaging formula?
Solution
Extended Partitioning Theorem for Unequal Sizes
Given batch \(B\) partitioned into \(P\) disjoint subsets \(B_1, B_2, \ldots, B_P\) where \(|B_i| = w_i \cdot |B|\) and \(\sum_{i=1}^P w_i = 1\).
Claim:

\[\nabla_\theta L_B(\theta) = \sum_{i=1}^{P} w_i \cdot \nabla_\theta L_{B_i}(\theta)\]
Proof:
Starting from the definition:

\[\nabla_\theta L_B(\theta) = \frac{1}{|B|} \sum_{x \in B} \nabla_\theta \ell(x, \theta)\]

Since \(B = \bigcup_{i=1}^{P} B_i\) with disjoint \(B_i\):

\[= \frac{1}{|B|} \sum_{i=1}^{P} \sum_{x \in B_i} \nabla_\theta \ell(x, \theta)\]

Multiplying and dividing by \(|B_i|\):

\[= \sum_{i=1}^{P} \frac{|B_i|}{|B|} \cdot \frac{1}{|B_i|} \sum_{x \in B_i} \nabla_\theta \ell(x, \theta)\]

Substituting \(|B_i| = w_i \cdot |B|\):

\[= \sum_{i=1}^{P} w_i \cdot \nabla_\theta L_{B_i}(\theta) \qquad \square\]
Practical implication:
When GPUs have unequal batch sizes (e.g., due to load balancing), the AllReduce should compute a weighted average, not a simple average:
# Weighted gradient averaging (note: dist.all_reduce operates in place)
local_bs = torch.tensor(float(len(local_batch)))
dist.all_reduce(local_bs, op=dist.ReduceOp.SUM)  # now holds the global batch size
weight = len(local_batch) / local_bs.item()

for param in model.parameters():
    param.grad *= weight
    dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
Special case: When all \(w_i = 1/P\), we recover the original theorem with simple averaging.
- Compute-communication balance: For a 7B parameter model on 8 H100s with NVLink (900 GB/s), what local batch size achieves \(R = 2\) (compute 2× communication)?
Solution
Given:
- \(\Psi = 7 \times 10^9\) parameters
- \(P = 8\) GPUs
- \(\beta = 900\) GB/s (NVLink bandwidth)
- \(F = 989 \times 10^{12}\) FLOP/s (H100 dense FP16/BF16 peak)
- FP16 training: sizeof(dtype) = 2 bytes
- Target: \(R = 2\)
Compute-Communication Ratio Formula (from the chapter, large-\(P\) form, FP16):

\[R = \frac{3 b \beta}{2F}\]

Solving for batch size \(b\):

\[b = \frac{2RF}{3\beta} = \frac{2 \cdot 2 \cdot 989 \times 10^{12}}{3 \cdot 900 \times 10^9} \approx 1465\]
Verification:

Communication time (now with the exact \((P-1)/P\) factor):

\[T_{\text{comm}} = 2 \times \frac{P-1}{P} \times \frac{\Psi \times 2}{\beta} = 2 \times \frac{7}{8} \times \frac{14 \times 10^9}{900 \times 10^9} \approx 27.2 \text{ ms}\]

Compute time:

\[T_{\text{compute}} = \frac{6\Psi \times b}{F} = \frac{6 \times 7 \times 10^9 \times 1465}{989 \times 10^{12}} \approx 62.2 \text{ ms}\]

Ratio check:

\[R = \frac{62.2}{27.2} \approx 2.29 \gtrsim 2 \checkmark\]

(The overshoot comes from solving for \(b\) with the large-\(P\) approximation but verifying with the exact factor.)
Practical considerations:
| Sequence Length | Samples (b=1465) | Tokens per GPU |
|---|---|---|
| 512 | 1,465 | 750K |
| 2048 | 1,465 | 3.0M |
| 4096 | 1,465 | 6.0M |
With 4096 sequence length, this requires ~6M tokens per GPU per step—likely too large for memory. Gradient accumulation would be needed.
- Overlap analysis: A model has 100 layers of equal size. If AllReduce for all gradients takes 100ms and backward for all layers takes 80ms, what fraction of communication can be overlapped? What is the effective step time?
Solution
Given:
- \(L = 100\) layers of equal size
- \(T_{\text{AR}}^{\text{total}} = 100\) ms (total AllReduce time)
- \(T_{\text{bwd}}^{\text{total}} = 80\) ms (total backward time)
- All layers equal → per-layer times are uniform
Per-layer times:

\[T_{\text{AR}}^{\text{layer}} = \frac{100}{100} = 1 \text{ ms}, \qquad T_{\text{bwd}}^{\text{layer}} = \frac{80}{100} = 0.8 \text{ ms}\]
Overlap analysis:
With bucket-based overlap, layer \(i\)'s AllReduce can overlap with layers \(1, \ldots, i-1\)'s backward computation.
Layer 100: [bwd 0.8ms][ AR 1ms ]
Layer 99: [bwd 0.8ms][ AR 1ms ]
Layer 98: [bwd 0.8ms][ AR 1ms ]
...
For each layer except the first, its AllReduce (1 ms) can overlap with the backward passes of later-executing layers. Since per-layer backward is faster than per-layer AllReduce (0.8 ms < 1 ms), the AllReduce chain is the bottleneck.
Critical path analysis:
- Layer 100: starts at t=0, backward finishes at t=0.8ms, AR finishes at t=1.8ms
- Layer 99: backward can start at t=0.8ms (after layer 100 backward), finishes at t=1.6ms
- But AR for layer 100 is still running until t=1.8ms
- Layer 99 AR starts at t=1.8ms, finishes at t=2.8ms
- This continues with AR being the bottleneck
Totals: backward 80 ms, AllReduce 100 ms. After the first layer's backward (0.8 ms), the AR chain runs back-to-back for 100 ms, and the remaining backward passes (99 × 0.8 = 79.2 ms) execute entirely underneath it.

Fraction overlapped:

\[\text{Overlap fraction} = \frac{79.2}{100} = \boxed{79.2\%}\]
Effective step time: since the AR chain (100 ms) exceeds the overlapped backward (79.2 ms):

\[T_{\text{step}} = 0.8 + 100 = 100.8 \text{ ms}\]
Comparison:
| Scenario | Step Time |
|---|---|
| No overlap | 80 + 100 = 180 ms |
| With overlap | 100.8 ms |
| Speedup | 1.79× |
Key insight: When AR is slower than backward, the step time approaches the AR time. The system is communication-bound.
- Compression analysis: Top-1% sparsification compresses gradients 100×. If the original AllReduce takes 50ms, what is the new time? Account for compression/decompression compute (assume 5ms each).
Solution
Given:
- Original AllReduce time: \(T_{\text{AR}}^{\text{orig}} = 50\) ms
- Compression ratio: 100× (Top-1% sparsification)
- Compression compute: \(T_{\text{compress}} = 5\) ms
- Decompression compute: \(T_{\text{decompress}} = 5\) ms
Analysis:
Original communication is bandwidth-dominated (large gradients):

\[T_{\text{AR}}^{\text{orig}} \approx \frac{2(P-1)}{P} \cdot \frac{n}{\beta} \approx \frac{2n}{\beta} = 50 \text{ ms}\]

Naively compressed communication:

\[T_{\text{AR}}^{\text{compressed}} \approx \frac{2n/100}{\beta} = \frac{50}{100} = 0.5 \text{ ms}\]
However, sparse AllReduce is more complex:
Sparse tensors require AllGather of indices + values, not standard AllReduce:
- Indices: \(0.01n\) elements × 4 bytes (int32) = \(0.04n\) bytes
- Values: \(0.01n\) elements × 2 bytes (FP16) = \(0.02n\) bytes
- Total: \(0.06n\) bytes (vs original \(2n\) bytes)
Effective compression: \(2n / 0.06n = 33×\) (not 100×!)
Revised compressed communication:

\[T_{\text{AR}}^{\text{compressed}} = \frac{50}{33} \approx 1.5 \text{ ms}\]
Total time with compression:

\[T_{\text{total}} = T_{\text{compress}} + T_{\text{AR}}^{\text{compressed}} + T_{\text{decompress}} = 5 + 1.5 + 5 = 11.5 \text{ ms}\]

Speedup:

\[\text{Speedup} = \frac{T_{\text{orig}}}{T_{\text{total}}} = \frac{50}{11.5} = \boxed{4.3\times}\]
Summary:
| Component | Time |
|---|---|
| Compression | 5 ms |
| Sparse AllReduce | 1.5 ms |
| Decompression | 5 ms |
| Total | 11.5 ms |
Key insights:
- Compression overhead matters: 10 ms of compute overhead is significant
- Index overhead reduces effective compression: 100× sparsity ≠ 100× bandwidth reduction
- Still valuable: 4.3× speedup is substantial for communication-bound training
- Break-even analysis: Compression only helps if \(T_{\text{compress}} + T_{\text{decompress}} < T_{\text{AR}}^{\text{orig}} - T_{\text{AR}}^{\text{compressed}}\)
- Accumulation trade-off: Training with local batch 32 takes 100ms compute and 40ms AllReduce per micro-batch. If you accumulate over 4 micro-batches, what is the time per effective step? What if memory allowed batch 128 directly (single forward-backward)?
Solution
Given:
- Micro-batch size: 32
- \(T_{\text{compute}}^{\text{micro}} = 100\) ms per micro-batch
- \(T_{\text{AR}} = 40\) ms per AllReduce
- Accumulation steps: \(A = 4\)
- Effective batch size: \(32 \times 4 = 128\)
Case 1: Gradient Accumulation (4 micro-batches)
With accumulation, we compute 4 micro-batches but AllReduce only once:

\[T_{\text{accum}} = A \times T_{\text{compute}}^{\text{micro}} + T_{\text{AR}} = 4 \times 100 + 40 = 440 \text{ ms}\]
Case 2: Direct Batch 128 (no accumulation)
Compute time scales linearly with batch size:

\[T_{\text{compute}}^{128} = 4 \times 100 = 400 \text{ ms}\]

Total:

\[T_{\text{direct}} = 400 + 40 = 440 \text{ ms}\]
Comparison:
| Method | Compute | AllReduce | Total |
|---|---|---|---|
| Accumulation (4×32) | 4 × 100 = 400 ms | 40 ms | 440 ms |
| Direct (128) | 400 ms | 40 ms | 440 ms |
Why are they equal?
Gradient accumulation doesn't add overhead—it just splits the compute into multiple passes. The AllReduce only happens once in both cases.
When does direct batching win?
If there's overlap between compute and communication:
With overlap (direct):

\[T_{\text{direct}}^{\text{overlap}} = \max(T_{\text{compute}}, T_{\text{AR}}) = \max(400, 40) = 400 \text{ ms}\]

With overlap (accumulation): the first \(A - 1\) micro-batches trigger no AllReduce at all, and the single final AllReduce has no remaining compute to hide behind:

\[T_{\text{accum}}^{\text{overlap}} = 4 \times 100 + 40 = 440 \text{ ms}\]

Speedup from direct batching with overlap:

\[\frac{440}{400} = 1.1\times\]
Key insight: Direct large batches enable better compute-communication overlap than gradient accumulation.
- Staleness bound: In asynchronous SGD with \(P = 16\) workers and exponential staleness penalty \(f(\tau) = e^{-0.1\tau}\), if maximum staleness is \(\tau = 15\), what is the effective learning rate scaling?
Solution
Given:
- \(P = 16\) workers
- Staleness penalty: \(f(\tau) = e^{-0.1\tau}\)
- Maximum staleness: \(\tau_{\max} = 15\)
Staleness-adjusted update rule:

\[\theta^{(t+1)} = \theta^{(t)} - \eta \cdot f(\tau) \cdot \nabla L\left(\theta^{(t-\tau)}\right)\]

The effective learning rate is \(\eta_{\text{eff}} = \eta \cdot f(\tau)\).
Staleness distribution analysis:
In asynchronous SGD with \(P\) workers, staleness \(\tau\) ranges from 0 to \(P-1\) in the worst case (when one worker is always ahead).
With \(\tau_{\max} = 15\) (close to \(P - 1 = 15\)), we have maximum asynchrony.
Effective learning rate at different staleness levels:
| Staleness \(\tau\) | \(f(\tau) = e^{-0.1\tau}\) | \(\eta_{\text{eff}}/\eta\) |
|---|---|---|
| 0 | \(e^0 = 1.000\) | 100% |
| 5 | \(e^{-0.5} = 0.607\) | 60.7% |
| 10 | \(e^{-1.0} = 0.368\) | 36.8% |
| 15 | \(e^{-1.5} = 0.223\) | 22.3% |
At maximum staleness (\(\tau = 15\)):

\[\eta_{\text{eff}} = \eta \cdot e^{-1.5} \approx 0.223\,\eta\]

The learning rate is scaled down to 22.3% of the base rate.
Average effective learning rate:
Assuming a uniform staleness distribution over \(\tau \in \{0, 1, \ldots, 15\}\):

\[\bar{f} = \frac{1}{16} \sum_{\tau=0}^{15} e^{-0.1\tau}\]

This is a geometric series:

\[\bar{f} = \frac{1}{16} \cdot \frac{1 - e^{-1.6}}{1 - e^{-0.1}} \approx 0.525\]
Practical implications:
| Aspect | Value |
|---|---|
| Minimum scaling (fresh gradients) | 100% |
| Maximum scaling (stalest gradients) | 22.3% |
| Average scaling | 52.5% |
| Effective throughput loss | ~47.5% |
To match synchronous SGD convergence speed, you would need to increase the base learning rate by approximately \(1/0.525 \approx 1.9\times\), but this may cause instability with fresh gradients.
- Non-determinism quantification: Two reduction orders \(((g_0 + g_1) + g_2)\) and \((g_0 + (g_1 + g_2))\) differ by at most \(\epsilon_{\text{machine}} \cdot |g_0 + g_1 + g_2|\). For a 1B parameter model with FP16 gradients (\(\epsilon \approx 10^{-3}\)), estimate the maximum gradient difference.
Solution
Given:
- Model parameters: \(\Psi = 1 \times 10^9\)
- Data type: FP16
- Machine epsilon: \(\epsilon \approx 10^{-3}\) (actually \(\epsilon_{\text{FP16}} = 2^{-10} \approx 9.77 \times 10^{-4}\))
- Error bound per reduction: \(\epsilon \cdot |g_0 + g_1 + g_2|\)
Single reduction error:
For a single floating-point addition of two numbers \(a\) and \(b\):

\[\text{fl}(a + b) = (a + b)(1 + \delta), \quad |\delta| \leq \epsilon\]

The absolute error is:

\[|\text{fl}(a + b) - (a + b)| \leq \epsilon \cdot |a + b|\]
Error accumulation in AllReduce:
For \(P\) GPUs using ring AllReduce, we perform \(P - 1\) additions per parameter, and each step accumulates error. Standard error analysis gives:

\[|\text{error}| \leq (P-1) \cdot \epsilon \cdot |g_{\text{sum}}| = 7\epsilon \cdot |g_{\text{sum}}| \quad (P = 8)\]
Assuming typical gradient magnitude \(|g_i| \approx 10^{-4}\) (common for normalized training):
- Sum of \(P = 8\) gradients: \(|g_{\text{sum}}| \approx 8 \times 10^{-4}\) (if aligned)
- Per-parameter error: \(\leq 7 \times 10^{-3} \times 8 \times 10^{-4} = 5.6 \times 10^{-6}\)
Different reduction orders:
Two orderings can differ by at most:

\[|\Delta| \leq 2(P-1) \cdot \epsilon \cdot |g|\]

(Factor of 2 because each ordering can err in opposite directions.)
Total model-wide error:

L2 norm of differences: if errors are independent across parameters, each with magnitude on the order of \((P-1)\,\epsilon\,|g|\):

\[\|\Delta g\|_2 \approx \sqrt{\Psi} \cdot (P-1) \cdot \epsilon \cdot |g|\]

With \(|g_{\text{avg}}| \approx 10^{-4}\), \(P = 8\), \(\epsilon = 10^{-3}\):

\[\|\Delta g\|_2 \approx \sqrt{10^9} \cdot 7 \times 10^{-3} \cdot 10^{-4} \approx 0.022\]

Per-parameter maximum difference:

\[|\Delta g| \leq 2(P-1) \cdot \epsilon \cdot |g| = 2 \cdot 7 \times 10^{-3} \cdot 10^{-4} = 1.4 \times 10^{-6}\]

Relative error:

\[\frac{1.4 \times 10^{-6}}{10^{-4}} = 1.4\%\]
Summary:
| Metric | Value |
|---|---|
| Per-parameter max difference | \(1.4 \times 10^{-6}\) |
| Relative error per parameter | ~1.4% |
| L2 norm of difference (1B params) | ~0.022 |
Practical implications:
- Training stability: 1.4% per-step variance accumulates over millions of steps
- Reproducibility: Same code, different GPU count → different results
- Debugging: Bitwise comparison across runs is impossible
Mitigation strategies:
| Strategy | Overhead | Determinism |
|---|---|---|
| Fixed reduction order | ~5% | Bit-exact per config |
| FP32 accumulation | 2× memory | Reduces error ~1000× |
| Kahan summation | 2× compute | Near FP64 accuracy |
| Tree reduction (balanced) | Same | More stable than ring |
Knobs and Trade-offs¶
| Knob | Primary Effect | Cost |
|---|---|---|
| Data-parallel degree (P) | More aggregate compute | More gradient communication and latency sensitivity |
| Local batch size (b) | Higher compute-to-communication ratio | Memory pressure and optimizer stability |
| Gradient accumulation | Larger effective batch | More step latency, less responsiveness |
| Bucketing size | Better overlap | Higher peak memory and delayed updates |
Key Takeaways¶
- Associativity enables data parallelism: The partitioning theorem is the mathematical foundation—the gradient of a sum equals the sum of gradients.
- AllReduce is the communication primitive: \(2(P-1)/P \cdot \Psi\) elements communicated per step.
- Compute-communication ratio determines efficiency: The local batch must be large enough to amortize communication.
- Overlap is critical: Bucket-based AllReduce during the backward pass achieves 70-90% overlap.
- Floating-point breaks exact associativity: Determinism requires a fixed reduction order and disabled optimizations.
- Gradient compression trades compute for bandwidth: Top-K, quantization, and low-rank methods can achieve 10-100× compression.
- Scaling has limits: Beyond ~100 GPUs for large models, data parallelism alone becomes communication-bound.
- Synchronous dominates: Simpler correctness and convergence guarantees make synchronous training preferred despite straggler sensitivity.