
DataflowEx

A .NET dataflow and ETL framework built upon the Microsoft TPL Dataflow library


Welcome to DataflowEx

Gridsum.DataflowEx is a high-level dataflow framework redesigned on top of Microsoft TPL Dataflow library with Object-Oriented Programming in mind. It does not replace TPL Dataflow but provides reusability, abstraction and management over underlying dataflow blocks to make your life easier. You can get compiled binaries on nuget.org.

Update: Gridsum.DataflowEx v2.0 has been released! This new major version fully supports .NET Standard 2.0, which means you can use the library on a wide range of platforms, including .NET Core, Xamarin and UWP.

Here is a list of DataflowEx's cool features:

  • Block chain/graph encapsulation as a reusable component
  • Inheritance and polymorphism for dataflows and their behaviors
  • Sophisticated dataflow lifecycle management (e.g. failure propagation, dynamic flow expansion)
  • Advanced dataflow chaining/linking
  • Extensive logging and a built-in dataflow health monitor
  • Dataflow event counting and aggregation
  • Automatic completion support for circular dataflow graphs
  • Dataflow-friendly SQL bulk inserter
  • Dataflow-friendly ETL dimension lookup component
  • Compatibility and integration with TPL Dataflow

Interested? O.K. Let's start the DataflowEx tour.

1. Prerequisites

If you are not yet familiar with TPL Dataflow, please take the time to watch these two videos:

Beginner (15min): http://channel9.msdn.com/posts/TPL-Dataflow-Tour

Advanced (63min): https://channel9.msdn.com/Shows/Going+Deep/Stephen-Toub-Inside-TPL-Dataflow

2. Background

The very first question you may ask: what's wrong with TPL Dataflow? Nothing. The library from Microsoft looks simply great. However, in the tough real world there are some obstacles when we apply raw TPL Dataflow. Let's look at an example:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Parses "key=value" strings into key/value pairs
var splitter = new TransformBlock<string, KeyValuePair<string, int>>(
    input =>
        {
            var splitted = input.Split('=');
            return new KeyValuePair<string, int>(splitted[0], int.Parse(splitted[1]));
        });

// Sums the values for each key into a shared dictionary
var dict = new Dictionary<string, int>();
var aggregator = new ActionBlock<KeyValuePair<string, int>>(
    pair =>
        {
            int oldValue;
            dict[pair.Key] = dict.TryGetValue(pair.Key, out oldValue) ? oldValue + pair.Value : pair.Value;
        });

// Completing the splitter will also complete the aggregator
splitter.LinkTo(aggregator, new DataflowLinkOptions() { PropagateCompletion = true });

splitter.Post("a=1");
splitter.Post("b=2");
splitter.Post("a=5");

splitter.Complete();
aggregator.Completion.Wait();
Console.WriteLine("sum(a) = {0}", dict["a"]); //prints sum(a) = 6
```

A wonderful Dataflow demo, right? A splitter block that parses key=value strings is linked to an aggregator block that sums the values for each key. So far so good if we need this dataflow only once. But what if I need the same dataflow graph somewhere else in my application? Or want to expose the same functionality as a reusable component in a library?

Things are getting complicated. Obviously copy & paste is not an acceptable choice. What about putting all the dataflow construction code in a static method? Hmmm, this is a step forward for code reuse, but what should the static method return as the handle of the graph? Returning the head block lets us post new data to the pipeline, but then we miss the tail block whose completion we need to wait on. Not to mention the <kbd>dict</kbd> variable which contains our state/data... Last but not least, a static method is an anti-pattern for testing, as you can hardly change any behavior of the underlying blocks.
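To make the handle problem concrete, here is a sketch of the static-method approach in plain TPL Dataflow. AggregatorFactory, BuildAggregator and its tuple return type are hypothetical names for illustration; the point is that the caller needs three separate handles back, and the split/aggregate logic is locked inside lambdas.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical static factory for the splitter/aggregator graph (not part of DataflowEx).
public static class AggregatorFactory
{
    // The caller needs three separate handles: the head block to post to,
    // the tail's completion to wait on, and the dictionary that holds the state.
    public static (ITargetBlock<string> Head, Task Completion, IDictionary<string, int> Result) BuildAggregator()
    {
        var dict = new Dictionary<string, int>();

        // Behavior is hard-wired into lambdas, so callers and tests cannot swap it out.
        var splitter = new TransformBlock<string, KeyValuePair<string, int>>(input =>
        {
            string[] parts = input.Split('=');
            return new KeyValuePair<string, int>(parts[0], int.Parse(parts[1]));
        });
        var aggregator = new ActionBlock<KeyValuePair<string, int>>(pair =>
        {
            dict[pair.Key] = dict.TryGetValue(pair.Key, out int old) ? old + pair.Value : pair.Value;
        });

        splitter.LinkTo(aggregator, new DataflowLinkOptions { PropagateCompletion = true });
        return (splitter, aggregator.Completion, dict);
    }
}
```

A caller would write `var (head, completion, result) = AggregatorFactory.BuildAggregator();`, post to `head`, complete it and wait on `completion` before reading `result`: three loose objects for what is logically one graph.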

Clearly we need a class that represents the graph and acts as the handle for all its stakeholders. Object-oriented design is a perfect fit here and solves all the problems mentioned above. That is why we gave birth to Gridsum.DataflowEx.

3. Introduction to DataflowEx

Code tells a lot. Let's migrate the above example to DataflowEx and see what it looks like:

```csharp
using Gridsum.DataflowEx;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public class AggregatorFlow : Dataflow<string>
{
    //Blocks
    private TransformBlock<string, KeyValuePair<string, int>> _splitter; 
    private ActionBlock<KeyValuePair<string, int>> _aggregator;

    //Data
    private Dictionary<string, int> _dict;

    public AggregatorFlow() : base(DataflowOptions.Default)
    {
        _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
        _dict = new Dictionary<string, int>();
        _aggregator = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));

        //Block linking
        _splitter.LinkTo(_aggregator, new DataflowLinkOptions() { PropagateCompletion = true });

        /* IMPORTANT */
        RegisterChild(_splitter);
        RegisterChild(_aggregator);
    }

    protected virtual void Aggregate(KeyValuePair<string, int> pair)
    {
        int oldValue;
        _dict[pair.Key] = this._dict.TryGetValue(pair.Key, out oldValue) ? oldValue + pair.Value : pair.Value;
    }

    protected virtual KeyValuePair<string, int> Split(string input)
    {
        string[] splitted = input.Split('=');
        return new KeyValuePair<string, int>(splitted[0], int.Parse(splitted[1]));
    }

    public override ITargetBlock<string> InputBlock { get { return _splitter; } }

    public IDictionary<string, int> Result { get { return _dict; } }
}
```

Though there seems to be more code, it is quite clear. We finally have a class, AggregatorFlow, representing our flow. It inherits from Dataflow<TIn> with the type parameter string, which means AggregatorFlow represents a dataflow graph itself and accepts strings as input.

In this form, dataflow blocks and data become class members, and block behaviors become class methods (which subclasses can override!). We also implemented the abstract <kbd>InputBlock</kbd> property of Dataflow<TIn> and exposed our internal data as an extra Result property.

There are two important calls to RegisterChild() in the constructor. We will come back to this later.
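Because Split and Aggregate are protected virtual, a subclass can change how one block behaves while reusing the whole graph. The sketch below illustrates that pattern with plain TPL Dataflow so it compiles without the DataflowEx package: MiniAggregatorFlow is a hypothetical stand-in for AggregatorFlow (it has no RegisterChild, and its CompletionTask only tracks the tail block), and TrimmingAggregatorFlow is an illustrative subclass that tolerates whitespace around keys.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for AggregatorFlow built on raw TPL Dataflow (no DataflowEx dependency).
public class MiniAggregatorFlow
{
    private readonly TransformBlock<string, KeyValuePair<string, int>> _splitter;
    private readonly ActionBlock<KeyValuePair<string, int>> _aggregator;
    private readonly Dictionary<string, int> _dict = new Dictionary<string, int>();

    public MiniAggregatorFlow()
    {
        // The lambdas call the virtual methods at run time, so subclass overrides take effect.
        _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => Split(s));
        _aggregator = new ActionBlock<KeyValuePair<string, int>>(p => Aggregate(p));
        _splitter.LinkTo(_aggregator, new DataflowLinkOptions { PropagateCompletion = true });
    }

    protected virtual KeyValuePair<string, int> Split(string input)
    {
        string[] parts = input.Split('=');
        return new KeyValuePair<string, int>(parts[0], int.Parse(parts[1]));
    }

    protected virtual void Aggregate(KeyValuePair<string, int> pair)
    {
        _dict[pair.Key] = _dict.TryGetValue(pair.Key, out int old) ? old + pair.Value : pair.Value;
    }

    public ITargetBlock<string> InputBlock => _splitter;
    public Task CompletionTask => _aggregator.Completion; // simplified: tail block only
    public IDictionary<string, int> Result => _dict;
}

// Illustrative subclass: accepts input such as " a = 1" by trimming key and value.
public class TrimmingAggregatorFlow : MiniAggregatorFlow
{
    protected override KeyValuePair<string, int> Split(string input)
    {
        string[] parts = input.Split('=');
        return new KeyValuePair<string, int>(parts[0].Trim(), int.Parse(parts[1].Trim()));
    }
}
```

Only the parsing rule changes in the subclass; block creation, linking and state handling are inherited unchanged.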

Now let's look at the consumer side of the AggregatorFlow class:

```csharp
// inside an async method
var aggregatorFlow = new AggregatorFlow();
aggregatorFlow.InputBlock.Post("a=1");
aggregatorFlow.InputBlock.Post("b=2");
aggregatorFlow.InputBlock.Post("a=5");
aggregatorFlow.InputBlock.Complete();
await aggregatorFlow.CompletionTask;
Console.WriteLine("sum(a) = {0}", aggregatorFlow.Result["a"]); //prints sum(a) = 6
```

You see that we now operate on a single instance of AggregatorFlow without knowing its implementation details. This lets you encapsulate complex dataflow logic in your Dataflow class and give your consumers a clean, high-level view of the graph.

Note: The Dataflow class exposes a CompletionTask property (just like IDataflowBlock.Completion) that represents the life of the whole dataflow. The whole dataflow won't complete until every single child block in the flow completes. Here in the example we await the task to make sure the dataflow completes. More on this topic below.
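The rule that a dataflow completes only when every child block completes can be pictured as Task.WhenAll over the children's Completion tasks. The following is a simplified sketch under that assumption; CompositeCompletion and CompletionDemo are hypothetical names, and the failure propagation and dynamic flow expansion mentioned in the feature list are not modeled here.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: a graph-level completion that finishes only after every child block does.
public static class CompositeCompletion
{
    public static Task Of(IEnumerable<IDataflowBlock> children) =>
        Task.WhenAll(children.Select(b => b.Completion));
}

public static class CompletionDemo
{
    // Builds a two-block pipeline, waits on the composite completion and returns the sum of outputs.
    public static async Task<int> RunAsync()
    {
        var doubler = new TransformBlock<int, int>(x => x * 2);
        var results = new List<int>();
        var sink = new ActionBlock<int>(x => results.Add(x));
        doubler.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        Task whole = CompositeCompletion.Of(new IDataflowBlock[] { doubler, sink });

        doubler.Post(1);
        doubler.Post(2);
        doubler.Post(3);
        doubler.Complete();

        await whole; // resumes only after both doubler and sink have completed
        return results.Sum();
    }
}
```

Waiting on the head block alone would be wrong here: the doubler can finish while the sink is still processing its last items.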

By the way, Dataflow<TIn> provides some helper methods to boost productivity. To achieve the same effect:

```csharp
var aggregatorFlow = new AggregatorFlow();
await aggregatorFlow.ProcessAsync(new[] { "a=1", "b=2", "a=5" }, completeFlowOnFinish:true);
Console.WriteLine("sum(a) = {0}", aggregatorFlow.Result["a"]); //prints sum(a) = 6
```

It is now that easy with <kbd>ProcessAsync</kbd> as DataflowEx handles the tedious Post-and-complete boilerplate code for you :)
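For comparison, this is roughly the Post-and-complete boilerplate that ProcessAsync spares you. The sketch is a hypothetical extension method, PostAllAndCompleteAsync, over a raw ITargetBlock<T>; it is not DataflowEx's ProcessAsync, only the completeFlowOnFinish parameter name is borrowed from the example above, and the library's actual behavior may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TargetBlockExtensions
{
    // Hypothetical helper (not DataflowEx's ProcessAsync): send every item, then optionally
    // complete the target and wait for the given completion task.
    public static async Task PostAllAndCompleteAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> items, Task completion, bool completeFlowOnFinish = true)
    {
        foreach (T item in items)
        {
            // SendAsync waits while the target is full, whereas Post returns false
            // if the item cannot be accepted right away.
            if (!await target.SendAsync(item))
                throw new InvalidOperationException("The target declined an item.");
        }

        if (completeFlowOnFinish)
        {
            target.Complete();
            await completion; // only wait after completing the target, otherwise this could hang
        }
    }
}
```

Passing the block's own Completion works for a single block; for a multi-block graph you would have to pass the tail block's Completion, which is exactly the extra handle a Dataflow class hides behind CompletionTask.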

This is the basic idea of DataflowEx which empowers you with a fully functional handle of your dataflow graph. Find more in the following topics.

4. Understanding DataflowEx design

Just like IDataflowBlock is the fundamental piece in TPL Dataflow, IDataflow is the counterpart in DataflowEx library. Take a look at the IDataflow design:

```csharp
public interface IDataflow
{
    IEnumerable<IDataflowBlock> Blocks { get; }
    Task CompletionTask { get; }
    void Fault(Exception exception);
    string Name { get; }    
}

public interface IDataflow<in TIn> : IDataflow
{
    ITargetBlock<TIn> InputBlock { get; }
}

public interface IOutputDataflow<out TOut> : IDataflow
{
    ISourceBlock<TOut> OutputBlock { get; }
    void LinkTo(IDataflow<TOut> other);
}

public interface IDataflow<in TIn, out TOut> : IDataflow<TIn>, IOutputDataflow<TOut>
{
}
```

IDataflow looks like IDataflowBlock, doesn't it? Well, remember that IDataflow represents a dataflow graph which may contain one or more low-level blocks. Since a graph may have inputs and outputs, the strongly typed generic IDataflow interfaces describe them.

Note: If you noticed IOutputDataflow<TOut>.LinkTo(IDataflow<TOut> other), congratulations: you have found that the API supports (and encourages) graph-level data linking.
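Graph-level linking can be thought of as connecting one graph's OutputBlock to the next graph's InputBlock, so each graph behaves like a single node. The sketch below assumes exactly that behavior; MiniFlow and LinkDemo are hypothetical names rather than DataflowEx types, and the real IOutputDataflow<TOut>.LinkTo may do more than a single block link.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical miniature of IDataflow<TIn, TOut>: a graph with one head block and one tail block.
public sealed class MiniFlow<TIn, TOut>
{
    public MiniFlow(ITargetBlock<TIn> inputBlock, ISourceBlock<TOut> outputBlock)
    {
        InputBlock = inputBlock;
        OutputBlock = outputBlock;
    }

    public ITargetBlock<TIn> InputBlock { get; }
    public ISourceBlock<TOut> OutputBlock { get; }

    // Graph-level link (assumed semantics): this flow's tail feeds the other flow's head,
    // and completion is propagated across the boundary.
    public void LinkTo<TNext>(MiniFlow<TOut, TNext> other) =>
        OutputBlock.LinkTo(other.InputBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public static class LinkDemo
{
    public static async Task<List<int>> RunAsync()
    {
        // Flow 1: a single block that parses strings.
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var parseFlow = new MiniFlow<string, int>(parse, parse);

        // Flow 2: an internal two-block chain (x * 10, then + 1) hidden behind one handle.
        var times10 = new TransformBlock<int, int>(x => x * 10);
        var plus1 = new TransformBlock<int, int>(x => x + 1);
        times10.LinkTo(plus1, new DataflowLinkOptions { PropagateCompletion = true });
        var mathFlow = new MiniFlow<int, int>(times10, plus1);

        parseFlow.LinkTo(mathFlow);

        parseFlow.InputBlock.Post("1");
        parseFlow.InputBlock.Post("2");
        parseFlow.InputBlock.Complete();

        // Drain the final output until the whole chain has completed.
        var results = new List<int>();
        while (await mathFlow.OutputBlock.OutputAvailableAsync())
            results.Add(await mathFlow.OutputBlock.ReceiveAsync());
        return results;
    }
}
```

The caller links parseFlow to mathFlow without knowing that mathFlow contains two blocks internally.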

So on top of IDataflow there is an implementation called Dataflow, which should be the base class for all DataflowEx flows. Besides acting as the handle of the graph, it has many useful features built in. Let's explore them one by one.

4.1 Lifecycle management

A key role of the Dataflow base class is to monitor the health of its children and provide a single completion state to the outside, namely the CompletionTask property.

So first things first: we need a way to tell the dataflow which blocks are its children. That is done through the Dataflow.RegisterChild method (you saw it in the last example). The Dataflow class then keeps a reference to the child in its internal data structure, and from that point on the lifecycle of the child affects its parent.
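To picture what registering a child might involve, here is a hypothetical MiniParent that records each registered block, derives its CompletionTask from all of them, and faults the remaining children as soon as one child faults (the failure propagation listed among the features). The class and its members are invented for illustration and are not DataflowEx's internals.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical parent that tracks registered child blocks; not the DataflowEx implementation.
public class MiniParent
{
    private readonly List<IDataflowBlock> _children = new List<IDataflowBlock>();
    private readonly object _gate = new object();

    public void RegisterChild(IDataflowBlock child)
    {
        lock (_gate) _children.Add(child);

        // When this child faults, fault every sibling so the whole graph fails together.
        child.Completion.ContinueWith(t =>
        {
            Exception error = t.Exception.InnerException ?? t.Exception;
            foreach (var sibling in Snapshot().Where(c => c != child))
                sibling.Fault(error);
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    // Completes when every child registered so far completes; faults if any of them faults.
    public Task CompletionTask => Task.WhenAll(Snapshot().Select(c => c.Completion));

    private List<IDataflowBlock> Snapshot() { lock (_gate) return _children.ToList(); }
}

public static class FaultDemo
{
    // Returns true if a fault in the first block also faulted the second one.
    public static async Task<bool> RunAsync()
    {
        var parent = new MiniParent();
        var parser = new TransformBlock<string, int>(s => int.Parse(s)); // throws on bad input
        var sink = new ActionBlock<int>(_ => { });
        parser.LinkTo(sink); // no PropagateCompletion: only the parent ties their fates together
        parent.RegisterChild(parser);
        parent.RegisterChild(sink);

        parser.Post("not a number"); // FormatException faults the parser
        try { await parent.CompletionTask; return false; }
        catch (FormatException) { return sink.Completion.IsFaulted; }
    }
}
```

Without the parent, the sink would never complete in this demo, because the parser faulted and the link does not propagate completion; propagating the fault is what lets the whole graph fail fast.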

Note: The RegisterChild() method is not restricted to being called inside the dataflow constructor. In fact, it can be called by outsid
