
TransferQueue

An asynchronous streaming data management module for efficient post-training.


<div align="center"> <h2 align="center"> TransferQueue: An asynchronous streaming data management module for efficient post-training </h2> <a href="https://arxiv.org/abs/2507.01663" target="_blank"><strong>Paper</strong></a> | <a href="https://zhuanlan.zhihu.com/p/1930244241625449814" target="_blank"><strong>Zhihu</strong></a> | <a href="https://github.com/TransferQueue/community_doc/blob/main/other_assets/WeChatGroup.png?raw=true" target="_blank"><strong>WeChat</strong></a> <br /> <br />

<a href="https://deepwiki.com/Ascend/TransferQueue"><img src="https://devin.ai/assets/deepwiki-badge.png" alt="Ask DeepWiki.com" style="height:20px;"></a>

</div> <br/> <h2 id="overview">🎉 Overview</h2>

TransferQueue is a high-performance data storage and transfer module with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows.

<p align="center"> <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/tq_arch.png?raw=true" width="70%"> </p>

TransferQueue offers fine-grained, sub-sample-level data management and load-balancing capabilities. It serves as a data gateway that decouples explicit data dependencies across computational tasks, enabling a divide-and-conquer approach that significantly simplifies algorithm controller design.

<p align="center"> <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/main_func.png?raw=true" width="100%"> </p> <h2 id="updates">🔄 Updates</h2>
  • Feb 8, 2026: 🔥 Initialization and usage are greatly simplified by high-level APIs PR#26, PR#28. You can now use a Redis-style API to take advantage of most of the advanced features provided by TransferQueue!
  • Jan 28, 2026: We experimentally introduce the StreamingDataLoader interface for a fully-streamed production-consumption pipeline. Refer to our tutorials/06_streaming_dataloader.py for details.
  • Dec 30, 2025: TransferQueue x verl integration has been tested with the DAPO algorithm at scale (64 nodes, 1024 cards). It significantly optimizes host memory utilization and accelerates data transfers. Stay tuned for more details!
  • Dec 20, 2025: 🔥 The official tutorial is released! Feel free to check it out.
  • Nov 10, 2025: We disentangled the data retrieval logic from TransferQueueController PR#101. Now you can implement your own Sampler to customize data consumption.
  • Nov 5, 2025: We provide a KVStorageManager that simplifies the integration with KV-based storage backends PR#96. The first available KV-based backend is Yuanrong.
  • Nov 4, 2025: Data partitioning capability is available in PR#98. Now you can define logical data partitions to manage your train/val/test datasets.
  • Oct 25, 2025: Storage backends are now pluggable in PR#66. You can try to integrate your own storage backend with TransferQueue now!
  • Oct 21, 2025: Official integration into verl is ready (verl/pulls/3649). Subsequent PRs will optimize the single-controller architecture by fully decoupling the data and control flows.
  • July 22, 2025: We published a series of Chinese blog posts on <a href="https://zhuanlan.zhihu.com/p/1930244241625449814">Zhihu 1</a>, <a href="https://zhuanlan.zhihu.com/p/1933259599953232589">2</a>.
  • July 21, 2025: We initiated an RFC in the verl community verl/RFC#2662.
  • July 2, 2025: We published the paper AsyncFlow.
<h2 id="components">🧩 Components</h2>

Control Plane: Panoramic Data Management

In the control plane, TransferQueueController tracks the production status and consumption status of each training sample as metadata. Once all required data fields are ready (i.e., written to the TransferQueueStorageManager), the data sample can be consumed by downstream tasks.

We also track the consumption history for each computational task (e.g., generate_sequences, compute_log_prob, etc.). Therefore, even when different computational tasks require the same data field, they can consume the data independently without interfering with each other.
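
The bookkeeping described above can be pictured with a small, self-contained sketch. It is not the `TransferQueueController` implementation: `ToyStatusTracker` and its method names are hypothetical, and plain Python sets stand in for the real metadata. It only illustrates the idea that a sample becomes consumable once all required fields are written, and that each task keeps its own consumption history.

```python
from collections import defaultdict


class ToyStatusTracker:
    """Hypothetical stand-in for the controller's metadata bookkeeping."""

    def __init__(self):
        # sample index -> set of fields already written to storage
        self.produced = defaultdict(set)
        # task name -> set of sample indices that task has already consumed
        self.consumed = defaultdict(set)

    def mark_produced(self, index, field):
        self.produced[index].add(field)

    def consume(self, task, required_fields):
        """Return (and record) indices whose required fields are all
        written and that this task has not consumed before."""
        required = set(required_fields)
        already = self.consumed[task]
        ready = sorted(
            idx for idx, fields in self.produced.items()
            if required <= fields and idx not in already
        )
        already.update(ready)
        return ready


tracker = ToyStatusTracker()
tracker.mark_produced(0, "prompts")
tracker.mark_produced(1, "prompts")
tracker.mark_produced(0, "responses")

# Both tasks need "prompts", yet each consumes independently of the other.
first = tracker.consume("generate_sequences", ["prompts"])
second = tracker.consume("compute_log_prob", ["prompts", "responses"])
again = tracker.consume("generate_sequences", ["prompts"])
```

Here `generate_sequences` receives both samples, `compute_log_prob` receives only the sample whose `responses` field has been written, and a repeated call by `generate_sequences` returns nothing new.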

<p align="center"> <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/control_plane.png?raw=true" width="70%"> </p>

To make the data retrieval process more customizable, we provide a Sampler class that allows users to define their own data retrieval and consumption logic. Refer to the Customize section for details.

Load-balancing capabilities are experimentally supported in the control plane. This design lets us offload some data management responsibilities from the single controller. Refer to #PR70 for details.

Data Plane: Distributed Data Storage

In the data plane, we utilize a pluggable design, enabling TransferQueue to integrate with different storage backends based on user requirements.

Specifically, we provide a TransferQueueStorageManager abstraction class that defines the core APIs as follows:

  • async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None
  • async def get_data(self, metadata: BatchMeta) -> TensorDict
  • async def clear_data(self, metadata: BatchMeta) -> None

This class encapsulates the core interaction logic within the TransferQueue system. You only need to write a simple subclass to integrate your custom storage backend. Refer to the Customize section for details.
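
To give a feel for what such a subclass involves, here is a minimal sketch under stated assumptions. It does not import TransferQueue: `ToyStorageManager`, `ToyBatchMeta`, and `InMemoryStorageManager` are hypothetical names, a plain `dict` stands in for `TensorDict`, and a small dataclass stands in for `BatchMeta`. Only the three method names come from the list above; the real base class may require more than this.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyBatchMeta:
    """Hypothetical stand-in for BatchMeta: which rows and columns to touch."""
    indexes: tuple
    fields: tuple


class ToyStorageManager(ABC):
    """Mirrors the three method names listed above with simplified types."""

    @abstractmethod
    async def put_data(self, data: dict, metadata: ToyBatchMeta) -> None: ...

    @abstractmethod
    async def get_data(self, metadata: ToyBatchMeta) -> dict: ...

    @abstractmethod
    async def clear_data(self, metadata: ToyBatchMeta) -> None: ...


class InMemoryStorageManager(ToyStorageManager):
    def __init__(self):
        self._cells = {}  # (index, field) -> value

    async def put_data(self, data, metadata):
        # data maps field name -> values aligned with metadata.indexes
        for field in metadata.fields:
            for idx, value in zip(metadata.indexes, data[field]):
                self._cells[(idx, field)] = value

    async def get_data(self, metadata):
        return {
            field: [self._cells[(idx, field)] for idx in metadata.indexes]
            for field in metadata.fields
        }

    async def clear_data(self, metadata):
        for idx in metadata.indexes:
            for field in metadata.fields:
                self._cells.pop((idx, field), None)


async def demo():
    manager = InMemoryStorageManager()
    meta = ToyBatchMeta(indexes=(0, 1), fields=("prompts",))
    await manager.put_data({"prompts": ["hi", "hello"]}, meta)
    fetched = await manager.get_data(meta)
    await manager.clear_data(meta)
    return fetched, len(manager._cells)


fetched, remaining = asyncio.run(demo())
```

A real backend would replace the in-process dictionary with calls to the external storage system, while keeping the same put/get/clear contract.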

Currently, we support the following storage backends:

  • SimpleStorage: A basic CPU memory storage with minimal data format constraints and ease of use.
  • Yuanrong (beta, #PR107, #PR96): An Ascend native data system that provides hierarchical storage interfaces including HBM/DRAM/SSD.
  • MooncakeStore (beta, #PR162): A high-performance, KV-based hierarchical storage that supports RDMA transport between GPU and DRAM.
  • RayRDT (alpha, #PR167): Ray's new feature that allows Ray to store and pass objects directly between Ray actors.

Among them, SimpleStorageUnit serves as our default storage backend, coordinated by the AsyncSimpleStorageManager class. Each storage unit can be deployed on a separate node, allowing for distributed data management.

SimpleStorageUnit employs a 2D data structure as follows:

  • Each row corresponds to a training sample, assigned a unique index within the corresponding global batch.
  • Each column represents the input/output data fields for computational tasks.

This data structure design is motivated by the computational characteristics of the post-training process, where each training sample is generated in a relayed manner across task pipelines. It provides precise addressing capabilities, enabling fine-grained, concurrent data read/write operations in a streaming manner.
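
The row/column layout can be illustrated with a toy, single-process model. `ToyStorageUnit` and its methods are hypothetical and only mimic the addressing scheme described above; the actual `SimpleStorageUnit` is a distributed component and is not reproduced here.

```python
class ToyStorageUnit:
    """Hypothetical 2D store: rows are samples, columns are data fields."""

    def __init__(self, num_rows, fields):
        self.num_rows = num_rows
        self.fields = list(fields)
        # One list per column; None marks a cell that has not been written yet.
        self.columns = {f: [None] * num_rows for f in self.fields}

    def write(self, row, field, value):
        # Address a single cell by (row, column).
        self.columns[field][row] = value

    def read(self, rows, fields):
        # Fetch only the requested cells, not whole rows.
        return {f: [self.columns[f][r] for r in rows] for f in fields}

    def complete_rows(self):
        # Rows for which every column has been filled in.
        return [
            r for r in range(self.num_rows)
            if all(self.columns[f][r] is not None for f in self.fields)
        ]


unit = ToyStorageUnit(num_rows=2, fields=["prompts", "responses"])

# One task fills the "prompts" column for every sample ...
unit.write(0, "prompts", "p0")
unit.write(1, "prompts", "p1")
# ... and a later task relays a result for sample 0 only.
unit.write(0, "responses", "r0")

cell = unit.read([0], ["responses"])
complete = unit.complete_rows()
```

Because each cell is addressed independently, a downstream task can start on sample 0 while sample 1 is still waiting for its `responses` value.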

<p align="center"> <img src="https://github.com/TransferQueue/community_doc/blob/main/docs/data_plane.png?raw=true" width="70%"> </p>

User Interface: High-Level & Low-Level APIs

| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
|---|---|---|---|---|---|---|
| High | KV Interface (PR#28) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | StreamingDataLoader (PR#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | TransferQueueClient | Metadata-based | ✓ | ✓ | ✓ | ✓ |

Key-Value based API

To simplify the usage of TransferQueue, we provide a Redis-style high-level API that exposes most of its advanced features (PR#28).

Methods

  • (async_)kv_put: Insert/Update a multi-column sample by key, with an optional metadata tag.
  • (async_)kv_batch_put: Put multiple key-value pairs efficiently in batches.
  • (async_)kv_batch_get: Retrieve samples (by keys), supporting column selection (by fields).
  • (async_)kv_list: List keys and tags (metadata) in a partition.
  • (async_)kv_clear: Remove key-value pairs from storage.

Key Features

  • Redis-style Semantics: Familiar KV interface (Put/Get/List) for a zero learning curve.
  • Fine-grained Access: Update or retrieve specific fields (columns) within a key (row) without requiring a full-row operation.
  • Partition Isolation: Logical separation of storage namespaces.
  • Metadata Tags: Lightweight metadata for status tracking.
  • Pluggable Backends: Supports multiple backends.
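
The semantics listed above (field-level updates, partition isolation, and metadata tags) can be sketched with a toy in-memory model. This is not the TransferQueue API: `ToyKVStore` and its method names and signatures are hypothetical, chosen only to mirror the Put/Get/List/Clear style. Consult the tutorial for the actual `kv_*` calls.

```python
class ToyKVStore:
    """Hypothetical in-memory model of a partitioned, column-aware KV store."""

    def __init__(self):
        # partition -> key -> {"fields": {...}, "tag": ...}
        self._partitions = {}

    def put(self, partition, key, fields, tag=None):
        rows = self._partitions.setdefault(partition, {})
        row = rows.setdefault(key, {"fields": {}, "tag": None})
        row["fields"].update(fields)  # merge: columns not mentioned are kept
        if tag is not None:
            row["tag"] = tag

    def batch_get(self, partition, keys, fields=None):
        rows = self._partitions.get(partition, {})
        result = {}
        for key in keys:
            stored = rows[key]["fields"]
            wanted = list(stored) if fields is None else fields
            result[key] = {f: stored[f] for f in wanted}
        return result

    def list_keys(self, partition):
        rows = self._partitions.get(partition, {})
        return {key: row["tag"] for key, row in rows.items()}

    def clear(self, partition, keys):
        rows = self._partitions.get(partition, {})
        for key in keys:
            rows.pop(key, None)


store = ToyKVStore()
store.put("train", "sample-0", {"prompt": "2+2?"}, tag="generating")
# Update a single column later without rewriting the whole row.
store.put("train", "sample-0", {"response": "4"}, tag="done")
# The same key in another partition is a separate entry.
store.put("val", "sample-0", {"prompt": "held out"})

train_view = store.batch_get("train", ["sample-0"], fields=["response"])
train_tags = store.list_keys("train")
val_view = store.batch_get("val", ["sample-0"])
store.clear("train", ["sample-0"])
```

In this sketch the second `put` only adds the `response` column and refreshes the tag, and clearing the `train` partition leaves the `val` entry untouched.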

Refer to [tutorials/basic.ipynb](https://github.com/Ascend/TransferQueue/blob/main/tutorial/basic.ipyn
