
<p align="center"> <img src="assets/logo.png" alt="PipelineRL" width="500"> </p> <h1 align="center">PipelineRL: Fast LLM Agent Training</h1> <p align="center"> <a href="https://huggingface.co/blog/ServiceNow/pipelinerl/"><img src="https://img.shields.io/badge/HF%20Blog%20Post-0000" alt="Blog"></a> <a href="https://arxiv.org/abs/2509.19128"><img src="https://img.shields.io/badge/arXiv-2509.19128-b31b1b" alt="Paper"></a> </p>


Overview

A scalable asynchronous reinforcement learning implementation with in-flight weight updates. Designed to maximize GPU utilization while staying as on-policy as possible.

<p align="center"> <img src="assets/figure1.jpg" alt="Pipeline-RL Architecture" width="600"> </p>

PipelineRL tackles the classic trade-off between inference throughput (large batches on many GPUs) and on-policy data freshness by performing in-flight weight updates. After each optimizer step, updated weights are broadcast to the inference servers without halting sampling. This keeps batch sizes optimal and data near on-policy, yielding fast, stable RL for large language models.
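The scheme can be sketched with a toy asyncio loop. This is illustrative only: `sampler`, `trainer`, and the weight-version tags are hypothetical stand-ins, not PipelineRL's actual API.

```python
import asyncio

# Toy sketch of in-flight weight updates (illustrative only, not the
# PipelineRL API): the sampler keeps generating while the trainer swaps
# in new weight versions between rollouts, so inference never halts.

async def sampler(state, rollouts):
    while state["running"]:
        rollouts.append(state["version"])  # tag each rollout with the weights it used
        await asyncio.sleep(0)             # yield so the trainer can interleave

async def trainer(state, steps):
    for step in range(1, steps + 1):
        for _ in range(5):                 # pretend: accumulate rollouts, optimizer step
            await asyncio.sleep(0)
        state["version"] = step            # broadcast new weights; sampler is never paused

    state["running"] = False

async def main():
    state = {"version": 0, "running": True}
    rollouts = []
    await asyncio.gather(sampler(state, rollouts), trainer(state, steps=3))
    return rollouts

rollouts = asyncio.run(main())
# rollouts span several weight versions: near on-policy data, no idle GPUs
print(sorted(set(rollouts)))
```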

<p align="center"> <img src="assets/losses.png" alt="Pipeline-RL Effectiveness" width="600"> </p>
  • In experiments on 7B and 32B models (batch size 4096, lr=1e-6, max tokens=8192), PipelineRL matches or exceeds Open-Reasoner-Zero on AIME-2024 and MATH-500.
  • Uses a simplified GRPO algorithm: no value network, no trust-region clamping, no KL or entropy bonuses by default (though KL support is available).
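The simplified objective can be made concrete with a small sketch (plain Python, not the actual trainer code): group-relative advantages followed by a plain policy-gradient loss, with no clipping and no KL or entropy terms.

```python
# Sketch of the simplified GRPO update described above (not PipelineRL's
# trainer code): advantages are computed relative to a group of rollouts
# for the same prompt; no value network, no clipping, no KL bonus.

def grpo_advantages(rewards):
    """Normalize rewards within one group of rollouts for the same prompt."""
    mean = sum(rewards) / len(rewards)
    var = sum((r - mean) ** 2 for r in rewards) / len(rewards)
    std = var ** 0.5
    return [(r - mean) / (std + 1e-6) for r in rewards]

def policy_gradient_loss(logprobs, advantages):
    """Plain REINFORCE-style surrogate: -E[A * log pi]."""
    return -sum(lp * a for lp, a in zip(logprobs, advantages)) / len(logprobs)

# one group of 4 rollouts for the same problem: two solved it, two did not
adv = grpo_advantages([1.0, 0.0, 0.0, 1.0])
loss = policy_gradient_loss([-1.2, -0.8, -2.0, -0.5], adv)
```

Successful rollouts get positive advantage, failed ones negative, so the gradient raises the likelihood of the successful completions relative to the rest of the group.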

Get started

PipelineRL is agent-framework agnostic, meaning you can use it to train any agent by implementing load_problems and generate_rollout functions for your task. For example, we can easily design and train a multi-turn LLM agent that must guess a number between 1 and 1024. After each guess, the agent receives feedback on whether the guess was higher or lower than the target number.

First, we must implement load_problems to generate a list of train and test problems. Each problem is a dictionary with an answer key and a dataset key indicating whether it belongs to the training or testing dataset.

def load_problems(dataset_names: list[str]) -> list[dict]:
    n = 1024
    c = 191
    problems = []
    for name in dataset_names:
        if name == "train":
            problems.extend([
                {"answer": (2 * i * c) % n + 1, "dataset": "train"} for i in range(512)
            ])
        elif name == "test":
            problems.extend([
                {"answer": ((2 * i + 1) * c) % n + 1, "dataset": "test"} for i in range(512)
            ])
    return problems
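As a quick sanity check on the arithmetic above: since c = 191 is odd and coprime with 1024, the two splits partition 1..1024 exactly (train answers are the odd numbers, test answers the even ones), so train and test never overlap.

```python
# Verify that the train/test splits above partition 1..1024 with no overlap.
n, c = 1024, 191
train = {(2 * i * c) % n + 1 for i in range(512)}        # all odd answers
test = {((2 * i + 1) * c) % n + 1 for i in range(512)}   # all even answers
assert len(train) == len(test) == 512
assert train.isdisjoint(test)
assert train | test == set(range(1, n + 1))
```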

Then, we must implement a generate_rollout function, which takes a problem from load_problems and generates a RolloutResult. A RolloutResult contains a list of TrainingText objects (token ids, log probs, reward, etc.), BaseMetrics (reward, success, etc.), the latency of the rollout in seconds, and the dataset_name used for grouping the metrics. Here, the function should use an LLM to generate guesses and provide feedback based on the problem's answer.

import re
import time

import aiohttp
from omegaconf import DictConfig

# TrainableLLM, Prompt, llm_async_generate, make_training_text, RolloutResult,
# and BaseMetrics are imported from PipelineRL.

async def generate_rollout(
    cfg: DictConfig,
    llm: TrainableLLM,
    problem: dict,
    session: aiohttp.ClientSession,
) -> RolloutResult:
    initial_messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant",
        },
        {
            "role": "user",
            "content": f"You must guess a number between 1 and 1024. Output the answer as <answer>number</answer>."
                        " After each guess I will tell you if your answer is higher or lower than the target number."
        }
    ]
    time_start = time.time()
    llm_calls = []
    guess_history = []
    reward = 0
    success = 0
    error = 0
    for i in range(13):
        messages = initial_messages.copy()
        if i > 0:
            last_message = f"Your {i} previous guesses:"
            for guess in guess_history:
                relation = "lower" if guess < problem["answer"] else "higher"
                last_message += f"\n{guess}, which is {relation} than the target number."
            messages.append({
                "role": "user",
                "content": last_message
            })
        llm_call = await llm_async_generate(llm, Prompt(messages=messages), session)
        llm_calls.append(llm_call)

        output_text = llm_call.output.content or ""
        answer = re.search(r"<answer>(\d+)</answer>", output_text)
        if answer:
            answer = int(answer.group(1))
            if answer == problem["answer"]:
                # reward a correct guess, with a bonus for using fewer turns
                reward = 2 - i / 10
                success = 1
                break
            else:
                guess_history.append(answer)
        else:
            # penalty for a malformed answer, softened by 0.1 for every
            # earlier turn that used the correct output format
            reward = -2 + i / 10
            error = 1
            break
    latency = time.time() - time_start

    # TrainingText contains the prompt and output tokens, reward, and the log probs of the output tokens necessary for RL training.
    training_texts = [make_training_text(llm, llm_call) for llm_call in llm_calls]
    for text in training_texts:
        text.reward = reward

    metrics = BaseMetrics(
        reward=reward,
        success=success,
        no_error=not error,
        no_answer=error,
    )

    return RolloutResult(
        training_texts=training_texts,
        metrics=metrics,
        latency=latency,
        dataset_name=problem["dataset"],
    )
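For a sense of the reward scale: the optimal policy here is binary search, which needs at most 11 guesses on 1..1024, so the 13-turn budget suffices and the worst-case success reward is 2 - 10/10 = 1. A quick simulation (the helper below is hypothetical, not part of the repo):

```python
def binary_search_turns(target, lo=1, hi=1024):
    """Number of turns an ideal binary-search guesser needs to hit `target`."""
    turns = 0
    while True:
        guess = (lo + hi) // 2
        turns += 1
        if guess == target:
            return turns
        if guess < target:
            lo = guess + 1
        else:
            hi = guess - 1

worst = max(binary_search_turns(t) for t in range(1, 1025))
# success at 0-indexed turn i earns reward 2 - i / 10
worst_reward = 2 - (worst - 1) / 10
print(worst, worst_reward)  # prints: 11 1.0
```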
    

Finally, you need to create a Hydra config file that points to the rollout function and the dataset loader. Additional hyperparameters, such as the model path and learning rate, can also be modified there. For example, guessing.yaml:

defaults:
    - base
    - _self_

actor:
    rollout_policy: pipelinerl.domains.guessing.generate_guessing_rollout
environment: null
dataset_loader: pipelinerl.domains.guessing.load_problems
train_dataset_names:
    - train
test_dataset_names:
    - test

You can now launch the training with the following command:

python -m pipelinerl.launch --config-name=guessing output_dir=results/guessing

Once the LLMs are served, the actor will be evaluated on the test dataset before collecting training rollouts.

<p align="center"> <img src="assets/actor.png" alt="Actor" width="800"> </p>

When enough data has been collected, the trainer performs an RL step and updates the actor's weights.

<p align="center"> <img src="assets/rl_loss.png" alt="RL loss" width="800"> </p>

The streaming logs can be overwhelming, so it is often easier to debug using the per-process log files in results/guessing:

<p align="center"> <img src="assets/logs.png" alt="Logs folder" width="800"> </p>

After roughly 20 minutes, the actor will have learned a strategy that guesses the number correctly. Training can be monitored in real time with WANDB, which shows the training and test metrics:

<p align="center"> <img src="assets/guessing_success.png" alt="Logs folder" width="800"> </p>

Setup

Clone the repository and change into the PipelineRL directory:

git clone git@github.com:ServiceNow/PipelineRL.git
cd PipelineRL

Create the environment with dependencies:

conda create -n pipeline-rl -y python=3.11
conda run --no-capture-output -n pipeline-rl pip install -e .
conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.7.4.post1 --no-build-isolation

Alternatively, you can install flash-attn from a prebuilt wheel (on Linux):

# Check your PyTorch's C++ ABI setting first:
# python -c "import torch; print(torch._C._GLIBCXX_USE_CXX11_ABI)"
# Use cxx11abiTRUE or cxx11abiFALSE in the URL accordingly
conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.7.4.post1/flash_attn-2.7.4.post1+cu12torch2.6cxx11abiTRUE-cp311-cp311-linux_x86_64.whl

By default, PipelineRL uses the file system as the medium for streaming generated data to the trainer processes. This works on one node, but the files can get quite large. To use Redis instead, install the Redis server in the same conda environment:

conda install redis-server==7.4.0 -c conda-forge

Optional: SandboxFusion for coding verification

PipelineRL supports using SandboxFusion to execute and verify coding-task outputs in a remote sandbox.

To run SandboxFusion locally, follow the deployment guide and startup logs here: https://bytedance.github.io/SandboxFusion/docs/docs/get-started#local-deployment

Then point PipelineRL to your sandbox endpoint by setting sandbox_endpoint in your config (for example in conf/coding.yaml) or by exporting SANDBOX_ENDPOINT:

export SANDBOX_ENDPOINT=http://127.0.0.1:8080
