Utilities

This section documents the utility functions and classes in the VERL library.

Python Functional Utilities

Contains small Python utility functions.

verl.utils.py_functional.append_to_dict(data: Dict, new_data: Dict)

Append values from new_data to lists in data.

For each key in new_data, this function appends the corresponding value to a list stored under the same key in data. If the key doesn’t exist in data, a new list is created.

Parameters:
  • data (Dict) – The target dictionary containing lists as values.

  • new_data (Dict) – The source dictionary with values to append.

Returns:

The function modifies data in-place.

Return type:

None
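
Example

A minimal usage sketch; the dictionary contents below are illustrative:

>>> from verl.utils.py_functional import append_to_dict
>>> data = {}
>>> append_to_dict(data, {"loss": 0.5, "acc": 0.9})
>>> append_to_dict(data, {"loss": 0.4})
>>> data
{'loss': [0.5, 0.4], 'acc': [0.9]}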

File System Utilities

File-system-agnostic I/O APIs.

verl.utils.fs.copy_to_local(src: str, cache_dir=None, filelock='.file.lock', verbose=False, always_recopy=False, use_shm: bool = False) → str

Copy files/directories from HDFS to local cache with validation.

Parameters:
  • src (str) – Source path - HDFS path (hdfs://…) or local filesystem path

  • cache_dir (str, optional) – Local directory for cached files. Uses system tempdir if None

  • filelock (str) – Base name for the file lock. Defaults to ".file.lock"

  • verbose (bool) – Enable copy operation logging. Defaults to False

  • always_recopy (bool) – Force fresh copy ignoring cache. Defaults to False

  • use_shm (bool) – Enable shared memory copy. Defaults to False

Returns:

Local filesystem path to copied resource

Return type:

str
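
Example

A usage sketch; the HDFS source path and cache directory below are hypothetical:

>>> from verl.utils.fs import copy_to_local
>>> local_path = copy_to_local("hdfs://cluster/user/models/my_model", cache_dir="/tmp/verl_cache", verbose=True)
>>> # local_path points to the local copy under /tmp/verl_cache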

Tracking Utilities

A unified tracking interface that supports logging data to different backends.

class verl.utils.tracking.Tracking(project_name, experiment_name, default_backend: str | List[str] = 'console', config=None)

A unified tracking interface for logging experiment data to multiple backends.

This class provides a centralized way to log experiment metrics, parameters, and artifacts to various tracking backends including WandB, MLflow, SwanLab, TensorBoard, and console.

supported_backend

List of supported tracking backends.

logger

Dictionary of initialized logger instances for each backend.
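
Example

A usage sketch; it assumes the tracker exposes a log(data, step) method for recording a dict of scalar metrics per step:

>>> from verl.utils.tracking import Tracking
>>> tracker = Tracking(project_name="verl_demo", experiment_name="run_0", default_backend="console")
>>> tracker.log(data={"reward/mean": 0.42}, step=1)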

Metrics Utilities

verl.utils.metric.reduce_metrics(metrics: Dict[str, List[Any]]) → Dict[str, Any]

Reduces a dictionary of metric lists by computing the mean, max, or min of each list. The reduce operation is determined by the key name:
  • If the key contains "max", np.max is used

  • If the key contains "min", np.min is used

  • Otherwise, np.mean is used

Parameters:

metrics – A dictionary mapping metric names to lists of metric values.

Returns:

A dictionary with the same keys but with each list replaced by its reduced value.

Example

>>> metrics = {
...     "loss": [1.0, 2.0, 3.0],
...     "accuracy": [0.8, 0.9, 0.7],
...     "max_reward": [5.0, 8.0, 6.0],
...     "min_error": [0.1, 0.05, 0.2]
... }
>>> reduce_metrics(metrics)
{"loss": 2.0, "accuracy": 0.8, "max_reward": 8.0, "min_error": 0.05}

Checkpoint Management

verl.utils.checkpoint.checkpoint_manager.find_latest_ckpt_path(path, directory_format='global_step_{}')

Return the most recent checkpoint directory based on a tracker file.

Parameters:
  • path (str) – Base directory containing the checkpoint tracker.

  • directory_format (str) – Template for checkpoint subfolders with one placeholder for the iteration number (default "global_step_{}").

Returns:

Full path to the latest checkpoint directory, or None if the tracker or checkpoint folder is missing.

Return type:

str or None
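
Example

A usage sketch; the base directory is hypothetical and is expected to contain the tracker file plus global_step_{N} subfolders:

>>> from verl.utils.checkpoint.checkpoint_manager import find_latest_ckpt_path
>>> ckpt_dir = find_latest_ckpt_path("/tmp/exp/checkpoints")
>>> # e.g. "/tmp/exp/checkpoints/global_step_300", or None if nothing has been saved yet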

class verl.utils.checkpoint.fsdp_checkpoint_manager.FSDPCheckpointManager(model: FullyShardedDataParallel, optimizer: Optimizer | None = None, lr_scheduler: LRScheduler | None = None, processing_class: PreTrainedTokenizer | ProcessorMixin = None, checkpoint_contents: DictConfig = None, **kwargs)

Manage FSDP checkpointing in SPMD training.

  • Saves/loads per-rank sharded model & optimizer states

  • Persists full lr_scheduler and RNG state

  • Stores HF tokenizer/processor and model/config for unified restore

Parameters:
  • model (FSDP) – Wrapped model instance.

  • optimizer (Optimizer) – Training optimizer.

  • lr_scheduler (LRScheduler) – Learning-rate scheduler.

  • processing_class (PreTrainedTokenizer or ProcessorMixin, optional) – Pre-/post-processing artifact handler.

  • checkpoint_contents (DictConfig) – Configuration for checkpoint contents. 'load': components to load; must contain 'model'; defaults to ['model', 'optimizer', 'extra']. 'save': components to save; must contain 'model'; defaults to ['model', 'optimizer', 'extra'].

load_checkpoint(local_path: str, hdfs_path: str = None, del_local_after_load=False)

Load an FSDP checkpoint for this rank.

Downloads and loads:
  • model and optimizer shards

  • extra state dict (scheduler + RNG)

Parameters:
  • local_path – Directory with per-rank checkpoint files.

  • hdfs_path – Unused (for API compatibility).

  • del_local_after_load – Remove local files after loading.

save_checkpoint(local_path: str, hdfs_path: str = None, global_step: int = 0, max_ckpt_to_keep=None)

Save an FSDP checkpoint for this rank.

Writes:
  • model & optimizer shard files

  • extra state dict (scheduler + RNG)

  • HF tokenizer/processor and model/config on rank 0

  • optional full HF model under 'huggingface/' if requested

Rotates old checkpoints, keeping at most max_ckpt_to_keep.

Parameters:
  • local_path – Target directory for checkpoint files.

  • hdfs_path – Unused (for API compatibility).

  • global_step – Current training step (used for bookkeeping).

  • max_ckpt_to_keep – Number of recent checkpoints to retain.
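
Example

A usage sketch; fsdp_model, optimizer, lr_scheduler, tokenizer, and ckpt_config are assumed to be built elsewhere:

>>> from verl.utils.checkpoint.fsdp_checkpoint_manager import FSDPCheckpointManager
>>> manager = FSDPCheckpointManager(
...     model=fsdp_model,
...     optimizer=optimizer,
...     lr_scheduler=lr_scheduler,
...     processing_class=tokenizer,
...     checkpoint_contents=ckpt_config,
... )
>>> manager.save_checkpoint(local_path="/tmp/exp/global_step_100", global_step=100, max_ckpt_to_keep=3)
>>> manager.load_checkpoint(local_path="/tmp/exp/global_step_100")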

Dataset Utilities

class verl.utils.dataset.rl_dataset.RLHFDataset(data_files: str | List[str], tokenizer: PreTrainedTokenizer, config: DictConfig, processor: ProcessorMixin | None = None)

Load and preprocess RLHF data from Parquet files.

  • Caches files locally.

  • Reads into a HuggingFace Dataset and tokenizes prompts.

  • Optionally handles images/videos via a ProcessorMixin.

  • Filters prompts over a max length.

  • Supports resuming from checkpoints.

Parameters:
  • data_files (str or list) – Path(s) to Parquet file(s).

  • tokenizer (PreTrainedTokenizer) – Tokenizer used to convert prompt text into token IDs.

  • config (DictConfig) – Options like cache_dir, prompt_key, max_prompt_length, truncation, etc.

  • processor (ProcessorMixin, optional) – Multimodal preprocessor for images/videos.

verl.utils.dataset.rl_dataset.collate_fn(data_list: list[dict]) → dict

Collate a batch of sample dicts into batched tensors and arrays.

Parameters:

data_list – List of dicts mapping feature names to torch.Tensor or other values.

Returns:

Dict where tensor entries are stacked into a torch.Tensor of shape (batch_size, *dims) and non-tensor entries are converted to np.ndarray of dtype object with shape (batch_size,).
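
Example

A usage sketch combining RLHFDataset with collate_fn via a standard PyTorch DataLoader; the Parquet file, tokenizer, and config are assumed to exist:

>>> from torch.utils.data import DataLoader
>>> from verl.utils.dataset.rl_dataset import RLHFDataset, collate_fn
>>> dataset = RLHFDataset(data_files="train.parquet", tokenizer=tokenizer, config=data_config)
>>> loader = DataLoader(dataset, batch_size=8, collate_fn=collate_fn)
>>> batch = next(iter(loader))  # dict of stacked tensors and object arrays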

Torch Functional Utilities

Contains small torch utility functions.

verl.utils.torch_functional.get_constant_schedule_with_warmup(optimizer: Optimizer, num_warmup_steps: int, last_epoch: int = -1)

Create a constant LR schedule with a linear warmup phase.

Parameters:
  • optimizer (Optimizer) – Wrapped optimizer.

  • num_warmup_steps (int) – Number of steps to ramp the LR up from 0 to its initial value.

  • last_epoch (int, optional) – The index of the last epoch when resuming training. Defaults to -1.

Returns:

Scheduler that increases LR linearly during warmup, then holds it constant.

Return type:

LambdaLR
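
Example

A minimal sketch with a toy model and optimizer:

>>> import torch
>>> from verl.utils.torch_functional import get_constant_schedule_with_warmup
>>> model = torch.nn.Linear(4, 4)
>>> optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
>>> scheduler = get_constant_schedule_with_warmup(optimizer, num_warmup_steps=100)
>>> for _ in range(10):
...     optimizer.step()
...     scheduler.step()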

verl.utils.torch_functional.logprobs_from_logits(logits, labels, inplace_backward=True)

Compute per-token log-probabilities for the given labels.

Uses a Flash-Attention–based cross-entropy (if available) for efficient backward, otherwise falls back to a standard log-softmax+gather approach.

See: https://github.com/pytorch/pytorch/issues/563#issuecomment-330103591

Parameters:
  • logits (Tensor) – Model outputs of shape (…, vocab_size).

  • labels (LongTensor) – True class indices of shape matching logits[…, :-1].

  • inplace_backward (bool) – If True and Flash-Attn is available, perform backward in-place.

Returns:

Log-probabilities of the target labels, shape logits.shape[:-1].

Return type:

Tensor
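
Example

A minimal sketch with random logits; the shapes follow the parameter description above:

>>> import torch
>>> from verl.utils.torch_functional import logprobs_from_logits
>>> logits = torch.randn(2, 5, 32)         # (batch, seq, vocab_size)
>>> labels = torch.randint(0, 32, (2, 5))  # (batch, seq)
>>> logp = logprobs_from_logits(logits, labels)
>>> logp.shape
torch.Size([2, 5])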

verl.utils.torch_functional.masked_mean(values, mask, axis=None)

Compute the mean of values over elements selected by mask.

Parameters:
  • values (Tensor) – Input tensor.

  • mask (Tensor) – Boolean or numeric mask of the same shape as values.

  • axis (int or tuple of int, optional) – Dimension(s) along which to compute the mean. Defaults to None (over all elements).

Returns:

Masked mean, with shape equal to values reduced over axis.

Return type:

Tensor
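
Example

A minimal sketch; only the first two elements are selected by the mask:

>>> import torch
>>> from verl.utils.torch_functional import masked_mean
>>> values = torch.tensor([1.0, 2.0, 3.0, 4.0])
>>> mask = torch.tensor([1.0, 1.0, 0.0, 0.0])
>>> masked_mean(values, mask)
tensor(1.5000)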

verl.utils.torch_functional.masked_whiten(values, mask, shift_mean=True)

Whiten values by normalizing with mean and variance computed over mask.

Parameters:
  • values (torch.Tensor) – Input tensor.

  • mask (torch.Tensor) – Boolean tensor of same shape, selects elements for stats.

  • shift_mean (bool) – If True (default), output is zero-mean; if False, the original mean is re-added after scaling.

Returns:

Whitened tensor of same shape as values.

Return type:

torch.Tensor
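
Example

A minimal sketch; with shift_mean=True the masked statistics are used to produce a roughly zero-mean, unit-variance result:

>>> import torch
>>> from verl.utils.torch_functional import masked_whiten
>>> values = torch.randn(4, 8)
>>> mask = torch.ones(4, 8)
>>> whitened = masked_whiten(values, mask)
>>> whitened.shape
torch.Size([4, 8])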

Sequence Length Balancing

verl.utils.seqlen_balancing.get_reverse_idx(idx_map)

Build the inverse of an index mapping.

Parameters:

idx_map (Sequence[int]) – Sequence where idx_map[i] = j.

Returns:

Inverse mapping list such that output[j] = i for each i.

Return type:

List[int]
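
Example

A minimal sketch; element i of the input maps to position idx_map[i], and the inverse undoes that mapping:

>>> from verl.utils.seqlen_balancing import get_reverse_idx
>>> idx_map = [2, 0, 1]
>>> list(get_reverse_idx(idx_map))
[1, 2, 0]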

verl.utils.seqlen_balancing.rearrange_micro_batches(batch, max_token_len, dp_group=None, num_batches_divided_by=None, same_micro_num_in_dp=True, min_num_micro_batch=None)

Split a batch into micro-batches by total token count, with optional DP sync and padding.

Parameters:
  • batch (TensorDict) – must include "attention_mask" (B*S); other fields are sliced similarly.

  • max_token_len (int) – max sum of attention_mask per micro-batch.

  • dp_group (optional) – torch.distributed group for data-parallel sync.

  • num_batches_divided_by (optional) – virtual pipeline parallel size, for Megatron.

  • same_micro_num_in_dp (bool) – if True and dp_group set, pad all ranks to the same count.

  • min_num_micro_batch (int, optional) – force at least this many splits (pads empty ones).

Returns:

A list of micro-batches (List[TensorDict]) together with index lists (List[List[int]]) mapping each micro-batch back to positions in the original batch.

Return type:

List[TensorDict], List[List[int]]
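
Example

A single-process sketch (no dp_group) on a toy TensorDict; the exact split depends on the balancing algorithm:

>>> import torch
>>> from tensordict import TensorDict
>>> from verl.utils.seqlen_balancing import rearrange_micro_batches
>>> batch = TensorDict(
...     {"input_ids": torch.randint(0, 100, (4, 8)),
...      "attention_mask": torch.ones(4, 8, dtype=torch.long)},
...     batch_size=[4],
... )
>>> micro_batches, idx_lists = rearrange_micro_batches(batch, max_token_len=16)
>>> # each micro-batch holds at most 16 attended tokens;
>>> # idx_lists maps the rows of each micro-batch back to positions in the original batch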

Ulysses Utilities

Utilities for DeepSpeed Ulysses sequence parallelism. DeepSpeed Ulysses paper: https://arxiv.org/abs/2309.14509. Inspired by: https://github.com/deepspeedai/DeepSpeed/blob/master/deepspeed/sequence/layer.py

verl.utils.ulysses.gather_outpus_and_unpad(x: Tensor, gather_dim: int, unpad_dim: int = None, padding_size: int = 0, grad_scaler: bool = True, group: ProcessGroup | None = None)

Gather a tensor across a process group and optionally unpad its padded elements.

Parameters:
  • x (Tensor) – Input tensor to gather.

  • gather_dim (int) – Dimension along which to gather across ranks.

  • unpad_dim (int, optional) – Dimension from which to remove padding. If None, no unpadding.

  • padding_size (int) – Number of padding elements to remove on unpad_dim. Defaults to 0.

  • grad_scaler (bool) – Whether to apply gradient scaling during gather. Defaults to True.

  • group (ProcessGroup, optional) – Process group for gathering. If None, uses get_ulysses_sequence_parallel_group(). If still None, returns x unchanged.

Returns:

The gathered tensor, with padding removed if requested.

Return type:

Tensor
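
Example

A sketch of the no-op path described above: with no sequence-parallel group initialized and no group passed, the input is returned unchanged:

>>> import torch
>>> from verl.utils.ulysses import gather_outpus_and_unpad
>>> x = torch.randn(8, 16)
>>> out = gather_outpus_and_unpad(x, gather_dim=0)
>>> out.shape
torch.Size([8, 16])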

verl.utils.ulysses.ulysses_pad_and_slice_inputs(input_ids_rmpad: Tensor, position_ids_rmpad: Tensor | None = None, sp_size: int = 1)

Pad input_ids and position_ids so that the sequence length is divisible by sp_size, then slice them for the current rank.

Note both input_ids_rmpad and position_ids_rmpad will be padded and sliced.

This is the pre-forward utility for Ulysses sequence parallelism.

Parameters:
  • input_ids_rmpad – shape of [bsz, seqlen]

  • position_ids_rmpad – shape of [bsz, seqlen], where bsz must be 1

  • sp_size (int) – ulysses sequence parallelism size

Returns:

The padded and sliced input_ids, the padded and sliced position_ids, and the pad size.

Return type:

torch.Tensor, torch.Tensor, int
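
Example

A sketch assuming a single-process setup with no sequence-parallel group, so the local slice is simply the first chunk; the inputs are toy tensors with bsz=1:

>>> import torch
>>> from verl.utils.ulysses import ulysses_pad_and_slice_inputs
>>> input_ids = torch.randint(0, 100, (1, 10))
>>> position_ids = torch.arange(10).unsqueeze(0)
>>> ids_slice, pos_slice, pad_size = ulysses_pad_and_slice_inputs(input_ids, position_ids, sp_size=4)
>>> # pad_size should be 2: the 10 tokens are padded up to 12, the next multiple of sp_size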

FSDP Utilities

verl.utils.fsdp_utils.get_fsdp_wrap_policy(module, config=None, is_lora=False)

Get FSDP wrap policy for the module.

Parameters:
  • module – The module to get wrap policy for

  • config – Configuration for wrap policy

  • is_lora – Whether to enable lambda policy for LoRA modules
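
Example

A usage sketch; model is a hypothetical transformer module, and the returned policy is passed to FSDP's auto_wrap_policy argument:

>>> from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
>>> from verl.utils.fsdp_utils import get_fsdp_wrap_policy
>>> wrap_policy = get_fsdp_wrap_policy(model, config=None, is_lora=False)
>>> fsdp_model = FSDP(model, auto_wrap_policy=wrap_policy)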

Debug Utilities

class verl.utils.debug.GPUMemoryLogger(role: str, logger: Logger = None, level=10, log_only_rank_0: bool = True)

A decorator class to log GPU memory usage.

Example

>>> from verl.utils.debug.performance import GPUMemoryLogger
>>> @GPUMemoryLogger(role="actor")
... def update_actor(self, batch):
...     # real actor update logic
...     return