Propulsion
.NET event stream projection and scheduling platform with CosmosDB, DynamoDB, EventStoreDB, MemoryStore, message-db, Equinox and Kafka integrations


Propulsion provides a granular suite of .NET NuGet packages for building Reactive event processing pipelines. It caters for:
- Event Sourcing Reactions: Handling projections and reactions based on event feeds from stores such as EventStoreDB and MessageDB, and the Equinox Stores (DynamoStore, CosmosStore, MemoryStore).
- Unit and Integration testing support: The AwaitCompletion mechanisms in MemoryStore and FeedSource provide a clean way to structure test suites in a manner that achieves high test coverage without flaky tests or slow tests.
- Generic Ingestion and Publishing pipelines: The same abstractions can also be used for consuming and/or publishing to any target.
- Serverless event pipelines: The core components do not assume a long-lived process.
  - The DynamoStore-related components implement support for running an end-to-end event sourced system using only Amazon DynamoDB and Lambda without requiring a long-lived host process.
- Strong metrics support: Feed Sources and Projectors provide comprehensive logging and metrics. (At present, the primary integration is with Prometheus, but the mechanism is exposed in a pluggable manner).
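
The testing point above (waiting on a completion signal rather than sleeping) can be illustrated with a small, language-neutral sketch. The Python below is not the Propulsion API: the Monitor class and its submitted, mark_handled and await_completion members are hypothetical names chosen for illustration only. It assumes a reaction pipeline that can report each handled event back to a shared monitor.

```python
import asyncio


class Monitor:
    """Hypothetical completion monitor (an assumption, not Propulsion's type)."""

    def __init__(self) -> None:
        self._pending = 0
        self._idle = asyncio.Event()
        self._idle.set()  # nothing is outstanding yet

    def submitted(self, count: int = 1) -> None:
        # Record that `count` events have entered the pipeline.
        self._pending += count
        self._idle.clear()

    def mark_handled(self, count: int = 1) -> None:
        # Record that `count` events have been fully handled.
        self._pending -= count
        if self._pending == 0:
            self._idle.set()

    async def await_completion(self, timeout: float) -> None:
        # Returns as soon as all submitted work is handled; raises on timeout.
        await asyncio.wait_for(self._idle.wait(), timeout)


async def run_scenario() -> list:
    monitor = Monitor()
    handled = []

    async def react(event: str) -> None:
        await asyncio.sleep(0.01)  # simulated reaction work
        handled.append(event.upper())
        monitor.mark_handled()

    events = ["added", "removed", "cleared"]
    monitor.submitted(len(events))
    tasks = [asyncio.create_task(react(e)) for e in events]

    # The test waits on the completion signal instead of sleeping for a guess.
    await monitor.await_completion(timeout=5.0)
    await asyncio.gather(*tasks)
    return handled


result = asyncio.run(run_scenario())
```

In a test written this way, the timeout only bounds the failure case, so a passing run takes no longer than the reactions themselves.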
If you're looking for a good discussion forum on these kinds of topics, look no further than the DDD-CQRS-ES Discord's #equinox channel (invite link).
Core Components
- Propulsion: Implements core functionality in a channel-independent fashion. Depends on FSharp.Control.TaskSeq, MathNet.Numerics, Serilog:
  - StreamsSink: High performance pipeline that handles parallelized event processing. Ingestion of events, and checkpointing of progress are handled asynchronously. Each aspect of the pipeline is decoupled such that it can be customized as desired.
  - Streams.Prometheus: Helper that exposes per-scheduler metrics for Prometheus scraping.
  - ParallelProjector: Scaled down variant of StreamsSink that does not preserve stream level ordering semantics.
  - FeedSource: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A readTranches function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of FsCodec.ITimelineEvent records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it.
  - Monitor.AwaitCompletion: Enables efficient waiting for completion of reaction processing within an integration test.
  - PeriodicSource: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an IAsyncEnumerable of FsCodec.StreamName * FsCodec.IEventData * context. Checkpointing occurs only when all events have been deemed handled by the Sink.
  - SinglePassFeedSource: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
  - JsonSource: Simple source that feeds items from a File containing JSON (such a file can be generated via eqx query -o JSONFILE from cosmos etc.)
  NOTE Propulsion.Feed is a namespace within the main Propulsion package that provides helpers for checkpointed consumption of a feed of stream-based inputs.
  - Supported inputs include custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database).
  - Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres).
  - Using a feed normally requires a checkpoint store that implements IFeedCheckpointStore from e.g., Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore
- Propulsion.Prometheus: Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). Depends on Propulsion, an IFeedCheckpointStore implementation (from e.g., Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore)
  - Propulsion.Prometheus: Exposes processing throughput statistics to Prometheus.
  - Propulsion.Feed.Prometheus: Exposes reading statistics to Prometheus (including metrics from DynamoStore.DynamoStoreSource, EventStoreDb.EventStoreSource, MessageDb.MessageDbSource and SqlStreamStore.SqlStreamStoreSource).
- Propulsion.MemoryStore: Provides bindings to Equinox.MemoryStore. Depends on Equinox.MemoryStore v4.0.0, FsCodec.Box, Propulsion
  - MemoryStoreSource: Presents a Source that adapts an Equinox.MemoryStore to feed into a Propulsion.Sink. Typically used as part of an overall test suite to enable efficient and deterministic testing where reactions are relevant to a given scenario.
  - Monitor.AwaitCompletion: Enables efficient deterministic waits for Reaction processing within integration or unit tests.
  - ReaderCheckpoint: ephemeral checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore in test contexts.
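
The batch checkpointing rule described for FeedSource (a batch's checkpoint callback fires only once the Sink has handled every event in that batch) can be sketched as follows. This is a language-neutral illustration in Python, not the Propulsion API: Batch, Sink, submit and drain are hypothetical names. Releasing checkpoints strictly in submission order is an assumption of this sketch, made so that a later batch finishing early cannot move the checkpoint past unhandled events.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Batch:
    """Hypothetical batch: (stream name, event) pairs plus an optional callback."""
    position: int
    events: List[Tuple[str, str]]
    checkpoint: Optional[Callable[[], None]] = None


class Sink:
    """Hypothetical sink: per-stream ordering, per-batch completion tracking."""

    def __init__(self, handle: Callable[[str, str], None]) -> None:
        self._handle = handle
        self._streams = defaultdict(deque)  # stream name -> queued (entry, event)
        self._batches = deque()             # [batch, remaining] in submission order

    def submit(self, batch: Batch) -> None:
        entry = [batch, len(batch.events)]
        self._batches.append(entry)
        for stream, event in batch.events:
            self._streams[stream].append((entry, event))
        self._release()  # an empty batch can be checkpointed straight away

    def drain(self) -> None:
        # Streams are processed independently; order within a stream is preserved.
        for stream, queue in self._streams.items():
            while queue:
                entry, event = queue.popleft()
                self._handle(stream, event)
                entry[1] -= 1
                self._release()

    def _release(self) -> None:
        # Fire callbacks only for fully handled batches at the head of the queue.
        while self._batches and self._batches[0][1] == 0:
            batch, _ = self._batches.popleft()
            if batch.checkpoint is not None:
                batch.checkpoint()


handled = []
checkpoints = []
sink = Sink(lambda stream, event: handled.append((stream, event)))
sink.submit(Batch(1, [("Cart-a", "e1"), ("Cart-b", "e2")], lambda: checkpoints.append(1)))
sink.submit(Batch(2, [("Cart-a", "e3")], lambda: checkpoints.append(2)))
sink.drain()
```

In this run the only event of batch 2 is handled before batch 1 is complete, yet neither checkpoint is recorded until the last event of batch 1 has been handled, after which both are released in order.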
Store-specific Components
- Propulsion.CosmosStore: Provides bindings to Azure CosmosDB. Depends on Equinox.CosmosStore v4.0.0
  - CosmosStoreSource: reading from CosmosDb's ChangeFeed using Microsoft.Azure.Cosmos
  - CosmosStoreSink: writing to Equinox.CosmosStore v4.0.0.
  - CosmosStorePruner: pruning from Equinox.CosmosStore v4.0.0.
  - ReaderCheckpoint: checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore using Equinox.CosmosStore v4.0.0.

  (Reading and position metrics are exposed via Propulsion.CosmosStore.Prometheus)
- Propulsion.DynamoStore: Provides bindings to Equinox.DynamoStore. Depends on Equinox.DynamoStore v4.0.0
  - AppendsIndex/AppendsEpoch: Equinox.DynamoStore aggregates that together form the DynamoStore Index
  - DynamoStoreIndexer: writes to AppendsIndex/AppendsEpoch (used by Propulsion.DynamoStore.Indexer, Propulsion.Tool)
  - DynamoStoreSource: reads from AppendsIndex/AppendsEpoch (see DynamoStoreIndexer)
  - ReaderCheckpoint: checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore using Equinox.DynamoStore v4.0.0.

  (Reading and position metrics are exposed via Propulsion.Prometheus)
- Propulsion.DynamoStore.Indexer: AWS Lambda to index appends into an Index Table. Depends on Propulsion.DynamoStore, Amazon.Lambda.Core, Amazon.Lambda.DynamoDBEvents, Amazon.Lambda.Serialization.SystemTextJson
  - Handler: parses DynamoDB Streams Source Mapping input, feeds into Propulsion.DynamoStore.DynamoStoreIndexer
  - Connector: Store / environment variables wiring to connect DynamoStoreIndexer to the Equinox.DynamoStore Index Event Store
  - Function: AWS Lambda Function that can be fed via a DynamoDB Streams Event Source Mapping; passes
