Asgarde

This module simplifies error handling in Apache Beam Java pipelines, with less and more expressive code.

Versions compatibility between Beam and Asgarde

<div style="max-height: 200px; overflow-y: auto; overflow-x: auto;">

| Asgarde | Beam   |
|---------|--------|
| 0.10.0  | 2.31.0 |
| 0.11.0  | 2.32.0 |
| 0.12.0  | 2.33.0 |
| 0.13.0  | 2.34.0 |
| 0.14.0  | 2.35.0 |
| 0.15.0  | 2.36.0 |
| 0.16.0  | 2.37.0 |
| 0.17.0  | 2.38.0 |
| 0.18.0  | 2.39.0 |
| 0.19.0  | 2.40.0 |
| 0.20.0  | 2.41.0 |
| 0.21.0  | 2.42.0 |
| 0.22.0  | 2.43.0 |
| 0.23.0  | 2.44.0 |
| 0.24.0  | 2.45.0 |
| 0.25.0  | 2.46.0 |
| 0.26.0  | 2.47.0 |
| 0.27.0  | 2.48.0 |
| 0.28.0  | 2.49.0 |
| 0.29.0  | 2.50.0 |
| 0.30.0  | 2.51.0 |
| 0.31.0  | 2.52.0 |
| 0.32.0  | 2.53.0 |
| 0.33.0  | 2.54.0 |
| 0.34.0  | 2.55.0 |
| 0.35.0  | 2.56.0 |
| 0.36.0  | 2.57.0 |
| 0.37.0  | 2.58.0 |
| 0.38.0  | 2.59.0 |
| 0.39.0  | 2.60.0 |
| 0.40.0  | 2.61.0 |
| 0.41.0  | 2.62.0 |
| 0.42.0  | 2.63.0 |
| 0.43.0  | 2.64.0 |
| 0.44.0  | 2.65.0 |
| 0.45.0  | 2.66.0 |
| 0.46.0  | 2.67.0 |
| 0.47.0  | 2.68.0 |
| 0.48.0  | 2.69.0 |
| 0.49.0  | 2.70.0 |

</div>

Installation of the project

The project is hosted on the Maven Central repository.
You can install it with any build tool compatible with Maven repositories.

Example with Maven and Gradle:

Maven

<dependency>
    <groupId>fr.groupbees</groupId>
    <artifactId>asgarde</artifactId>
    <version>0.49.0</version>
</dependency>

Gradle

implementation group: 'fr.groupbees', name: 'asgarde', version: '0.49.0'

Error logic with Beam ParDo and DoFn

Beam recommends handling errors with dead letters.
This means catching errors in the flow and, using side outputs, sinking them to a file, a database or any other output...

Beam suggests handling side outputs with TupleTags in a DoFn class, for example:

// Failure object.
public class Failure implements Serializable {
    private final String pipelineStep;
    private final String inputElement;
    private final Throwable exception;

    private Failure(final String pipelineStep,
                    final String inputElement,
                    final Throwable exception) {
        this.pipelineStep = pipelineStep;
        this.inputElement = inputElement;
        this.exception = exception;
    }

    public static <T> Failure from(final String pipelineStep,
                                   final T element,
                                   final Throwable exception) {
        return new Failure(pipelineStep, element.toString(), exception);
    }
}

// Word count DoFn class.
public class WordCountFn extends DoFn<String, Integer> {

    private final TupleTag<Integer> outputTag = new TupleTag<Integer>() {};
    private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        try {
            // Could throw ArithmeticException.
            final String word = ctx.element();
            ctx.output(1 / word.length());
        } catch (Throwable throwable) {
            final Failure failure = Failure.from("step", ctx.element(), throwable);
            ctx.output(failuresTag, failure);
        }
    }

    public TupleTag<Integer> getOutputTag() {
        return outputTag;
    }

    public TupleTag<Failure> getFailuresTag() {
        return failuresTag;
    }
}

// In Beam pipeline flow.
final PCollection<String> wordPCollection....

final WordCountFn wordCountFn = new WordCountFn();

final PCollectionTuple tuple = wordPCollection
           .apply("ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));

// Output PCollection via outputTag.
PCollection<Integer> outputCollection = tuple.get(wordCountFn.getOutputTag());

// Failures PCollection via failuresTag.
PCollection<Failure> failuresCollection = tuple.get(wordCountFn.getFailuresTag());

With this approach, each step exposes both its output and its failures as separate PCollections.
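The dead-letter pattern itself is independent of Beam. The following is a minimal plain-Java sketch (no Beam types, `Failure` simplified to a record) of the same try/catch routing that the DoFn above performs per element:

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {

    // Simplified stand-in for the Failure class above.
    record Failure(String pipelineStep, String inputElement, Throwable exception) {}

    public static void main(String[] args) {
        final List<String> words = List.of("hello", "", "ok");
        final List<Integer> outputs = new ArrayList<>();
        final List<Failure> failures = new ArrayList<>();

        for (final String word : words) {
            try {
                // Throws ArithmeticException (division by zero) for the empty string.
                outputs.add(1 / word.length());
            } catch (Throwable throwable) {
                // Instead of failing the whole flow, route the error to a side collection.
                failures.add(new Failure("step", word, throwable));
            }
        }

        System.out.println("outputs=" + outputs);          // plays the role of the output PCollection
        System.out.println("failures=" + failures.size()); // plays the role of the failures PCollection
    }
}
```

Here `"hello"` and `"ok"` produce `1 / length()` results, while the empty string is diverted to the failures list rather than aborting the loop.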

Error logic with Beam MapElements and FlatMapElements

Beam also allows handling errors with built-in components like MapElements and FlatMapElements (still marked as an experimental feature as of April 2020).

Behind the scenes, these classes use the same concept explained above.

Example:

public class Failure implements Serializable {
    private final String pipelineStep;
    private final String inputElement;
    private final Throwable exception;

    private Failure(final String pipelineStep,
                    final String inputElement,
                    final Throwable exception) {
        this.pipelineStep = pipelineStep;
        this.inputElement = inputElement;
        this.exception = exception;
    }

    public static <T> Failure from(final String pipelineStep,
                                   final WithFailures.ExceptionElement<T> exceptionElement) {
        final T inputElement = exceptionElement.element();
        return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
    }
}

// In Beam pipeline flow.
final PCollection<String> wordPCollection....

WithFailures.Result<PCollection<Integer>, Failure> result = wordPCollection
   .apply("Map", MapElements
           .into(TypeDescriptors.integers())
           .via((String word) -> 1 / word.length())  // Could throw ArithmeticException
           .exceptionsInto(TypeDescriptor.of(Failure.class))
           .exceptionsVia(exElt -> Failure.from("step", exElt))
   );

PCollection<Integer> output = result.output();
PCollection<Failure> failures = result.failures();

The logic is the same for FlatMapElements:

final PCollection<String> wordPCollection....

WithFailures.Result<PCollection<String>, Failure> result = wordPCollection
   .apply("FlatMap", FlatMapElements
           .into(TypeDescriptors.strings())
           .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
           .exceptionsInto(TypeDescriptor.of(Failure.class))
           .exceptionsVia(exElt -> Failure.from("step", exElt))
   );

PCollection<String> output = result.output();
PCollection<Failure> failures = result.failures();

Comparison between approaches

Usual Beam pipeline

In a usual Beam pipeline flow, steps are chained fluently:

final PCollection<Integer> outputPCollection = inputPCollection
                 .apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
                 .apply("FlatMap", FlatMapElements
                                         .into(TypeDescriptors.strings())
                                         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
                 .apply("Map ParDo", ParDo.of(new WordCountFn()));

Usual Beam pipeline with error handling

Here's the same flow with error handling in each step:

WithFailures.Result<PCollection<String>, Failure> result1 = input
        .apply("Map", MapElements
                        .into(TypeDescriptors.strings())
                        .via((String word) -> word + "Test")
                        .exceptionsInto(TypeDescriptor.of(Failure.class))
                        .exceptionsVia(exElt -> Failure.from("step", exElt)));

final PCollection<String> output1 = result1.output();
final PCollection<Failure> failure1 = result1.failures();

WithFailures.Result<PCollection<String>, Failure> result2 = output1
        .apply("FlatMap", FlatMapElements
                            .into(TypeDescriptors.strings())
                            .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
                            .exceptionsInto(TypeDescriptor.of(Failure.class))
                            .exceptionsVia(exElt -> Failure.from("step", exElt)));

final PCollection<String> output2 = result2.output();
final PCollection<Failure> failure2 = result2.failures();

final WordCountFn wordCountFn = new WordCountFn();

final PCollectionTuple result3 = output2
        .apply("Map ParDo", ParDo.of(wordCountFn).withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));

final PCollection<Integer> output3 = result3.get(wordCountFn.getOutputTag());
final PCollection<Failure> failure3 = result3.get(wordCountFn.getFailuresTag());

final PCollection<Failure> allFailures = PCollectionList
        .of(failure1)
        .and(failure2)
        .and(failure3)
        .apply(Flatten.pCollections());

Problems with this approach:

  • We lose the native fluent style on apply chains, because we have to handle the output and failures of each step separately.
  • For MapElements and FlatMapElements we always have to add exceptionsInto and exceptionsVia (this can be centralized).
  • For each custom DoFn, we have to duplicate the TupleTag logic and the try/catch block (this can be centralized).
  • The code is verbose.
  • There is no built-in way to concatenate all the errors; we have to flatten the failure PCollections ourselves (this can be centralized).
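As a plain-Java illustration of the "can be centralized" remarks (no Beam API involved, all names hypothetical), the try/catch and failure-routing boilerplate can be factored into one generic helper instead of being repeated in every step, with all failures accumulating in a single collection:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class CentralizedFailureSketch {

    record Failure(String pipelineStep, String inputElement, Throwable exception) {}

    // One place for the try/catch + dead-letter routing, instead of one copy per step.
    static <I, O> List<O> applySafely(final String step,
                                      final List<I> input,
                                      final Function<I, O> fn,
                                      final List<Failure> failures) {
        final List<O> output = new ArrayList<>();
        for (final I element : input) {
            try {
                output.add(fn.apply(element));
            } catch (Throwable throwable) {
                failures.add(new Failure(step, element.toString(), throwable));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        final List<Failure> failures = new ArrayList<>();

        // Two chained steps share the same centralized error handling,
        // and every failure ends up in the single failures list.
        final List<String> mapped = applySafely("Map", List.of("a", ""), w -> w + "Test", failures);
        final List<Integer> counted = applySafely("Count", mapped, w -> 1 / (w.length() - 5), failures);

        System.out.println("counted=" + counted);
        System.out.println("failures=" + failures.size());
    }
}
```

This is the shape of what Asgarde's composer does for real PCollections: each apply is wrapped once, and the failure streams are concatenated for you.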

Usual Beam pipeline with error handling using Asgarde

Here's the same flow with error handling, but using this library instead:

final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
        .apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
        .apply("FlatMap", FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
        .apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
        .getResult();

Some explanations:

  • The CollectionComposer class centralizes all the error logic: it fluently composes the applies and concatenates all the failures occurring in the flow.
  • For MapElements and FlatMapElements, behind the scenes, the apply method adds exceptionsInto and exceptionsVia on the Failure object. We can also use exceptionsInto and exceptionsVia explicitly if we have custom logic based on the Failure object.
  • The MapElementFn class is a custom DoFn class that internally wraps the logic shared by DoFns, like the try/catch block and TupleTags. We will detail these concepts in the next sections.

Purpose of the library

  • Wrap all error handling logic in a composer class.
  • Wrap exceptionsInto and exceptionsVia usage in the native Beam classes `MapElements` and `FlatMapElements`.
