When running distributed training with torchrun across multiple nodes or processes, all workers can log metrics to the same experiment using a shared run_id. This enables unified tracking of distributed training runs.
## How It Works
Pluto supports attaching multiple processes to the same experiment run. When the first process calls pluto.init(), a new run is created. Subsequent processes with the same run_id will attach to that existing run instead of creating new ones.
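The attach-vs-create behavior can be illustrated with a toy in-memory registry. This is a conceptual sketch only; the `_runs` dictionary and `init` function below are illustrative and not part of Pluto's actual implementation:

```python
# Conceptual sketch of attach-vs-create semantics (NOT Pluto's real code).
_runs = {}

def init(run_id):
    if run_id in _runs:
        # A run with this ID already exists: attach to it.
        run = dict(_runs[run_id], resumed=True)
    else:
        # First process with this ID: create the run.
        run = {"run_id": run_id, "resumed": False}
        _runs[run_id] = run
    return run

first = init("experiment-123")   # creates the run
second = init("experiment-123")  # attaches to it
print(first["resumed"], second["resumed"])  # False True
```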
## Setting Up Distributed Logging

### 1. Set a Shared Run ID
Before launching your distributed job, set the `PLUTO_RUN_ID` environment variable to a unique identifier:

```bash
export PLUTO_RUN_ID="experiment-123"
```

All processes that share this environment variable will log to the same experiment.

When using `konduktor launch`, `PLUTO_RUN_ID` defaults to the job name, so you don't need to set it explicitly.
### 2. Initialize Pluto in Your Training Script
In your training script, initialize Pluto after setting up the distributed process group:
```python
import pluto
import torch.distributed as dist

# Initialize distributed training
dist.init_process_group()
rank = dist.get_rank()

# Initialize Pluto; all ranks attach to the same run
run = pluto.init(project="my-project", name="ddp-training")

# Check whether this process resumed an existing run
print(f"Rank {rank}: resumed={run.resumed}, run_id={run.id}")
```
### 3. Log Metrics with Rank Prefixes
To distinguish metrics from different processes, prefix them with the rank:
```python
# Log rank-specific metrics
run.log({f"train/loss/rank{rank}": loss})

# Or log global metrics from rank 0 only
if rank == 0:
    run.log({"global/epoch": epoch, "global/learning_rate": lr})
```
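If you log many metrics per step, the prefixing can be factored into a small helper. Note that `rank_prefixed` is a hypothetical convenience function, not part of Pluto's API:

```python
def rank_prefixed(metrics: dict, rank: int) -> dict:
    """Suffix each metric key with the process rank,
    e.g. 'train/loss' -> 'train/loss/rank0'."""
    return {f"{key}/rank{rank}": value for key, value in metrics.items()}

print(rank_prefixed({"train/loss": 0.42, "train/acc": 0.91}, 3))
# {'train/loss/rank3': 0.42, 'train/acc/rank3': 0.91}
```

You would then call `run.log(rank_prefixed({"train/loss": loss.item()}, rank))` in the training loop.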
## Complete Example
Here’s a full example for a Konduktor task that runs distributed training with Pluto logging:
```yaml
name: distributed-training-with-logging

resources:
  image_id: nvcr.io/nvidia/pytorch:23.10-py3
  accelerators: H100:8
  cpus: 60
  memory: 500
  labels:
    kueue.x-k8s.io/queue-name: user-queue

num_nodes: 2

run: |
  # (optional) Set shared run ID for all processes
  export PLUTO_RUN_ID="training-123"

  # Launch distributed training
  torchrun --nproc_per_node=$NUM_GPUS_PER_NODE \
    --nnodes=$NUM_NODES \
    --node_rank=$RANK \
    --master_addr=$MASTER_ADDR \
    --master_port=8008 \
    train.py
```
And the corresponding training script:
```python
# train.py
import os

import pluto
import torch
import torch.distributed as dist


def main():
    # Initialize distributed training
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    # Bind this process to its GPU
    torch.cuda.set_device(local_rank)

    # Initialize Pluto; all processes attach to the same run via PLUTO_RUN_ID
    config = {
        "world_size": world_size,
        "learning_rate": 0.001,
        "batch_size": 32,
    }
    run = pluto.init(
        project="distributed-training",
        name="ddp-experiment",
        config=config if rank == 0 else None,  # Only rank 0 sets config
    )

    # Training loop (num_epochs, train_loader, model, and train_step
    # are defined elsewhere in your code)
    for epoch in range(num_epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            # ... training step ...
            loss = train_step(model, data, target)

            # Log loss from each rank
            run.log({f"train/loss/rank{rank}": loss.item()})

        # Log epoch-level metrics from rank 0
        if rank == 0:
            run.log({"train/epoch": epoch})

    run.finish()
    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```
## Run Properties
The run object provides useful properties for distributed scenarios:
| Property | Type | Description |
|---|---|---|
| `run.resumed` | `bool` | `True` if this process attached to an existing run |
| `run.run_id` | `str` | The user-provided external run ID |
| `run.id` | `int` | The server-assigned numeric run ID |
## Environment Variables
Pluto recognizes the following environment variables for distributed logging:
| Variable | Description |
|---|---|
| `PLUTO_RUN_ID` | Primary environment variable for shared run identification |
| `MLOP_RUN_ID` | Fallback environment variable (for backward compatibility) |
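The lookup order implied by the table can be sketched as follows. The `resolve_run_id` function is illustrative, not Pluto's actual code:

```python
import os

def resolve_run_id(env=None):
    # Sketch of the documented precedence: PLUTO_RUN_ID first,
    # then MLOP_RUN_ID as the compatibility fallback.
    env = os.environ if env is None else env
    return env.get("PLUTO_RUN_ID") or env.get("MLOP_RUN_ID")

print(resolve_run_id({"PLUTO_RUN_ID": "experiment-123", "MLOP_RUN_ID": "legacy-7"}))
# experiment-123
```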
## Best Practices
- **Set the run ID before launching:** Ensure `PLUTO_RUN_ID` is set before calling `torchrun` so all processes inherit the same value.
- **Use rank prefixes:** Prefix metrics with the rank to distinguish data from different processes in the dashboard.
- **Log config from rank 0 only:** Pass `config` only from rank 0 to avoid duplicate metadata.
- **Use unique run IDs:** Include a timestamp or UUID in your run ID so each training run is distinct.
- **Handle the `name` parameter:** The `name` parameter is only used when creating a new run. Processes that attach to an existing run ignore it, and a warning is logged to indicate this.
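One way to build a distinct run ID per launch, combining a timestamp with a short random suffix (the `ddp-` prefix is just an example convention):

```python
import time
import uuid

# A human-readable timestamp plus a random suffix, so that two launches
# started in the same second still get distinct IDs.
run_id = f"ddp-{time.strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}"
print(run_id)  # e.g. ddp-20240101-120000-3f9a1c2e
```

Export the result as `PLUTO_RUN_ID` (or generate it in the launch script) before invoking `torchrun`.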