Interaction System for Multi-turn RL Training

Overview

The verl interaction system enables dynamic, multi-turn conversational feedback during reinforcement learning training. This system allows models to engage in iterative problem-solving scenarios where an interaction agent can provide corrective feedback, guidance, or evaluation based on the model’s responses.

Key features:

  • Async-based Architecture: Non-blocking interaction processing for distributed training

  • Instance Management: Stateful session handling with unique instance IDs for concurrent interactions

  • SGLang Integration: Seamless integration with the SGLang rollout system for multi-turn conversations

  • Configuration-driven: Dynamic agent loading via YAML configuration files

  • Reward Integration: Turn-level scoring mechanism integrated with verl’s reward system

Architecture

The interaction system follows a plugin-based architecture with clear separation of concerns:

BaseInteraction (Abstract Interface)
     ↓
Gsm8kInteraction (Concrete Implementation)
     ↓
SGLang Rollout Integration
     ↓
Async Request Lifecycle Management

Core Components

BaseInteraction Interface

All interaction agents must implement the BaseInteraction abstract class:

# Interface exposed by verl.interactions.base (method signatures only)
from typing import Any, Dict, List, Optional, Tuple

class BaseInteraction:
    async def start_interaction(self, instance_id: Optional[str] = None, **kwargs) -> str:
        """Initialize interaction session, return instance_id"""

    async def generate_response(self, instance_id: str, messages: List[Dict[str, Any]], **kwargs) -> Tuple[bool, str, float, Dict[str, Any]]:
        """Generate response, return (should_terminate, response, score, metadata)"""

    async def calculate_score(self, instance_id: str, **kwargs) -> float:
        """Calculate turn-level score for RL training"""

    async def finalize_interaction(self, instance_id: str, **kwargs) -> None:
        """Clean up resources"""

Request Lifecycle

The interaction system integrates with SGLang’s async rollout via state management:

  1. PENDING → Initialize interaction via start_interaction()

  2. GENERATING → Model generates response

  3. INTERACTING → Process response via generate_response()

  4. GENERATING or COMPLETED → Return to GENERATING if the interaction did not terminate; otherwise mark the request COMPLETED
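
The lifecycle above can be sketched as a small driver loop. This is a simplified illustration and not the SGLang rollout code: `State`, `run_episode`, and `EchoInteraction` are hypothetical names, the model is replaced by a plain callable, and the model's output is assumed to be stored under the "assistant" role.

```python
import asyncio
from enum import Enum


class State(Enum):
    PENDING = "pending"
    GENERATING = "generating"
    INTERACTING = "interacting"
    COMPLETED = "completed"


class EchoInteraction:
    """Hypothetical stand-in agent: accepts the answer "4", rejects anything else."""

    async def start_interaction(self, instance_id=None, **kwargs):
        return instance_id or "demo"

    async def generate_response(self, instance_id, messages, **kwargs):
        correct = messages[-1]["content"] == "4"
        feedback = "correct" if correct else "try again"
        return correct, feedback, float(correct), {}

    async def finalize_interaction(self, instance_id, **kwargs):
        pass


async def run_episode(interaction, model, max_turns=10):
    """Drive one request through the four states and record the path taken."""
    trace = [State.PENDING]
    messages, scores = [], []
    instance_id = await interaction.start_interaction()
    try:
        for turn in range(max_turns):
            trace.append(State.GENERATING)
            messages.append({"role": "assistant", "content": model(turn)})
            trace.append(State.INTERACTING)
            done, feedback, score, _ = await interaction.generate_response(
                instance_id, messages
            )
            scores.append(score)
            if done:
                break
            # Feedback goes back to the model as the next user turn.
            messages.append({"role": "user", "content": feedback})
    finally:
        await interaction.finalize_interaction(instance_id)
    trace.append(State.COMPLETED)
    return trace, scores
```

Calling `asyncio.run(run_episode(EchoInteraction(), lambda turn: "5" if turn == 0 else "4"))` walks through one rejected and one accepted answer before reaching COMPLETED.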

Configuration

Basic Setup

Enable interaction in your rollout configuration:

actor_rollout_ref:
    rollout:
        multi_turn:
            enable: true
            interaction_config_path: "path/to/interaction_config.yaml"
            max_user_turns: 10
            max_assistant_turns: 10

Interaction Configuration File

Create an interaction configuration file (e.g., gsm8k_interaction_config.yaml):

interaction:
  - class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
    config: {}

The system will dynamically load the specified interaction class using importlib.
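
Loading a class from a dotted path with importlib usually looks like the sketch below. `load_class` is a hypothetical helper written for illustration; verl's own loader may be structured differently. A standard-library class is used in the demo so the snippet runs without verl installed.

```python
import importlib


def load_class(dotted_path: str):
    """Resolve "package.module.ClassName" to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)  # raises ImportError if missing
    return getattr(module, class_name)  # raises AttributeError if missing


# Demo: resolve and instantiate a standard-library class.
cls = load_class("collections.OrderedDict")
instance = cls()
```

A misspelled module raises ImportError and a misspelled class raises AttributeError, which helps distinguish the two kinds of configuration mistake.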

Implementation Example: GSM8K

The GSM8K interaction demonstrates a complete implementation for math problem-solving scenarios:

from verl.interactions.base import BaseInteraction
from verl.utils.reward_score import gsm8k
from uuid import uuid4

class Gsm8kInteraction(BaseInteraction):
    def __init__(self, config: dict):
        super().__init__(config)
        self._instance_dict = {}

    async def start_interaction(self, instance_id=None, ground_truth=None, **kwargs):
        if instance_id is None:
            instance_id = str(uuid4())
        self._instance_dict[instance_id] = {
            "response": "",
            "ground_truth": ground_truth,
            "reward": 0.0,
        }
        return instance_id

    async def generate_response(self, instance_id, messages, **kwargs):
        # Extract last user message content
        content = ""
        for item in reversed(messages):
            if item.get("role") == "user":
                content = item.get("content", "")
                break

        # Ensure GSM8K format (#### prefix)
        if content.startswith("#### "):
            self._instance_dict[instance_id]["response"] = content
        else:
            self._instance_dict[instance_id]["response"] = "#### " + content

        reward = await self.calculate_score(instance_id)
        if reward == 1.0:
            return True, "Your response is correct!", 1.0, {}
        else:
            return False, "Your response is incorrect! You need to reflect on your answer and try again.", 0.0, {}

    async def calculate_score(self, instance_id, **kwargs):
        return gsm8k.compute_score(
            self._instance_dict[instance_id]["response"],
            self._instance_dict[instance_id]["ground_truth"],
            method="flexible", format_score=0.0, score=1.0,
        )

    async def finalize_interaction(self, instance_id, **kwargs):
        del self._instance_dict[instance_id]

Training Integration

Training Script Configuration

Include interaction configuration in your training command:

python3 -m verl.trainer.main_ppo \
    --config-path="$CONFIG_PATH" \
    --config-name='gsm8k_multiturn_grpo_w_interaction' \
    algorithm.adv_estimator=grpo \
    data.train_batch_size=512 \
    data.return_raw_chat=True \
    actor_rollout_ref.rollout.name=sglang \
    actor_rollout_ref.rollout.multi_turn.interaction_config_path="$PROJECT_DIR/examples/sglang_multiturn/config/interaction_config/gsm8k_interaction_config.yaml" \
    trainer.total_epochs=15

Data Requirements

Ensure your dataset includes interaction parameters:

# Dataset should include interaction_kwargs in non_tensor_batch
interaction_kwargs = [
    {"query": "What is 2+2?", "ground_truth": "4"},
    {"query": "What is 3+3?", "ground_truth": "6"},
]
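
As a rough sketch of how such rows could reach an interaction, the snippet below unpacks each dictionary into `start_interaction()`. It assumes the rollout forwards `interaction_kwargs` as keyword arguments; `RecordingInteraction` and `start_all` are hypothetical names used only for this illustration.

```python
import asyncio
from uuid import uuid4


class RecordingInteraction:
    """Hypothetical agent that only stores what start_interaction receives."""

    def __init__(self):
        self.sessions = {}

    async def start_interaction(self, instance_id=None, ground_truth=None, **kwargs):
        instance_id = instance_id or str(uuid4())
        # Keys without a named parameter (here "query") arrive in **kwargs.
        self.sessions[instance_id] = {"ground_truth": ground_truth, "extra": kwargs}
        return instance_id


async def start_all(interaction, interaction_kwargs):
    """Open one session per dataset row by unpacking its interaction_kwargs."""
    return [await interaction.start_interaction(**row) for row in interaction_kwargs]
```

Under this assumption, any key the interaction needs at start time (such as `ground_truth`) must be present in every row.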

Best Practices

Resource Management

  • Always implement proper cleanup in finalize_interaction()

  • Use unique instance IDs to avoid conflicts in concurrent training

  • Handle edge cases like empty messages or malformed content
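
One way to make cleanup hard to forget is to wrap a session in an async context manager, and to extract message content defensively. Both helpers below, `interaction_session` and `last_content`, are hypothetical and not part of verl; they only assume the `start_interaction()` and `finalize_interaction()` methods described earlier.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def interaction_session(interaction, **start_kwargs):
    """Start a session and always finalize it, even if the body raises."""
    instance_id = await interaction.start_interaction(**start_kwargs)
    try:
        yield instance_id
    finally:
        await interaction.finalize_interaction(instance_id)


def last_content(messages, role="user"):
    """Return the newest message content for `role`, or "" if none is usable."""
    for item in reversed(messages or []):
        if isinstance(item, dict) and item.get("role") == role:
            content = item.get("content")
            return content if isinstance(content, str) else ""
    return ""
```

Usage follows the usual pattern: `async with interaction_session(agent, ground_truth="4") as instance_id: ...`.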

Performance Optimization

  • Keep interaction logic lightweight to avoid blocking training

  • Use async/await properly to maintain non-blocking behavior

  • Consider caching expensive computations within interaction instances
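
A minimal sketch of the last two points, assuming Python 3.9 or newer for `asyncio.to_thread`: a blocking scorer is moved to a worker thread and its results are memoized. `slow_score` and `score_async` are hypothetical names, and the sleep merely stands in for expensive work.

```python
import asyncio
import time
from functools import lru_cache


@lru_cache(maxsize=1024)
def slow_score(response: str, ground_truth: str) -> float:
    """Hypothetical blocking scorer; repeated inputs are served from the cache."""
    time.sleep(0.05)
    return 1.0 if response.strip() == ground_truth.strip() else 0.0


async def score_async(response: str, ground_truth: str) -> float:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_score, response, ground_truth)


async def main():
    pairs = [("4", "4"), ("5", "6"), ("4", "4")]
    # Results come back in the same order as the inputs.
    return await asyncio.gather(*(score_async(r, g) for r, g in pairs))
```

The cache is keyed on the argument values, so it only helps when identical response and ground-truth pairs recur.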

Testing

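Test the full session lifecycle (start, respond, finalize) for each interaction class. The example below uses pytest with the pytest-asyncio plugin; YourInteraction is a placeholder for your own class:

import pytest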

@pytest.mark.asyncio
async def test_interaction_workflow():
    interaction = YourInteraction({})

    # Test complete workflow
    instance_id = await interaction.start_interaction(ground_truth="expected_answer")

    messages = [{"role": "user", "content": "user_response"}]
    should_terminate, response, reward, metadata = await interaction.generate_response(instance_id, messages)

    assert isinstance(should_terminate, bool)
    assert isinstance(reward, float)

    await interaction.finalize_interaction(instance_id)

Advanced Usage

Custom Scoring Functions

You can integrate custom reward functions:

async def calculate_score(self, instance_id, **kwargs):
    response = self._instance_dict[instance_id]["response"]
    ground_truth = self._instance_dict[instance_id]["ground_truth"]

    # Custom evaluation logic
    if custom_evaluation_function(response, ground_truth):
        return 1.0
    else:
        return 0.0
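
The name `custom_evaluation_function` above is a placeholder. One possible shape for it, offered only as an illustration, compares the last number in the response with the ground truth under a small tolerance:

```python
import re


def custom_evaluation_function(response: str, ground_truth: str, tol: float = 1e-6) -> bool:
    """Hypothetical check: the last number in the response matches the ground truth."""
    # Drop thousands separators, then collect integers and decimals.
    numbers = re.findall(r"-?\d+(?:\.\d+)?", response.replace(",", ""))
    if not numbers:
        return False
    try:
        target = float(ground_truth.replace(",", ""))
    except ValueError:
        return False  # ground truth is not numeric
    return abs(float(numbers[-1]) - target) <= tol
```

Taking the last number is a heuristic that suits answers ending in a final result; tasks with other answer formats need a different rule.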

Multi-step Interactions

For complex scenarios requiring multiple feedback rounds:

async def generate_response(self, instance_id, messages, **kwargs):
    instance = self._instance_dict[instance_id]
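    # Default to 0 in case start_interaction() did not initialize the counter
    instance["attempts"] = instance.get("attempts", 0) + 1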

    # Evaluate current response
    reward = await self.calculate_score(instance_id)

    if reward > 0.8:
        return True, "Excellent work!", reward, {}
    elif instance["attempts"] < 3:
        return False, "Good attempt, but try to improve...", reward, {}
    else:
        return True, "Maximum attempts reached.", reward, {}

Troubleshooting

Common Issues

  1. Instance ID Conflicts: Ensure unique instance IDs across concurrent sessions

  2. Memory Leaks: Always call finalize_interaction() to clean up resources

  3. Blocking Operations: Keep interaction logic async and non-blocking

  4. Configuration Errors: Verify interaction config path and class name are correct

Debugging

Enable debug logging to trace interaction flow:

export VERL_LOGGING_LEVEL=DEBUG
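
Inside a custom interaction module, a logger can follow the same environment variable. The sketch below is an assumption about how one might wire this up in user code, not a description of verl internals; `configure_logger` and the logger name are hypothetical.

```python
import logging
import os


def configure_logger(name: str) -> logging.Logger:
    """Set a module logger's level from VERL_LOGGING_LEVEL, defaulting to WARNING."""
    logger = logging.getLogger(name)
    level_name = os.getenv("VERL_LOGGING_LEVEL", "WARNING").upper()
    # getLevelName returns an int for known names and a string otherwise.
    level = logging.getLevelName(level_name)
    logger.setLevel(level if isinstance(level, int) else logging.WARNING)
    return logger


logger = configure_logger("my_interaction")  # hypothetical logger name
logger.debug("interaction module loaded")
```

Logging the instance ID with each message makes it easier to follow a single session through concurrent rollouts.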

Performance Monitoring

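Track how much time each rollout spends inside interaction calls and compare it with overall training throughput; if interaction latency becomes a noticeable share of step time, simplify or offload the interaction logic.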
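
A lightweight way to collect such numbers is a wrapper that times every async method of an interaction. `TimedInteraction` is a hypothetical helper, not a verl class, and it measures wall-clock time only.

```python
import inspect
import time
from collections import defaultdict


class TimedInteraction:
    """Hypothetical wrapper that records wall-clock latency per async method call."""

    def __init__(self, inner):
        self._inner = inner
        self.latencies = defaultdict(list)

    def __getattr__(self, name):
        # Only reached for attributes not found on the wrapper itself.
        attr = getattr(self._inner, name)
        if not inspect.iscoroutinefunction(attr):
            return attr

        async def timed(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await attr(*args, **kwargs)
            finally:
                self.latencies[name].append(time.perf_counter() - start)

        return timed
```

Wrapping an agent as `TimedInteraction(agent)` leaves its behavior unchanged while `latencies["generate_response"]` accumulates one duration in seconds per call.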