DataTrove
DataTrove is a library to process, filter and deduplicate text data at a very large scale. It provides a set of prebuilt commonly used processing blocks with a framework to easily add custom functionality.
DataTrove processing pipelines are platform-agnostic, running out of the box locally or on a slurm cluster. Its (relatively) low memory usage and multiple-step design make it ideal for large workloads, such as processing an LLM's training data.
Local, remote and other file systems are supported through fsspec.
Table of contents
<!-- toc -->

- Installation
- Quickstart examples
- Terminology
- Pipeline
- Executors
- Logging
- DataFolder / paths
- Practical guides
- Contributing
- Citation
Installation
Requires Python 3.10+.
```bash
pip install datatrove[FLAVOUR]
```

Available flavours (combine them with `,`, i.e. `[processing,s3]`):

- `all` installs everything: `pip install datatrove[all]`
- `io` dependencies to read `warc`/`arc`/`wet` files and arrow/parquet formats: `pip install datatrove[io]`
- `processing` dependencies for text extraction, filtering and tokenization: `pip install datatrove[processing]`
- `s3` s3 support: `pip install datatrove[s3]`
- `cli` for command line tools: `pip install datatrove[cli]`
- `ray` for the Ray distributed compute engine: `pip install datatrove[ray]`
- `inference` for LLM inference pipelines: `pip install datatrove[inference]`
- `decont` for decontamination with lighteval: `pip install datatrove[decont]`
- `multilingual` for multilingual text processing: `pip install datatrove[multilingual]`
Quickstart examples
You can check the following examples:
- fineweb.py full reproduction of the FineWeb dataset
- process_common_crawl_dump.py full pipeline to read commoncrawl warc files, extract their text content, filter and save the resulting data to s3. Runs on slurm
- tokenize_c4.py reads data directly from huggingface's hub to tokenize the english portion of the C4 dataset using the `gpt2` tokenizer
- estimate_tokens.py estimates total token counts for large HF datasets, needed to set the correct `SamplerFilter` rate when creating a random shuffled subsample (e.g. 100B tokens from a multi-trillion-token dataset). Streams a small sample per dataset, converges on the average tokens/doc, and multiplies by the total row count.
- smol_data.py builds ~100B token subsets, 50-30-20 mixtures, and shuffled variants for several large Hugging Face datasets
- minhash_deduplication.py full pipeline to run minhash deduplication of text data
- sentence_deduplication.py example to run sentence level exact deduplication
- exact_substrings.py example to run ExactSubstr (requires this repo)
- finephrase.py standalone example to generate a synthetic dataset at scale with multiple prompt templates
Terminology
- `pipeline`: a list of processing steps to execute (read data, filter, write to disk, etc)
- `executor`: runs a specific pipeline on a given execution environment (slurm, multi cpu machine, etc)
- `job`: the execution of a pipeline on a given executor
- `task`: a `job` is comprised of multiple `tasks`, which are used to parallelize execution, usually by having each `task` process a `shard` of data. Datatrove keeps track of which tasks have completed, and when you relaunch, only incomplete tasks will run.
- `file`: an individual input file (.json, .csv, etc)
> [!TIP]
> Note that each file will be processed by a single `task`. Datatrove does not automatically split a file into multiple parts, so to fully parallelize you should have multiple medium sized files rather than a single large file.
- `shard`: a group of input data (usually a group of `files`), which will be assigned to a specific `task`. Each `task` will process a different, non-overlapping `shard` of data from the full list of input files
- `worker`: compute resource that will execute a single task at a time. e.g., if you have 50 cpu cores you can run a LocalPipelineExecutor with `workers=50` to execute 50 `tasks` simultaneously (one per cpu). Once a `worker` is done with a `task`, it will start processing another waiting `task`
> [!TIP]
> Your number of `tasks` controls how much you can parallelize and also how long each individual processing unit takes. If you have a small number of tasks (each therefore processing a large number of files) and one fails, you will have to restart from scratch, whereas with a larger number of small tasks each failed task takes far less time to rerun.
> [!CAUTION]
> If your `tasks` > `files`, some tasks will not process any data, so there usually isn't a point in setting `tasks` to a number larger than `files`.
Example
Running a job to process 10000 files on a machine with 100 cpu cores (workers): if we choose to use 1000 tasks, each one will process a shard of 10 files. `workers=100` means that we can process 100 tasks at a time.
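The arithmetic in the example above can be sketched in a few lines of plain Python (`waves`, the number of batches of parallel tasks, is a name introduced here for illustration):

```python
import math

# Numbers from the example above: 10000 files, 1000 tasks, 100 cpu cores.
files, tasks, workers = 10_000, 1_000, 100

files_per_shard = files // tasks    # each task processes a shard of 10 files
waves = math.ceil(tasks / workers)  # 100 tasks run at a time -> 10 "waves" of tasks

print(files_per_shard, waves)  # 10 10
```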
Pipeline
DataTrove Document
Each pipeline block processes data in the datatrove Document format:
- `text`: the actual text content for each sample
- `id`: a unique id (string) for this sample
- `metadata`: a dictionary where any additional info may be stored
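As an illustration of this shape, here is a minimal stand-in with the same three fields (a sketch only; the real class lives in `datatrove.data` and carries more functionality):

```python
from dataclasses import dataclass, field

@dataclass
class Document:
    """Minimal stand-in mirroring the three fields of datatrove's Document."""
    text: str                                     # the actual text content
    id: str                                       # a unique (string) id
    metadata: dict = field(default_factory=dict)  # any additional info

doc = Document(text="Hello world", id="sample-0", metadata={"source": "demo"})
```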
Types of pipeline blocks
Each pipeline block takes a generator of Document as input and returns another generator of Document.
- readers read data from different formats and yield `Document`
- writers save `Document` to disk/cloud in different formats
- extractors extract text content from raw formats (such as webpage html)
- filters filter out (remove) some `Document`s based on specific rules/criteria
- stats blocks to collect statistics on the dataset
- tokens blocks to tokenize data or count tokens
- dedup blocks for deduplication
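The generator-in, generator-out contract can be illustrated with a plain function (real blocks subclass the base classes in `datatrove.pipeline`; this hypothetical `keep_long_docs` only shows the pattern a filter follows):

```python
from types import SimpleNamespace

def keep_long_docs(documents, min_chars=20):
    """Yield only documents whose text is at least min_chars long."""
    for doc in documents:
        if len(doc.text) >= min_chars:
            yield doc

# Stand-in documents with the text/id fields described above.
docs = [
    SimpleNamespace(text="too short", id="0"),
    SimpleNamespace(text="this one is long enough to keep", id="1"),
]
kept = list(keep_long_docs(docs))
print([d.id for d in kept])  # ['1']
```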
Full pipeline
A pipeline is defined as a list of pipeline blocks. As an example, the following pipeline would read data from disk, randomly filter (remove) some documents and write them back to disk:
```python
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    CSVReader(
        data_folder="/my/input/path"
    ),
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]
```
Executors
Pipelines are platform-agnostic, which means that the same pipeline can smoothly run on different execution environments without any changes to its steps. Each environment has its own PipelineExecutor.
Some options common to all executors:
- `pipeline`: a list consisting of the pipeline steps that should be run
- `logging_dir`: a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed`: (bool, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start_duration`: (int, `0` by default) the maximum number of seconds to delay the start of each task, to prevent all tasks from starting simultaneously and potentially overloading the system
Call an executor's run method to execute its pipeline.
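For example, assuming datatrove is installed and reusing the pipeline from the previous section, a local run might look like this (paths and task counts are illustrative):

```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

executor = LocalPipelineExecutor(
    pipeline=[
        CSVReader(data_folder="/my/input/path"),
        SamplerFilter(rate=0.5),
        JsonlWriter(output_folder="/my/output/path"),
    ],
    logging_dir="/my/logs/path",  # do not reuse across different jobs
    tasks=4,
    workers=4,
)
executor.run()
```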
> [!TIP]
> Datatrove keeps track of which tasks successfully completed by creating a marker (an empty file) in the `${logging_dir}/completions` folder. Once the job finishes, if some of its tasks have failed, you can simply relaunch the exact same executor and datatrove will check and only run the tasks that were not previously completed.
> [!CAUTION]
> If you relaunch a pipeline because some tasks failed, do not change the total number of tasks, as this will affect the distribution of input files/sharding.
LocalPipelineExecutor
This executor will launch a pipeline on a local machine. Options:
- `tasks`: total number of tasks to run
- `workers`: how many tasks to run simultaneously. If `-1` (the default), no limit is set and all tasks may run at once