Petastorm
.. image:: https://github.com/uber/petastorm/actions/workflows/unittest.yml/badge.svg?branch=master
   :target: https://github.com/uber/petastorm/actions/workflows/unittest.yml
   :alt: Build Status

.. image:: https://codecov.io/gh/uber/petastorm/branch/master/graph/badge.svg
   :target: https://codecov.io/gh/uber/petastorm/branch/master
   :alt: Code coverage

.. image:: https://img.shields.io/badge/License-Apache%202.0-blue.svg
   :target: https://img.shields.io/badge/License-Apache%202.0-blue.svg
   :alt: License

.. image:: https://badge.fury.io/py/petastorm.svg
   :target: https://pypi.org/project/petastorm
   :alt: Latest Version
.. inclusion-marker-start-do-not-remove
.. contents::
Petastorm is an open source data access library developed at Uber ATG. This library enables single machine or
distributed training and evaluation of deep learning models directly from datasets in Apache Parquet
format. Petastorm supports popular Python-based machine learning (ML) frameworks such as
`Tensorflow <http://www.tensorflow.org/>`_, `PyTorch <https://pytorch.org/>`_, and
`PySpark <http://spark.apache.org/docs/latest/api/python/pyspark.html>`_. It can also be used from pure Python code.

Documentation web site: https://petastorm.readthedocs.io
Installation
.. code-block:: bash

    pip install petastorm
There are several extra dependencies defined by the petastorm package that are not installed automatically.
The extras are: ``tf``, ``tf_gpu``, ``torch``, ``opencv``, ``docs``, ``test``.
For example, to trigger installation of the GPU version of Tensorflow and opencv, use the following pip command:
.. code-block:: bash

    pip install petastorm[opencv,tf_gpu]
Generating a dataset
A dataset created using Petastorm is stored in `Apache Parquet <https://parquet.apache.org/>`_ format.
On top of a Parquet schema, petastorm also stores higher-level schema information that makes multidimensional arrays into a native part of a petastorm dataset.
Petastorm supports extensible data codecs. These enable a user to use one of the standard data compressions (jpeg, png) or implement her own.
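For instance, an image field can be declared with ``CompressedImageCodec`` (``'jpeg'`` here; the full example below uses ``'png'``), while a plain numeric tensor can use ``NdarrayCodec``. This is only a minimal sketch; the field names are made up for illustration, and the ``UnischemaField`` signature matches the example that follows:

.. code-block:: python

    import numpy as np
    from petastorm.codecs import CompressedImageCodec, NdarrayCodec
    from petastorm.unischema import UnischemaField

    # A jpeg-compressed RGB image field and a raw float vector field (illustrative names)
    image_field = UnischemaField('image_jpeg', np.uint8, (128, 256, 3),
                                 CompressedImageCodec('jpeg'), False)
    embedding_field = UnischemaField('embedding', np.float32, (64,), NdarrayCodec(), False)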
Generating a dataset is done using PySpark. PySpark natively supports Parquet format, making it easy to run on a single machine or on a Spark compute cluster. Here is a minimalistic example writing out a table with some random data.
.. code-block:: python

    import numpy as np
    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType

    from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
    from petastorm.etl.dataset_metadata import materialize_dataset
    from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

    # The schema defines how the dataset schema looks like
    HelloWorldSchema = Unischema('HelloWorldSchema', [
        UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
        UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
        UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
    ])

    def row_generator(x):
        """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
        return {'id': x,
                'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
                'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}

    def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
        rowgroup_size_mb = 256

        spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
        sc = spark.sparkContext

        # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
        # well as save petastorm specific metadata
        rows_count = 10
        with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

            rows_rdd = sc.parallelize(range(rows_count))\
                .map(row_generator)\
                .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

            spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
                .coalesce(10) \
                .write \
                .mode('overwrite') \
                .parquet(output_url)
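To materialize the example dataset, the function above can simply be invoked from a small driver script; a minimal sketch (the output path is the function's default):

.. code-block:: python

    if __name__ == '__main__':
        # Writes 10 rows of random data to file:///tmp/hello_world_dataset
        generate_petastorm_dataset()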
- ``HelloWorldSchema`` is an instance of a ``Unischema`` object. ``Unischema`` is capable of rendering types of its fields into different framework specific formats, such as: Spark ``StructType``, Tensorflow ``tf.DType`` and numpy ``numpy.dtype``.
- To define a dataset field, you need to specify a ``type``, ``shape``, a ``codec`` instance and whether the field is nullable for each field of the ``Unischema``.
- We use PySpark for writing output Parquet files. In this example, we launch PySpark on a local box (``.master('local[2]')``). Of course for a larger scale dataset generation we would need a real compute cluster.
- We wrap the spark dataset generation code with the ``materialize_dataset`` context manager. The context manager is responsible for configuring row group size at the beginning and writing out petastorm specific metadata at the end.
- The row generating code is expected to return a Python dictionary indexed by a field name. We use the ``row_generator`` function for that. ``dict_to_spark_row`` converts the dictionary into a ``pyspark.Row`` object while ensuring schema ``HelloWorldSchema`` compliance (shape, type and is-nullable condition are tested).
- Once we have a ``pyspark.DataFrame`` we write it out to a parquet storage. The parquet schema is automatically derived from ``HelloWorldSchema``.
Plain Python API
The ``petastorm.reader.Reader`` class is the main entry point for user
code that accesses the data from an ML framework such as Tensorflow or Pytorch.
The reader has multiple features such as:
- Selective column readout
- Multiple parallelism strategies: thread, process, single-threaded (for debug)
- N-grams readout support
- Row filtering (row predicates)
- Shuffling
- Partitioning for multi-GPU training
- Local caching
Reading a dataset is simple using the ``petastorm.reader.Reader`` class, which can be created using the
``petastorm.make_reader`` factory method:
.. code-block:: python

    from petastorm import make_reader

    with make_reader('hdfs://myhadoop/some_dataset') as reader:
        for row in reader:
            print(row)
hdfs://... and file://... are supported URL protocols.
Once a Reader is instantiated, you can use it as an iterator.
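Several of the features listed above are exposed as ``make_reader`` arguments. The sketch below is a hedged example assuming the keyword names (``schema_fields``, ``num_epochs``, ``shuffle_row_groups``, ``cur_shard``, ``shard_count``) available in your installed petastorm version:

.. code-block:: python

    from petastorm import make_reader

    # Read only two columns, shuffle row groups, iterate over the data 10 times,
    # and read shard 0 out of 4 (e.g. one shard per GPU worker).
    with make_reader('hdfs://myhadoop/some_dataset',
                     schema_fields=['id', 'image1'],
                     shuffle_row_groups=True,
                     num_epochs=10,
                     cur_shard=0, shard_count=4) as reader:
        for row in reader:
            pass  # each row is a named tuple containing only the requested fields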
Tensorflow API
To hook up the reader to a tensorflow graph, you can use the ``tf_tensors``
function:
.. code-block:: python

    from petastorm.tf_utils import tf_tensors

    with make_reader('file:///some/localpath/a_dataset') as reader:
        row_tensors = tf_tensors(reader)
        with tf.Session() as session:
            for _ in range(3):
                print(session.run(row_tensors))
Alternatively, you can use the new ``tf.data.Dataset`` API:
.. code-block:: python

    from petastorm.tf_utils import make_petastorm_dataset

    with make_reader('file:///some/localpath/a_dataset') as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)
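Since ``make_petastorm_dataset`` returns a regular ``tf.data.Dataset``, standard ``tf.data`` transformations can be chained in before building the iterator. A minimal sketch (the ``id`` field is assumed from the HelloWorldSchema example above):

.. code-block:: python

    with make_reader('file:///some/localpath/a_dataset') as reader:
        dataset = make_petastorm_dataset(reader)
        # Standard tf.data transformations: keep only the id field and batch it
        dataset = dataset.map(lambda sample: sample.id).batch(16)
        iterator = dataset.make_one_shot_iterator()
        ids_batch = iterator.get_next()
        with tf.Session() as sess:
            print(sess.run(ids_batch))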
Pytorch API
As illustrated in
`pytorch_example.py <https://github.com/uber/petastorm/blob/master/examples/mnist/pytorch_example.py>`_,
reading a petastorm dataset from pytorch
can be done via the adapter class ``petastorm.pytorch.DataLoader``,
which allows a custom pytorch collating function and transforms to be supplied.
Be sure you have torch and torchvision installed:
.. code-block:: bash

    pip install torchvision
The minimalist example below assumes the definition of a Net class and
train and test functions, included in pytorch_example:
.. code-block:: python

    import torch
    from torchvision import transforms

    from petastorm import make_reader
    from petastorm.pytorch import DataLoader
    from petastorm.transform import TransformSpec

    torch.manual_seed(1)
    device = torch.device('cpu')
    model = Net().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    def _transform_row(mnist_row):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
        return (transform(mnist_row['image']), mnist_row['digit'])

    transform = TransformSpec(_transform_row, removed_fields=['idx'])

    with DataLoader(make_reader('file:///localpath/mnist/train', num_epochs=10,
                                transform_spec=transform, seed=1, shuffle_rows=True),
                    batch_size=64) as train_loader:
        train(model, device, train_loader, 10, optimizer, 1)
    with DataLoader(make_reader('file:///localpath/mnist/test', num_epochs=10,
                                transform_spec=transform), batch_size=1000) as test_loader:
        test(model, device, test_loader)
If you are working with very large batch sizes and do not need support for Decimal/strings, we provide a ``petastorm.pytorch.BatchedDataLoader`` that can buffer using Torch tensors (cpu or cuda) with significantly higher throughput.
If your dataset fits into system memory, you can use an in-memory version, ``petastorm.pytorch.InMemBatchedDataLoader``. This dataloader reads the dataset only once and caches the data in memory to avoid additional I/O for multiple epochs.
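A minimal sketch of swapping ``BatchedDataLoader`` in for ``DataLoader``, assuming it accepts the same ``reader`` and ``batch_size`` arguments in your petastorm version:

.. code-block:: python

    from petastorm import make_reader
    from petastorm.pytorch import BatchedDataLoader

    # Buffers rows directly into torch tensors, which suits very large batch sizes
    with BatchedDataLoader(make_reader('file:///localpath/mnist/train', num_epochs=1),
                           batch_size=4096) as train_loader:
        for batch in train_loader:
            pass  # batch is expected to be a dict of torch tensors keyed by field name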
Spark Dataset Converter API
The Spark dataset converter API simplifies data conversion from Spark to TensorFlow or PyTorch.
The input Spark DataFrame is first materialized in parquet format and then loaded as
a ``tf.data.Dataset`` or ``torch.utils.data.DataLoader``.
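A minimal sketch of the conversion flow, assuming the ``make_spark_converter`` factory and the ``SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF`` cache setting exposed by ``petastorm.spark`` in your installed version:

.. code-block:: python

    from petastorm.spark import SparkDatasetConverter, make_spark_converter

    # Tell petastorm where to materialize the intermediate parquet files
    spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
                   'file:///tmp/petastorm_cache')

    df = spark.range(100)  # any Spark DataFrame
    converter = make_spark_converter(df)

    # Load the materialized data as a tf.data.Dataset (TF2 eager iteration assumed)
    with converter.make_tf_dataset() as dataset:
        for batch in dataset.take(1):
            print(batch)

    # Remove the cached parquet files when done
    converter.delete()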