Utilities
This section documents the utility functions and classes in the VERL library.
Python Functional Utilities
Contains small Python utility functions.
- verl.utils.py_functional.append_to_dict(data: Dict, new_data: Dict)
Append values from new_data to lists in data.
For each key in new_data, this function appends the corresponding value to a list stored under the same key in data. If the key doesn't exist in data, a new list is created.
- Parameters:
data (Dict) – The target dictionary containing lists as values.
new_data (Dict) – The source dictionary with values to append.
- Returns:
The function modifies data in-place.
- Return type:
None
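Example (a minimal sketch of the in-place append behavior):
>>> from verl.utils.py_functional import append_to_dict
>>> metrics = {}
>>> append_to_dict(metrics, {"loss": 0.9, "acc": 0.5})
>>> append_to_dict(metrics, {"loss": 0.7, "acc": 0.6})
>>> metrics
{'loss': [0.9, 0.7], 'acc': [0.5, 0.6]}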
File System Utilities
File-system agnostic IO APIs.
- verl.utils.fs.copy_to_local(src: str, cache_dir=None, filelock='.file.lock', verbose=False, always_recopy=False, use_shm: bool = False) → str
Copy files/directories from HDFS to local cache with validation.
- Parameters:
src (str) – Source path: an HDFS path (hdfs://…) or a local filesystem path
cache_dir (str, optional) – Local directory for cached files. Uses the system tempdir if None
filelock (str) – Base name for the file lock. Defaults to '.file.lock'
verbose (bool) – Enable copy operation logging. Defaults to False
always_recopy (bool) – Force fresh copy, ignoring the cache. Defaults to False
use_shm (bool) – Enable shared-memory copy. Defaults to False
- Returns:
Local filesystem path to copied resource
- Return type:
str
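Example (a sketch; the HDFS path below is hypothetical):
>>> from verl.utils.fs import copy_to_local
>>> local_path = copy_to_local("hdfs://namenode/models/my-model", verbose=True)
>>> # Local source paths are also accepted; the resolved local path is returned.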
Tracking Utilities
A unified tracking interface that supports logging data to different backends.
- class verl.utils.tracking.Tracking(project_name, experiment_name, default_backend: str | List[str] = 'console', config=None)
A unified tracking interface for logging experiment data to multiple backends.
This class provides a centralized way to log experiment metrics, parameters, and artifacts to various tracking backends including WandB, MLflow, SwanLab, TensorBoard, and console.
- supported_backend
List of supported tracking backends.
- logger
Dictionary of initialized logger instances for each backend.
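Example (a minimal sketch using the console backend; the project and experiment names are hypothetical, and the log(data, step) call assumes the class's usual dispatch to each configured backend):
>>> from verl.utils.tracking import Tracking
>>> tracker = Tracking(project_name="my_project",
...                    experiment_name="run_1",
...                    default_backend="console")
>>> tracker.log(data={"reward/mean": 0.42}, step=1)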
Metrics Utilities
- verl.utils.metric.reduce_metrics(metrics: Dict[str, List[Any]]) → Dict[str, Any]
Reduces a dictionary of metric lists by computing the mean, max, or min of each list. The reduce operation is determined by the key name:
- If the key contains "max", np.max is used
- If the key contains "min", np.min is used
- Otherwise, np.mean is used
- Parameters:
metrics – A dictionary mapping metric names to lists of metric values.
- Returns:
A dictionary with the same keys but with each list replaced by its reduced value.
Example
>>> metrics = {
...     "loss": [1.0, 2.0, 3.0],
...     "accuracy": [0.8, 0.9, 0.7],
...     "max_reward": [5.0, 8.0, 6.0],
...     "min_error": [0.1, 0.05, 0.2]
... }
>>> reduce_metrics(metrics)
{"loss": 2.0, "accuracy": 0.8, "max_reward": 8.0, "min_error": 0.05}
Checkpoint Management
- verl.utils.checkpoint.checkpoint_manager.find_latest_ckpt_path(path, directory_format='global_step_{}')
Return the most recent checkpoint directory based on a tracker file.
- Parameters:
path (str) – Base directory containing the checkpoint tracker.
directory_format (str) – Template for checkpoint subfolders with one placeholder for the iteration number (default 'global_step_{}').
- Returns:
Full path to the latest checkpoint directory, or None if the tracker or checkpoint folder is missing.
- Return type:
str or None
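Example (a sketch; the base directory is hypothetical):
>>> from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path
>>> ckpt_dir = find_latest_ckpt_path("/tmp/my_run/checkpoints")
>>> ckpt_dir is None  # no tracker file yet, so training would start from scratch
True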
- class verl.utils.checkpoint.fsdp_checkpoint_manager.FSDPCheckpointManager(model: FullyShardedDataParallel, optimizer: Optimizer | None = None, lr_scheduler: LRScheduler | None = None, processing_class: PreTrainedTokenizer | ProcessorMixin = None, checkpoint_contents: DictConfig = None, **kwargs)
Manage FSDP checkpointing in SPMD training.
- Saves/loads per-rank sharded model & optimizer states
- Persists full lr_scheduler and RNG state
- Stores HF tokenizer/processor and model/config for unified restore
- Parameters:
model (FSDP) – Wrapped model instance.
optimizer (Optimizer) – Training optimizer.
lr_scheduler (LRScheduler) – Learning-rate scheduler.
processing_class (PreTrainedTokenizer or ProcessorMixin, optional) – Pre-/post-processing artifact handler.
checkpoint_contents (DictConfig) – Configuration for checkpoint contents.
- 'load': Components to load; must contain 'model'. Defaults to ['model', 'optimizer', 'extra'].
- 'save': Components to save; must contain 'model'. Defaults to ['model', 'optimizer', 'extra'].
- load_checkpoint(local_path: str, hdfs_path: str = None, del_local_after_load=False)
Load an FSDP checkpoint for this rank.
- Downloads and loads:
- model and optimizer shards
- extra state dict (scheduler + RNG)
- Parameters:
local_path – Directory with per-rank checkpoint files.
hdfs_path – Unused (kept for API compatibility).
del_local_after_load – Remove local files after loading.
- save_checkpoint(local_path: str, hdfs_path: str = None, global_step: int = 0, max_ckpt_to_keep=None)
Save an FSDP checkpoint for this rank.
- Writes:
- model & optimizer shard files
- extra state dict (scheduler + RNG)
- HF tokenizer/processor and model/config on rank 0
- optional full HF model under 'huggingface/' if requested
Rotates old checkpoints, keeping at most max_ckpt_to_keep.
- Parameters:
local_path – Target directory for checkpoint files.
hdfs_path – Unused (kept for API compatibility).
global_step – Current training step (used for bookkeeping).
max_ckpt_to_keep – Number of recent checkpoints to retain.
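Example (a sketch; assumes an initialized torch.distributed process group and pre-built fsdp_model, optimizer, scheduler, tokenizer, and checkpoint_config objects):
>>> from verl.utils.checkpoint.fsdp_checkpoint_manager import FSDPCheckpointManager
>>> ckpt_mgr = FSDPCheckpointManager(model=fsdp_model,
...                                  optimizer=optimizer,
...                                  lr_scheduler=scheduler,
...                                  processing_class=tokenizer,
...                                  checkpoint_contents=checkpoint_config)
>>> ckpt_mgr.save_checkpoint(local_path="ckpts/global_step_100",
...                          global_step=100, max_ckpt_to_keep=3)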
Dataset Utilities
- class verl.utils.dataset.rl_dataset.RLHFDataset(data_files: str | List[str], tokenizer: PreTrainedTokenizer, config: DictConfig, processor: ProcessorMixin | None = None)
Load and preprocess RLHF data from Parquet files.
- Caches files locally.
- Reads into a HuggingFace Dataset and tokenizes prompts.
- Optionally handles images/videos via a ProcessorMixin.
- Filters prompts over a max length.
- Supports resuming from checkpoints.
- Parameters:
data_files (str or list) – Path(s) to Parquet file(s).
tokenizer (PreTrainedTokenizer) – Tokenizer used to convert text into token IDs.
config (DictConfig) – Options like cache_dir, prompt_key, max_prompt_length, truncation, etc.
processor (ProcessorMixin, optional) – Multimodal preprocessor for images/videos.
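Example (a sketch; the Parquet path is hypothetical and the config keys shown are the ones listed above):
>>> from omegaconf import OmegaConf
>>> from transformers import AutoTokenizer
>>> from verl.utils.dataset.rl_dataset import RLHFDataset
>>> config = OmegaConf.create({"prompt_key": "prompt",
...                            "max_prompt_length": 512,
...                            "truncation": "error"})
>>> tokenizer = AutoTokenizer.from_pretrained("gpt2")
>>> dataset = RLHFDataset(data_files="train.parquet",
...                       tokenizer=tokenizer, config=config)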
- verl.utils.dataset.rl_dataset.collate_fn(data_list: list[dict]) → dict
Collate a batch of sample dicts into batched tensors and arrays.
- Parameters:
data_list – List of dicts mapping feature names to torch.Tensor or other values.
- Returns:
Dict where tensor entries are stacked into a torch.Tensor of shape (batch_size, *dims) and non-tensor entries are converted to np.ndarray of dtype object with shape (batch_size,).
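Example (a sketch; reuses the dataset built in the previous example):
>>> from torch.utils.data import DataLoader
>>> from verl.utils.dataset.rl_dataset import collate_fn
>>> loader = DataLoader(dataset, batch_size=8, collate_fn=collate_fn)
>>> batch = next(iter(loader))  # tensors are stacked; other fields become object ndarrays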
Torch Functional Utilities
Contains small torch utilities.
- verl.utils.torch_functional.get_constant_schedule_with_warmup(optimizer: Optimizer, num_warmup_steps: int, last_epoch: int = -1)
Create a constant LR schedule with a linear warmup phase.
- Parameters:
optimizer (Optimizer) – Wrapped optimizer.
num_warmup_steps (int) – Number of steps to ramp the LR from 0 up to its initial value.
last_epoch (int, optional) – The index of the last epoch when resuming training. Defaults to -1.
- Returns:
Scheduler that increases LR linearly during warmup, then holds it constant.
- Return type:
LambdaLR
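Example (a minimal runnable sketch):
>>> import torch
>>> from verl.utils.torch_functional import get_constant_schedule_with_warmup
>>> model = torch.nn.Linear(4, 4)
>>> optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
>>> scheduler = get_constant_schedule_with_warmup(optimizer, num_warmup_steps=100)
>>> optimizer.step(); scheduler.step()  # LR ramps linearly toward 1e-4 over the first 100 steps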
- verl.utils.torch_functional.logprobs_from_logits(logits, labels, inplace_backward=True)
Compute per-token log-probabilities for the given labels.
Uses a Flash-Attention-based cross-entropy (if available) for an efficient backward pass, otherwise falls back to a standard log-softmax + gather approach.
See: https://github.com/pytorch/pytorch/issues/563#issuecomment-330103591
- Parameters:
logits (Tensor) – Model outputs of shape (…, vocab_size).
labels (LongTensor) – True class indices with shape matching logits.shape[:-1].
inplace_backward (bool) – If True and Flash-Attn is available, perform the backward pass in-place.
- Returns:
Log-probabilities of the target labels, shape logits.shape[:-1].
- Return type:
Tensor
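Example (a minimal runnable sketch with random data):
>>> import torch
>>> from verl.utils.torch_functional import logprobs_from_logits
>>> logits = torch.randn(2, 5, 32)         # (batch, seq, vocab_size)
>>> labels = torch.randint(0, 32, (2, 5))  # (batch, seq)
>>> logprobs_from_logits(logits, labels).shape
torch.Size([2, 5])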
- verl.utils.torch_functional.masked_mean(values, mask, axis=None)
Compute the mean of values over elements selected by mask.
- Parameters:
values (Tensor) – Input tensor.
mask (Tensor) – Boolean or numeric mask of the same shape as values.
axis (int or tuple of int, optional) – Dimension(s) along which to compute the mean. Defaults to None (over all elements).
- Returns:
Masked mean, with shape equal to values reduced over axis.
- Return type:
Tensor
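Example (only unmasked elements contribute to the mean):
>>> import torch
>>> from verl.utils.torch_functional import masked_mean
>>> values = torch.tensor([1.0, 2.0, 3.0, 4.0])
>>> mask = torch.tensor([1.0, 1.0, 0.0, 0.0])
>>> masked_mean(values, mask)
tensor(1.5000)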
- verl.utils.torch_functional.masked_whiten(values, mask, shift_mean=True)
Whiten values by normalizing with mean and variance computed over mask.
- Parameters:
values (torch.Tensor) – Input tensor.
mask (torch.Tensor) – Boolean tensor of the same shape; selects elements for the statistics.
shift_mean (bool) – If True (default), the output is zero-mean; if False, the original mean is re-added after scaling.
- Returns:
Whitened tensor of same shape as values.
- Return type:
torch.Tensor
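Example (a typical advantage-normalization sketch):
>>> import torch
>>> from verl.utils.torch_functional import masked_whiten
>>> advantages = torch.randn(2, 8)
>>> mask = torch.ones(2, 8)
>>> whitened = masked_whiten(advantages, mask)  # roughly zero mean, unit variance over the mask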
Sequence Length Balancing
- verl.utils.seqlen_balancing.get_reverse_idx(idx_map)
Build the inverse of an index mapping.
- Parameters:
idx_map (Sequence[int]) – Sequence where idx_map[i] = j.
- Returns:
Inverse mapping list such that output[j] = i for each i.
- Return type:
List[int]
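Example (idx_map[0] = 2 implies output[2] = 0, and so on):
>>> from verl.utils.seqlen_balancing import get_reverse_idx
>>> get_reverse_idx([2, 0, 1])
[1, 2, 0]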
- verl.utils.seqlen_balancing.rearrange_micro_batches(batch, max_token_len, dp_group=None, num_batches_divided_by=None, same_micro_num_in_dp=True, min_num_micro_batch=None)
Split a batch into micro-batches by total token count, with optional DP sync and padding.
- Parameters:
batch (TensorDict) – Must include 'attention_mask' of shape (B, S); other fields are sliced similarly.
max_token_len (int) – Maximum sum of attention_mask per micro-batch.
dp_group (optional) – torch.distributed group for data-parallel sync.
num_batches_divided_by (optional) – Virtual pipeline-parallel size, for Megatron.
same_micro_num_in_dp (bool) – If True and dp_group is set, pad all ranks to the same micro-batch count.
min_num_micro_batch (int, optional) – Force at least this many splits (pads empty ones).
- Returns:
List[TensorDict]: the micro-batches. List[List[int]]: index lists mapping each micro-batch back to original positions.
- Return type:
Tuple[List[TensorDict], List[List[int]]]
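Example (a single-process sketch with no dp_group; assumes the tensordict package):
>>> import torch
>>> from tensordict import TensorDict
>>> from verl.utils.seqlen_balancing import rearrange_micro_batches
>>> batch = TensorDict({"input_ids": torch.zeros(4, 8, dtype=torch.long),
...                     "attention_mask": torch.ones(4, 8, dtype=torch.long)},
...                    batch_size=[4])
>>> micro_batches, idx_lists = rearrange_micro_batches(batch, max_token_len=16)
>>> # Each micro-batch holds at most 16 attention tokens; idx_lists maps each
>>> # micro-batch back to positions in the original batch.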
Ulysses Utilities
Utilities for DeepSpeed Ulysses Sequence Parallelism.
DeepSpeed Ulysses paper: https://arxiv.org/abs/2309.14509
Inspired by: https://github.com/deepspeedai/DeepSpeed/blob/master/deepspeed/sequence/layer.py
- verl.utils.ulysses.gather_outpus_and_unpad(x: Tensor, gather_dim: int, unpad_dim: int = None, padding_size: int = 0, grad_scaler: bool = True, group: ProcessGroup | None = None)
Gather a tensor across a process group and optionally unpad its padded elements.
- Parameters:
x (Tensor) – Input tensor to gather.
gather_dim (int) – Dimension along which to gather across ranks.
unpad_dim (int, optional) – Dimension from which to remove padding. If None, no unpadding.
padding_size (int) – Number of padding elements to remove on unpad_dim. Defaults to 0.
grad_scaler (bool) – Whether to apply gradient scaling during the gather. Defaults to True.
group (ProcessGroup, optional) – Process group for gathering. If None, uses get_ulysses_sequence_parallel_group(). If still None, returns x unchanged.
- Returns:
The gathered tensor, with padding removed if requested.
- Return type:
Tensor
- verl.utils.ulysses.ulysses_pad_and_slice_inputs(input_ids_rmpad: Tensor, position_ids_rmpad: Tensor | None = None, sp_size: int = 1)
Pad input_ids and position_ids so their lengths are divisible by sp_size, then slice them across sequence-parallel ranks.
Note that both input_ids_rmpad and position_ids_rmpad will be padded and sliced.
This is the pre-forward utility for Ulysses sequence parallelism.
- Parameters:
input_ids_rmpad – Shape (bsz, seqlen).
position_ids_rmpad – Shape (bsz, seqlen), where bsz must be 1.
sp_size (int) – Ulysses sequence-parallelism size.
- Returns:
torch.Tensor: padded and sliced input_ids. torch.Tensor: padded and sliced position_ids. int: pad size.
- Return type:
Tuple[torch.Tensor, torch.Tensor, int]
FSDP Utilities
- verl.utils.fsdp_utils.get_fsdp_wrap_policy(module, config=None, is_lora=False)
Get FSDP wrap policy for the module.
- Parameters:
module – The module to get the wrap policy for.
config – Configuration for the wrap policy.
is_lora – Whether to enable the lambda policy for LoRA modules.
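Example (a sketch; assumes an initialized torch.distributed process group and a pre-built HuggingFace transformer `model`):
>>> from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
>>> from verl.utils.fsdp_utils import get_fsdp_wrap_policy
>>> policy = get_fsdp_wrap_policy(model, is_lora=False)
>>> fsdp_model = FSDP(model, auto_wrap_policy=policy)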
Debug Utilities
- class verl.utils.debug.GPUMemoryLogger(role: str, logger: Logger = None, level=10, log_only_rank_0: bool = True)
A decorator class to log GPU memory usage.
Example
>>> from verl.utils.debug.performance import GPUMemoryLogger
>>> @GPUMemoryLogger(role="actor")
>>> def update_actor(self, batch):
...     # real actor update logic
...     return