PipelineRL
A scalable asynchronous reinforcement learning implementation with in-flight weight updates.
Overview
A scalable asynchronous reinforcement learning implementation with in-flight weight updates. Designed to maximize GPU utilization while staying as on-policy as possible.
<p align="center"> <img src="assets/figure1.jpg" alt="Pipeline-RL Architecture" width="600"> </p>

PipelineRL tackles the classic trade-off between inference throughput (large batches on many GPUs) and on-policy data freshness by performing in-flight weight updates. After each optimizer step, updated weights are broadcast to the inference servers without halting sampling. This keeps batch sizes optimal and data near on-policy, yielding fast, stable RL for large language models.
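The scheduling idea can be sketched as two concurrent loops sharing a versioned weight store. This is a toy illustration of the concept, not PipelineRL's API (`WeightStore`, `sampler`, and `trainer` are hypothetical names):

```python
import asyncio

class WeightStore:
    """Toy stand-in for the weights held by an inference server."""
    def __init__(self):
        self.version = 0

    def broadcast(self):
        # The trainer pushes new weights after each optimizer step;
        # samplers are never paused, they just see a newer version.
        self.version += 1

async def sampler(store: WeightStore, samples: list, n: int = 5):
    for _ in range(n):
        # Each rollout records the weight version that produced it,
        # so training data stays at most a step or two off-policy.
        samples.append(store.version)
        await asyncio.sleep(0)  # yield so the trainer can run

async def trainer(store: WeightStore, steps: int = 3):
    for _ in range(steps):
        await asyncio.sleep(0)  # pretend to run an optimizer step
        store.broadcast()       # in-flight weight update

async def main() -> list:
    store, samples = WeightStore(), []
    await asyncio.gather(sampler(store, samples), trainer(store))
    return samples

versions = asyncio.run(main())
```

Because sampling and training interleave instead of alternating, the versions recorded by the sampler only ever increase, and most rollouts come from recently updated weights.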
<p align="center"> <img src="assets/losses.png" alt="Pipeline-RL Effectiveness" width="600"> </p>

- In experiments on 7B and 32B models (batch size 4096, lr=1e-6, max tokens=8192), PipelineRL matches or exceeds Open-Reasoner-Zero on AIME-2024 and MATH-500.
- Uses a simplified GRPO algorithm: no value network, no trust-region clamping, no KL or entropy bonuses by default (though KL support is available).
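A minimal numpy sketch of this simplified objective (illustrative only, not PipelineRL's actual loss code; the function names here are made up): advantages are computed by normalizing rewards within a group of rollouts for the same problem, and the loss is plain advantage-weighted log-probability.

```python
import numpy as np

def group_relative_advantages(rewards: np.ndarray) -> np.ndarray:
    # GRPO-style advantage: normalize each reward against the other
    # rollouts sampled for the same problem, with no learned value network.
    return (rewards - rewards.mean()) / (rewards.std() + 1e-8)

def simple_pg_loss(logprobs: np.ndarray, advantages: np.ndarray) -> float:
    # Plain advantage-weighted policy gradient: no trust-region clipping,
    # no KL or entropy bonuses.
    return float(-(advantages * logprobs).mean())

rewards = np.array([1.0, 0.0, 0.0, 1.0])  # one group of 4 rollouts
adv = group_relative_advantages(rewards)  # -> approximately [1, -1, -1, 1]
```

Successful rollouts in a group get positive advantage and unsuccessful ones negative, so the gradient pushes probability toward the better completions without needing a critic.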
Get started
PipelineRL is agent-framework agnostic: you can use it to train any agent by implementing `load_problems` and `generate_rollout` functions for your task. For example, we can easily design and train a multi-turn LLM agent that must guess a number between 1 and 1024. After each guess, the agent receives feedback on whether its guess was higher or lower than the target number.
First, we must implement `load_problems` to generate a list of train and test problems. Each problem is a dictionary with an `answer` key and a `dataset` key indicating whether it belongs to the training or test dataset.
```python
def load_problems(dataset_names: list[str]) -> list[dict]:
    n = 1024
    c = 191
    problems = []
    for name in dataset_names:
        if name == "train":
            problems.extend([
                {"answer": (2 * i * c) % n + 1, "dataset": "train"} for i in range(512)
            ])
        elif name == "test":
            problems.extend([
                {"answer": ((2 * i + 1) * c) % n + 1, "dataset": "test"} for i in range(512)
            ])
    return problems
```
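As an aside, these formulas are not arbitrary: since c = 191 is odd and coprime to 1024, the train formula enumerates exactly the odd targets and the test formula exactly the even ones, so the two splits are disjoint and together cover every number from 1 to 1024. A quick check:

```python
n, c = 1024, 191
train = {(2 * i * c) % n + 1 for i in range(512)}
test = {((2 * i + 1) * c) % n + 1 for i in range(512)}

assert len(train) == 512 and len(test) == 512  # no duplicate answers
assert not train & test                        # splits are disjoint
assert train | test == set(range(1, n + 1))    # every target is covered
```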
Then, we must implement a `generate_rollout` function, which takes a problem produced by `load_problems` and generates a `RolloutResult`. A `RolloutResult` contains a list of `TrainingText` objects (token IDs, log probs, reward, etc.), `BaseMetrics` (reward, success, etc.), the latency of the rollout in seconds, and the `dataset_name` used for grouping the metrics. Here, the function should use an LLM to generate guesses and provide feedback based on the problem's answer.
```python
async def generate_rollout(
    cfg: DictConfig,
    llm: TrainableLLM,
    problem: dict,
    session: aiohttp.ClientSession,
) -> RolloutResult:
    initial_messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant",
        },
        {
            "role": "user",
            "content": "You must guess a number between 1 and 1024. Output the answer as <answer>number</answer>."
            " After each guess I will tell you if your answer is higher or lower than the target number.",
        },
    ]
    time_start = time.time()
    llm_calls = []
    guess_history = []
    reward = 0
    success = 0
    error = 0
    for i in range(13):
        messages = initial_messages.copy()
        if i > 0:
            last_message = f"Your {i} previous guesses:"
            for guess in guess_history:
                if guess is not None:
                    relation = "lower" if guess < problem["answer"] else "higher"
                    last_message += f"\n{guess}, which is {relation} than the target number."
                else:
                    last_message += "\n<wrong output>"
            messages.append({
                "role": "user",
                "content": last_message,
            })
        llm_call = await llm_async_generate(llm, Prompt(messages=messages), session)
        llm_calls.append(llm_call)
        output_text = llm_call.output.content or ""
        answer = re.search(r"<answer>(\d+)</answer>", output_text)
        if answer:
            answer = int(answer.group(1))
            if answer == problem["answer"]:
                reward = 2 - i / 10
                success = 1
                break
            else:
                guess_history.append(answer)
        else:
            # penalty for a malformed answer, softened by a bonus for
            # the earlier turns that used the correct output format
            reward = -2 + i / 10
            error = 1
            break
    latency = time.time() - time_start
    # TrainingText contains the prompt and output tokens, reward, and the
    # log probs of the output tokens necessary for RL training.
    training_texts = [make_training_text(llm, llm_call) for llm_call in llm_calls]
    for text in training_texts:
        text.reward = reward
    metrics = BaseMetrics(
        reward=reward,
        success=success,
        no_error=not error,
        no_answer=error,
    )
    return RolloutResult(
        training_texts=training_texts,
        metrics=metrics,
        latency=latency,
        dataset_name=problem["dataset"],
    )
```
Finally, you need to create a Hydra config file that points to the rollout function and the dataset loader. Additional hyperparameters such as the model path and learning rate can also be overridden there. For example, guessing.yaml:
```yaml
defaults:
  - base
  - _self_
actor:
  rollout_policy: pipelinerl.domains.guessing.generate_guessing_rollout
environment: null
dataset_loader: pipelinerl.domains.guessing.load_problems
train_dataset_names:
  - train
test_dataset_names:
  - test
```
You can now launch the training with the following command:
```bash
python -m pipelinerl.launch --config-name=guessing output_dir=results/guessing
```
Once the LLMs are served, the actor will be evaluated on the test dataset before collecting training rollouts.
<p align="center"> <img src="assets/actor.png" alt="Actor" width="800"> </p>

When enough data has been collected, the trainer will perform an RL step and update the actor's weights.
<p align="center"> <img src="assets/rl_loss.png" alt="RL loss" width="800"> </p>

The streaming logs can be overwhelming, so it is easier to debug using the per-process log files in results/guessing:
After roughly 20 minutes, the actor will have learned a strategy to guess the number correctly. Training can be monitored in real time with WANDB, which shows the training and test metrics:
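For reference, the optimal strategy is binary search: each guess has three outcomes (correct, higher, lower), so g guesses can distinguish at most 2^g - 1 numbers, and since 2^10 - 1 = 1023 < 1024 the worst case is 11 guesses, comfortably within the 13-turn budget used above. A quick sketch (illustrative, not part of the codebase):

```python
def guesses_needed(target: int, lo: int = 1, hi: int = 1024) -> int:
    """Count binary-search guesses until the target is hit."""
    guesses = 0
    while True:
        guesses += 1
        mid = (lo + hi) // 2
        if mid == target:
            return guesses
        if mid < target:
            lo = mid + 1
        else:
            hi = mid - 1

worst_case = max(guesses_needed(t) for t in range(1, 1025))  # 11 guesses
```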
<p align="center"> <img src="assets/guessing_success.png" alt="Logs folder" width="800"> </p>

Setup
Clone the repository and change into the PipelineRL directory:
```bash
git clone git@github.com:ServiceNow/PipelineRL.git
cd PipelineRL
```
Create the environment with dependencies:

```bash
conda create -n pipeline-rl -y python=3.11
conda run --no-capture-output -n pipeline-rl pip install -e .
conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.7.4.post1 --no-build-isolation
```
Alternatively, you can install flash-attn from a prebuilt wheel (on Linux):

```bash
# Check your PyTorch's C++ ABI setting first:
# python -c "import torch; print(torch._C._GLIBCXX_USE_CXX11_ABI)"
# Use cxx11abiTRUE or cxx11abiFALSE in the URL accordingly
conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.7.4.post1/flash_attn-2.7.4.post1+cu12torch2.6cxx11abiTRUE-cp311-cp311-linux_x86_64.whl
```
By default Pipeline-RL will use the file system as the medium for streaming the generated data to the trainer processes. This works on one node, but the files can get quite large. To use Redis instead you will need to install the Redis server in the same conda environment:
```bash
conda install redis-server==7.4.0 -c conda-forge
```
Optional: SandboxFusion for coding verification
PipelineRL supports using SandboxFusion to execute and verify coding-task outputs in a remote sandbox.
To run SandboxFusion locally, follow the deployment guide and startup logs here: https://bytedance.github.io/SandboxFusion/docs/docs/get-started#local-deployment
Then point PipelineRL to your sandbox endpoint by setting sandbox_endpoint in your config (for example in conf/coding.yaml) or by exporting SANDBOX_ENDPOINT:
```bash
export SANDBOX_ENDPOINT=http://127.0.0.1:8080
```