Pipette
Stream and pipe utilities for Node
Install / Use
/learn @Medium/PipetteREADME
pipette: Stream and pipe utilities for Node
This Node module provides several utility classes that offer pipe and stream-related functionality. It particularly emphasizes providing a consistent event packaging and ordering for streams.
Building and Installing
npm install pipette
Or grab the source and
npm install
Testing
npm test
Or
node ./test/test.js
Event Sequence Philosophy
All of the classes in this module provide a consistently ordered sequence of events, which is meant to be a sensible synthesis of the (somewhat inconsistent) Node specification for the various core stream classes.
In particular, a stream will emit some number of data events
(possibly zero), each with a single payload argument. This will be
followed by either an end event with no payload or an error
event with an arbitrary payload. This is followed by a close event
with no payload. After that, a stream will not emit any further
events, and it is furthermore guaranteed to be detached from its
upstream source(s), if any.
More schematically, as a "railroad" diagram:
+--------------------+ +-------+
| | +->| end() |----------+
v +---------------+ | | +-------+ | +---------+
(start)-+->| data(payload) |-+-+-+ +->| close() |->(finish)
| +---------------+ ^ | +----------------+ | +---------+
| | +->| error(payload) |-+
+----------------------+ +----------------+
Of particular note are the cases of inconsistently-defined close
events. Some streams (core Node stream classes, for example) will emit
a close event with a non-empty payload value to indicate an
unexpected termination. The classes in this module consistently
translate such cases to an error event with the error payload
followed by a no-payload close event. For the purposes of this
module, a "non-empty payload" is one that is neither undefined nor
false. This takes care of the quirky definitions of net.Socket
(which includes a boolean error indicator in its close event) and
http.ClientResponse (which may include an arbitrary error object in
its close event).
The particularly nice thing about this arrangement is that if one
wants to consistently do something after a stream has finished, one
can write the something in question as a close event handler, rather
than splaying the logic between both an end and an error handler.
In the rest of the documentation, it should be taken as implicit that all the classes' event sequences follow this order.
Layering Philosophy
Four of these classes (Cat, Sink, Slicer, and Valve) provide a
layer on top of other streams. The implementation philosophy is that
these listen for events from their "upstream" sources, but they do not
otherwise attempt to interact with those streams. In particular:
-
They do not make upstream calls to the flow-control methods
pause()andresume(). -
They do not attempt to make upstream
setEncoding()calls. -
They do not call upstream
destroy()even when they themselves are beingdestroy()ed.
In addition, these layering classes check upon construction that their
upstream sources are in fact streams that have not yet been ended
(that is, that they are still capable of emitting events). If a stream
source argument fails this check, then the constructor call will throw
an exception indicating that fact. The check is somewhat conservative
(on the side of accepting) and meant to accept stream-like event
emitters in addition to checking bona fide Stream instances.
Details: If a given source is a Stream per se, then the value of
source.readable is taken at face value. Otherwise, a source is
considered to be ended if and only if it (or a prototype in its chain)
defines a readable property and that property's value is falsey.
Constructing stacked readers
Many Node stream classes are designed as an atomic unit that includes both reader and writer methods intermingled in a single object. This module takes a different tack:
-
Any given object is either a reader or a writer, never both.
-
To pass one reader's event output to another, construct the destination object passing it the source, e.g.
new Valve(new OtherStream(...)).
Getting a writer
If you need to get a writer to write into one of the reader classes
(or a stack of same), you can use a Pipe:
var pipe = new Pipe();
var readerStack = new OtherStream(pipe.reader);
var writer = pipe.writer;
writer.write(...); // What's written here will get read by the OtherStream.
A Note About Encodings
Node 0.6.* and 0.8.* differ in their documentation about which encodings
are allowed by setEncoding(). This module accepts the union of the
encodings specified by those. This includes:
ascii— 7-bit ASCIIbase64— standard Base-64 encoding for binary datahex— hex encoding for binary data (two hexadecimal ASCII characters per byte)ucs2— alias forutf16le(below). This is not technically correct (per Unicode spec), but it is how Node is defined.utf16le— standard little-endian UTF-16 encoding for Unicode datautf8— standard UTF-8 encoding for Unicode data
Common Options
All of the classes in this module take an optional options
constructor parameter. If not undefined, this must be a map from
option names to values as specified by the class.
The following are three commonly-accepted options. Classes all accept whichever of these make sense.
-
encoding— A string representing the encoding to use when emitting events. Passing this option is exactly like callingsetEncoding()on the constructed instance. -
incomingEncoding— A string representing the incoming encoding to use when interpreting incomingdataevents that arrive as strings (as opposed to buffers). Passing this option is exactly like callingsetIncomingEncoding()on the constructed instance. -
paused— A boolean value indicating whether the instance should be immediately paused. For most classes, this is exactly like callingpause()on the constructed instance.
API Details
Blip
The Blip class exists to emit a single data event.
This class is useful if you have data that you need to re-emit.
var blip = new Blip([data], [options])
Constructs and returns a new blip which is to emit the given data
(a string or buffer) once unpaused. After emitting the data event,
blips always also emit an end and a close event (in that order).
Of the common options, Blip recognizes encoding and
incomingEncoding, though the latter is with a twist: The
incomingEncoding (either as specified or with the default behavior)
applies immediately to the given data, in order to transform it into
a buffer. That is, if data is passed as a string, it will always get
immediately transformed into a buffer, when an instance is
constructed.
If data is omitted, then the resulting blip will just emit the
ending events, without a data event first.
Blips start out paused, since there is not much point in them immediately emitting their contents upon construction (as there will necessarily be no listeners at that moment).
The constructed instance obeys the full standard Node stream protocol for readers.
Cat
The Cat class (short for "concatenate" and by analogy with the
traditional Unix command with the same name) emits the events from
a sequence of streams, in the order of the given sequence (i.e.
not interspersed).
This can be used, for example, to produce a stream that is prefixed
or suffixed with a given bit of data (when used in combination with
Blip, above).
var cat = new Cat(streams, [options])
Constructs and returns a new cat which is to emit the events from
the given streams (each of which must be an EventEmitter and is
assumed to emit the standard Node readable stream events).
The data events from each stream (in order) are in turn emitted by
this instance, switching to the next stream when the current stream
emits either an end or close event. After all the streams have
been "consumed" in this fashion, this instance emits an end and then
a close event.
If a stream should emit an error event, then that event is in turn
emitted by this instance, after which this instance emits a close
event. It will then become closed (emitting no further events, and
producing false for cat.readable).
This class recognizes all three of the common options (see above), and no others.
The constructed instance obeys the full standard Node stream protocol for readers.
cat.setIncomingEncoding(name)
Sets the incoming encoding of the stream. This is the encoding to use
when interpreting strings that arrive in data events. (This is as
opposed to the encoding set by setEncoding() which determines how
the collected data is transformed as it gets emitted from an
instance.)
The name must be one of the unified allowed encoding names for
Stream.setEncoding().
The incoming encoding starts out as undefined, which is taken to
be synonymous with "utf8" should a data event be received
containing a string payload.
Dropper
The Dropper class is a bufferer of readable stream events, which
relays those events in fixed size blocks (or multiples thereof),
a.k.a. "drops" (hence the name). It handles pause/resume semantics,
and it will always translate incoming values that aren't buffers into
buffers, using a specified and settable incoming encoding.
The only exception to the block size is that the last data event
from a Dropper may have a smaller size, if t
Related Skills
node-connect
352.9kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
111.5kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
352.9kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
352.9kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
