Pipeline
Build streaming pipelines in PHP. The missing pipe operator. General-purpose collections pipeline. Since 2017.
sanmai/pipeline provides a fluent, memory-efficient way to process iterable data in PHP. It uses lazy evaluation with generators, allowing you to build complex data processing pipelines that are easy to read and write.
Inspired by the pipe operator (|) in shells, it lets you chain operations like map, filter, reduce, zip, and more. Because it processes items one by one and only when needed, it's highly effective for large or even infinite collections without exhausting memory.
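The lazy, one-item-at-a-time processing described above can be illustrated without the library. The following is a minimal plain-PHP sketch, not sanmai/pipeline's implementation. It chains generators over an infinite sequence, and only the values that are actually requested are ever computed. The helpers `naturals()`, `lazyMap()`, `lazyFilter()` and `takeFirst()` are hypothetical names used only for illustration.

```php
<?php

// Hypothetical helpers illustrating lazy, generator-based stages.
// They are not part of sanmai/pipeline.

/** Yields 1, 2, 3, ... forever. */
function naturals(): Generator
{
    for ($i = 1; ; $i++) {
        yield $i;
    }
}

/** Applies $fn to each value only when that value is pulled through. */
function lazyMap(iterable $input, callable $fn): Generator
{
    foreach ($input as $value) {
        yield $fn($value);
    }
}

/** Passes through only the values for which $predicate returns true. */
function lazyFilter(iterable $input, callable $predicate): Generator
{
    foreach ($input as $value) {
        if ($predicate($value)) {
            yield $value;
        }
    }
}

/** Stops pulling from $input after $n values. */
function takeFirst(iterable $input, int $n): Generator
{
    if ($n <= 0) {
        return;
    }
    foreach ($input as $value) {
        yield $value;
        if (--$n === 0) {
            return;
        }
    }
}

$mapperCalls = 0;

// The first five even squares of an infinite sequence.
$evenSquares = takeFirst(
    lazyFilter(
        lazyMap(naturals(), function (int $i) use (&$mapperCalls): int {
            $mapperCalls++;
            return $i * $i;
        }),
        fn (int $square): bool => $square % 2 === 0
    ),
    5
);

$result = iterator_to_array($evenSquares, false);
// $result is [4, 16, 36, 64, 100]; the mapper ran only 10 times.
```

Memory stays flat because no stage ever materializes the sequence. Each stage holds only the current value.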
The library is rigorously tested and robust. Pipeline neither defines nor throws any exceptions.
Install
composer require sanmai/pipeline
The latest version requires PHP 8.2 or above.
Some earlier versions work under PHP 5.6 and above, but they are not as feature-complete.
Documentation
The documentation includes:
- Quick start guide to get up and running
- Full Pipeline API reference for a deep dive
- Cookbook with practical recipes and patterns
- Best practices with tips for effective usage
Use
```php
use function Pipeline\map;
use function Pipeline\take;

// load Composer's autoloader
require __DIR__ . '/vendor/autoload.php';

// iterable corresponds to arrays, generators, iterators
// we use an array here for simplicity's sake
$iterable = range(1, 3);

// wrap the initial iterable with a pipeline
$pipeline = take($iterable);

// join side by side with other iterables of any type
$pipeline->zip(
    \range(1, 3),
    map(function () {
        yield 1;
        yield 2;
        yield 3;
    })
);

// lazily process their elements together
$pipeline->unpack(function (int $a, int $b, int $c) {
    return $a - $b - $c;
});

// map one value into several more
$pipeline->map(function ($i) {
    yield pow($i, 2);
    yield pow($i, 3);
});

// simple one-to-one mapper
$pipeline->cast(function ($i) {
    return $i - 1;
});

// map into arrays
$pipeline->map(function ($i) {
    yield [$i, 2];
    yield [$i, 4];
});

// unpack array into arguments
$pipeline->unpack(function ($i, $j) {
    yield $i * $j;
});

// one way to filter
$pipeline->map(function ($i) {
    if ($i > 50) {
        yield $i;
    }
});

// this uses a filtering iterator from SPL under the hood
$pipeline->filter(function ($i) {
    return $i > 100;
});

// reduce to a single value; can be an array or any value
$value = $pipeline->fold(0, function ($carry, $item) {
    // for the sake of convenience the default reducer from the simple
    // pipeline does summation, just like we do here
    return $carry + $item;
});

var_dump($value);
// int(0): with these inputs no value exceeds 50, so nothing reaches fold()
```
API entry points
All entry points always return an instance of the pipeline.
| Method | Details | Use with |
| ----------- | ----------------------------- | ----------- |
| map() | Takes an optional initial callback, which must not require any arguments. Otherwise, works just like the instance method below. | use function Pipeline\map; |
| take() | Takes any iterables, including arrays, joining them together in succession. | use function Pipeline\take; |
| fromArray() | Takes an array, initializes a pipeline with it. | use function Pipeline\fromArray; |
| fromValues() | Takes variadic arguments, initializes a pipeline with them. | use function Pipeline\fromValues; |
| zip() | Takes an iterable and several more, transposing them together. | use function Pipeline\zip; |
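As a rough mental model, take() chains iterables one after another, while zip() walks several iterables in lockstep and yields one array per position. The following is a plain-PHP sketch of those semantics, not the library's code. The names `toIterator()`, `concatAll()` and `zipAll()` are hypothetical. Padding shorter inputs with null mirrors `array_map(null, ...)` and is an assumption about how uneven lengths are handled.

```php
<?php

// Hypothetical helpers; they only illustrate the semantics described above.

/** Turns any iterable into an Iterator. */
function toIterator(iterable $input): Iterator
{
    return is_array($input) ? new ArrayIterator($input) : new IteratorIterator($input);
}

/** Yields every value of each iterable in turn, one input after another. */
function concatAll(iterable ...$inputs): Generator
{
    foreach ($inputs as $input) {
        foreach ($input as $value) {
            yield $value;
        }
    }
}

/** Yields [a0, b0, ...], [a1, b1, ...], ... reading all inputs in lockstep. */
function zipAll(iterable ...$inputs): Generator
{
    $iterators = array_map('toIterator', $inputs);
    foreach ($iterators as $iterator) {
        $iterator->rewind();
    }

    while (true) {
        $row = [];
        $anyValid = false;
        foreach ($iterators as $iterator) {
            if ($iterator->valid()) {
                $row[] = $iterator->current();
                $iterator->next();
                $anyValid = true;
            } else {
                $row[] = null; // pad an exhausted input (assumption)
            }
        }
        if (!$anyValid) {
            return; // every input is exhausted
        }
        yield $row;
    }
}

$tail = (function (): Generator {
    yield 3;
    yield 4;
})();
$joined = iterator_to_array(concatAll([1, 2], $tail), false);
// $joined is [1, 2, 3, 4]

$flags = (function (): Generator {
    yield true;
    yield false;
})();
$zipped = iterator_to_array(zipAll([1, 2, 3], ['a', 'b', 'c'], $flags), false);
// $zipped is [[1, 'a', true], [2, 'b', false], [3, 'c', null]]
```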
Instance methods in a nutshell
| Method | Details | A.K.A. |
| ----------- | ----------------------------- | ----------------- |
| map() | Takes an optional callback that, for each input value, may return one value or yield many. Also takes an initial generator callback, which must not require any arguments. Does nothing if no callback is provided. Also available as a plain function. | SelectMany |
| cast() | Takes a callback that is expected to return a single value for each input value. Unlike map(), it gives no special treatment to generators. Does nothing if no callback is provided. | array_map, Select |
| append() | Appends the contents of an iterable to the end of the pipeline. | array_merge |
| push() | Appends the arguments to the end of the pipeline. | array_push |
| prepend() | Prepends the contents of an iterable to the beginning of the pipeline. | array_merge |
| unshift() | Prepends the pipeline with a list of values. | array_unshift |
| zip() | Takes a number of iterables, transposing them together with the current sequence, if any. | array_map(null, ...$array), Python's zip(), transposition |
| reservoir() | Reservoir sampling with an optional weighting function. | |
| flatten() | Flattens inputs: [[1, 2], [3, 4]] becomes [1, 2, 3, 4]. | flat_map, flatten, collect_concat |
| unpack() | Unpacks arrays into arguments for a callback. Flattens inputs if no callback provided. | |
| chunk() | Chunks the pipeline into arrays of specified length. | array_chunk |
| chunkBy() | Chunks the pipeline into arrays with variable sizes. Size 0 produces empty arrays. | |
| select() | Selects elements for which the callback returns true. By default only removes null and false. Optional onReject for side effects. | array_filter, filter, Where |
| filter() | Alias for select() with strict: false default. Removes all falsy values like array_filter. | array_filter |
| tap() | Performs side effects on each element without changing the values in the pipeline. | |
| skipWhile() | Skips elements while the predicate returns true; once it returns false, keeps that element and everything after it. | |
| slice() | Extracts a slice from the inputs. Keys are intentionally preserved. Supports negative values for both arguments. | array_slice |
| peek() | Returns the first N items as a pipeline/iterable. Use prepend() to restore items if needed. | array_splice |
| fold() | Reduces input values to a single value. Defaults to summation. Requires an initial value. | array_reduce, Aggregate, Sum |
| reduce() | Alias to fold() with a reversed order of arguments. | array_reduce |
| values() | Keeps values only. | array_values |
| keys() | Keeps keys only. | array_keys |
| flip() | Swaps keys and values. | array_flip |
| tuples() | Converts stream to [key, value] tuples. | |
| max() | Finds the highest value. | max |
| min() | Finds the lowest value. | min |
| count() | Counts values. Eagerly executed. | count |
| first() | Returns the first element. | array_first |
| last() | Returns the last element. | array_last |
| each() | Eagerly iterates over the sequence. | foreach, array_walk |
| stream() | Ensures subsequent operations use lazy, non-array paths. | |
| runningCount() | Counts seen values using a reference argument. | |
| toList() | Returns an array with all values. Eagerly executed. | |
| toAssoc() | Returns a final array with values and keys. Eagerly executed. | dict, ToDictionary |
| cursor() | Returns a forward-only iterator that maintains position across iterations. | |
| runningVariance() | Computes online statistics: sample mean, sample variance, standard deviation. | Welford's method |
| finalVariance() | Computes final statistics for the sequence. | |
| __construct() | Can be provided with an optional initial iterator. Used in the take() function from above. | |
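The table lists runningVariance() as computing online statistics with Welford's method. For reference, the following is a minimal plain-PHP version of Welford's algorithm. The class name `Welford` and its methods are hypothetical and do not reflect the library's own statistics API. Each value updates the mean and the running sum of squared deviations in a single pass, so the sequence never has to be stored.

```php
<?php

/** Hypothetical accumulator implementing Welford's online mean/variance algorithm. */
final class Welford
{
    private int $count = 0;
    private float $mean = 0.0;
    private float $m2 = 0.0; // running sum of squared deviations from the mean

    public function add(float $x): void
    {
        $this->count++;
        $delta = $x - $this->mean;
        $this->mean += $delta / $this->count;
        // Multiplies by the deviation from the *updated* mean, per Welford.
        $this->m2 += $delta * ($x - $this->mean);
    }

    public function getCount(): int
    {
        return $this->count;
    }

    public function getMean(): float
    {
        return $this->mean;
    }

    /** Sample variance (divides by n - 1); NAN for fewer than two values. */
    public function getVariance(): float
    {
        return $this->count < 2 ? NAN : $this->m2 / ($this->count - 1);
    }

    public function getStandardDeviation(): float
    {
        return sqrt($this->getVariance());
    }
}

$stats = new Welford();
foreach ([2, 4, 4, 4, 5, 5, 7, 9] as $value) {
    $stats->add($value);
}
// mean is 5.0; sample variance is 32 / 7
```

A single pass and constant memory fit streaming pipelines well. The update is also more numerically stable than accumulating the sum and the sum of squares separately.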
Pipeline implements IteratorAggregate and can be used as any other iterable.
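Implementing IteratorAggregate is what lets an object be passed wherever PHP accepts an iterable. The general pattern is sketched below with a hypothetical `Numbers` class and `sumAll()` function. It illustrates the interface and is not Pipeline's code.

```php
<?php

/** Hypothetical class: implementing IteratorAggregate makes it iterable. */
final class Numbers implements IteratorAggregate
{
    public function __construct(private array $values)
    {
    }

    public function getIterator(): Generator
    {
        // A generator is Traversable, which is what getIterator() must return.
        yield from $this->values;
    }
}

/** Accepts arrays, generators, iterators, or any IteratorAggregate. */
function sumAll(iterable $input): int
{
    $sum = 0;
    foreach ($input as $value) {
        $sum += $value;
    }
    return $sum;
}

$numbers = new Numbers([1, 2, 3]);

$total = sumAll($numbers);              // accepted as an iterable
$asArray = iterator_to_array($numbers); // also works with iterator_to_array()
```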
Pipeline implements Countable, so it can be passed to count(). Be warned that counting values is a terminal operation.
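Counting is terminal for a lazy sequence because a generator has to be walked to the end to be counted, and a consumed generator cannot be rewound. The following plain-PHP sketch illustrates the caveat with a hypothetical `LazySequence` class. It is not Pipeline's implementation, and whether a second count returns 0 in the library is not claimed here.

```php
<?php

/** Hypothetical lazy wrapper whose count() consumes the underlying generator. */
final class LazySequence implements Countable
{
    public function __construct(private Generator $source)
    {
    }

    public function count(): int
    {
        $n = 0;
        // valid()/next() never rewind, so a later call sees an exhausted generator.
        while ($this->source->valid()) {
            $n++;
            $this->source->next();
        }
        return $n;
    }

    /** Collects whatever values have not been consumed yet. */
    public function remaining(): array
    {
        $values = [];
        while ($this->source->valid()) {
            $values[] = $this->source->current();
            $this->source->next();
        }
        return $values;
    }
}

$sequence = new LazySequence((function (): Generator {
    yield 'a';
    yield 'b';
    yield 'c';
})());

$first = count($sequence);      // 3: the generator is now consumed
$second = count($sequence);     // 0: nothing is left to count
$left = $sequence->remaining(); // []
```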
In general, Pipeline instances are mutable: every Pipeline-returning method returns the very same Pipeline instance. This gives great flexibility when trusting someone or something to add processing stages to a Pipeline instance, and it avoids non-obvious mistakes that would arise from having to strictly follow a fluent interface. For example, if you add a processing stage, it stays there whether or not you capture the return value. This peculiarity could be a thread-safety hazard in other circumstances, but in PHP it is not an issue.
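The practical effect of this mutability can be shown with a hypothetical `Stages` builder and `addDoubling()` helper. Neither is the library's code. Each method appends a stage and returns `$this`, so a stage added by a helper sticks even though the helper discards the return value.

```php
<?php

/** Hypothetical mutable builder: every method changes and returns the same instance. */
final class Stages
{
    /** @var list<callable> */
    private array $stages = [];

    public function cast(callable $fn): self
    {
        $this->stages[] = $fn;
        return $this; // the same object, not a modified copy
    }

    /** Runs every stage, in order, on each input value. */
    public function apply(iterable $input): array
    {
        $out = [];
        foreach ($input as $value) {
            foreach ($this->stages as $stage) {
                $value = $stage($value);
            }
            $out[] = $value;
        }
        return $out;
    }
}

/** Adds a stage and deliberately ignores the returned instance. */
function addDoubling(Stages $stages): void
{
    $stages->cast(fn (int $x): int => $x * 2);
}

$stages = new Stages();
$returned = $stages->cast(fn (int $x): int => $x + 1);
addDoubling($stages);

$result = $stages->apply([1, 2, 3]); // [4, 6, 8]
```

With an immutable design, the stage added inside `addDoubling()` would be lost unless the helper returned the new instance and the caller reassigned it.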
Caveats
- Since most callbacks are lazily evaluated as data flows in and out, you must consume the results with a plain foreach loop or use reduce() to make sure processing happens.

```php
foreach ($pipeline as $result) {
    // Processing happens only if you consume the results.
    // Want to stop early after a few results
}
```
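The same deferred behaviour can be seen with a plain PHP generator. The following is a minimal sketch that uses a hypothetical `$square` stage rather than the library. Creating the stage runs none of its callback code, and breaking out of the loop early means the remaining values are never processed.

```php
<?php

$calls = 0;

// Hypothetical stage: squares values and counts how often its body runs.
$square = function (iterable $input) use (&$calls): Generator {
    foreach ($input as $value) {
        $calls++;
        yield $value * $value;
    }
};

$stage = $square([1, 2, 3, 4, 5]);
$callsBeforeConsuming = $calls; // 0: creating a generator runs none of its body

$seen = [];
foreach ($stage as $result) {
    $seen[] = $result;
    if (count($seen) === 2) {
        break; // stop early; 3, 4 and 5 are never squared
    }
}
// $seen is [1, 4] and $calls is 2
```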