Asgarde
Asgarde simplifies error handling with Apache Beam Java, resulting in less code that is more concise and expressive.
This module simplifies error handling in Apache Beam Java pipelines.
Version compatibility between Beam and Asgarde
<div style="max-height: 200px; overflow-y: auto; overflow-x: auto;">

| Asgarde | Beam |
|---------|--------|
| 0.10.0 | 2.31.0 |
| 0.11.0 | 2.32.0 |
| 0.12.0 | 2.33.0 |
| 0.13.0 | 2.34.0 |
| 0.14.0 | 2.35.0 |
| 0.15.0 | 2.36.0 |
| 0.16.0 | 2.37.0 |
| 0.17.0 | 2.38.0 |
| 0.18.0 | 2.39.0 |
| 0.19.0 | 2.40.0 |
| 0.20.0 | 2.41.0 |
| 0.21.0 | 2.42.0 |
| 0.22.0 | 2.43.0 |
| 0.23.0 | 2.44.0 |
| 0.24.0 | 2.45.0 |
| 0.25.0 | 2.46.0 |
| 0.26.0 | 2.47.0 |
| 0.27.0 | 2.48.0 |
| 0.28.0 | 2.49.0 |
| 0.29.0 | 2.50.0 |
| 0.30.0 | 2.51.0 |
| 0.31.0 | 2.52.0 |
| 0.32.0 | 2.53.0 |
| 0.33.0 | 2.54.0 |
| 0.34.0 | 2.55.0 |
| 0.35.0 | 2.56.0 |
| 0.36.0 | 2.57.0 |
| 0.37.0 | 2.58.0 |
| 0.38.0 | 2.59.0 |
| 0.39.0 | 2.60.0 |
| 0.40.0 | 2.61.0 |
| 0.41.0 | 2.62.0 |
| 0.42.0 | 2.63.0 |
| 0.43.0 | 2.64.0 |
| 0.44.0 | 2.65.0 |
| 0.45.0 | 2.66.0 |
| 0.46.0 | 2.67.0 |
| 0.47.0 | 2.68.0 |
| 0.48.0 | 2.69.0 |
| 0.49.0 | 2.70.0 |

</div>

Installation of project
The project is hosted on a Maven repository.
You can install it with any build tool compatible with Maven.
Examples with Maven and Gradle:
Maven

    <dependency>
        <groupId>fr.groupbees</groupId>
        <artifactId>asgarde</artifactId>
        <version>0.49.0</version>
    </dependency>

Gradle

    implementation group: 'fr.groupbees', name: 'asgarde', version: '0.49.0'

Error logic with Beam ParDo and DoFn
Beam recommends handling errors with dead letters.
This means catching errors in the flow and, using side outputs, sinking them to a file, a database or any other output.
Beam suggests handling side outputs with TupleTags in a DoFn class, for example:
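
    // Failure object.
    public class Failure implements Serializable {
        private final String pipelineStep;
        private final String inputElement;
        private final Throwable exception;

        private Failure(final String pipelineStep,
                        final String inputElement,
                        final Throwable exception) {
            this.pipelineStep = pipelineStep;
            this.inputElement = inputElement;
            this.exception = exception;
        }

        // Builds a Failure from the step name, the failing element and the caught error.
        public static <T> Failure from(final String pipelineStep,
                                       final T element,
                                       final Throwable exception) {
            return new Failure(pipelineStep, element.toString(), exception);
        }
    }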

    // Word count DoFn class.
    public class WordCountFn extends DoFn<String, Integer> {
        private final TupleTag<Integer> outputTag = new TupleTag<Integer>() {};
        private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};

        @ProcessElement
        public void processElement(ProcessContext ctx) {
            try {
                // Could throw ArithmeticException.
                final String word = ctx.element();
                ctx.output(1 / word.length());
            } catch (Throwable throwable) {
                final Failure failure = Failure.from("step", ctx.element(), throwable);
                ctx.output(failuresTag, failure);
            }
        }

        public TupleTag<Integer> getOutputTag() {
            return outputTag;
        }

        public TupleTag<Failure> getFailuresTag() {
            return failuresTag;
        }
    }

    // In Beam pipeline flow.
    final PCollection<String> wordPCollection....
    final WordCountFn wordCountFn = new WordCountFn();
    final PCollectionTuple tuple = wordPCollection
        .apply("ParDo", ParDo.of(wordCountFn)
            .withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));

    // Output PCollection via outputTag.
    PCollection<Integer> outputCollection = tuple.get(wordCountFn.getOutputTag());

    // Failures PCollection via failuresTag.
    PCollection<Failure> failuresCollection = tuple.get(wordCountFn.getFailuresTag());

With this approach, every step exposes both its output PCollection and its failures PCollection.
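
Stripped of the Beam types, the dead-letter pattern amounts to a try/catch that routes each element to one of two collections. The sketch below illustrates that idea with plain Java lists. `DeadLetterSketch`, `Routed` and `SimpleFailure` are hypothetical names introduced for illustration only; they are not Beam or Asgarde classes, and the lists merely stand in for the two tagged outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java illustration of the dead-letter pattern (no Beam types involved).
final class DeadLetterSketch {

    // Simplified stand-in for the Failure object used in the Beam example.
    static final class SimpleFailure {
        final String pipelineStep;
        final String inputElement;
        final Throwable exception;

        SimpleFailure(String pipelineStep, String inputElement, Throwable exception) {
            this.pipelineStep = pipelineStep;
            this.inputElement = inputElement;
            this.exception = exception;
        }
    }

    // Holds the two "outputs" that the TupleTags would select in Beam.
    static final class Routed {
        final List<Integer> outputs = new ArrayList<>();
        final List<SimpleFailure> failures = new ArrayList<>();
    }

    // Routes each word either to the main output or to the failures list.
    static Routed process(List<String> words) {
        final Routed routed = new Routed();
        for (String word : words) {
            try {
                // Throws ArithmeticException for an empty word (division by zero).
                routed.outputs.add(1 / word.length());
            } catch (Throwable throwable) {
                routed.failures.add(new SimpleFailure("step", word, throwable));
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        final Routed routed = process(Arrays.asList("a", "", "beam"));
        System.out.println(routed.outputs.size() + " outputs, " + routed.failures.size() + " failures");
    }
}
```

The bad element does not stop the processing of the other elements: it is recorded with its step name and exception, which is what lets a pipeline sink failures to a separate output later.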
Error logic with Beam MapElements and FlatMapElements
Beam also allows handling errors with built-in components like MapElements and FlatMapElements (an experimental feature as of April 2020).
Behind the scenes, these classes use the same concept explained above.
Example:
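
    public class Failure implements Serializable {
        private final String pipelineStep;
        private final String inputElement;
        private final Throwable exception;

        private Failure(final String pipelineStep,
                        final String inputElement,
                        final Throwable exception) {
            this.pipelineStep = pipelineStep;
            this.inputElement = inputElement;
            this.exception = exception;
        }

        // Builds a Failure from the step name and the ExceptionElement provided by Beam.
        public static <T> Failure from(final String pipelineStep,
                                       final WithFailures.ExceptionElement<T> exceptionElement) {
            final T inputElement = exceptionElement.element();
            return new Failure(pipelineStep, inputElement.toString(), exceptionElement.exception());
        }
    }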

    // In Beam pipeline flow.
    final PCollection<String> wordPCollection....
    WithFailures.Result<PCollection<Integer>, Failure> result = wordPCollection
        .apply("Map", MapElements
            .into(TypeDescriptors.integers())
            .via((String word) -> 1 / word.length()) // Could throw ArithmeticException
            .exceptionsInto(TypeDescriptor.of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("step", exElt))
        );

    // The output type matches the type declared in WithFailures.Result.
    PCollection<Integer> output = result.output();
    PCollection<Failure> failures = result.failures();

The logic is the same for FlatMapElements:

    final PCollection<String> wordPCollection....
    WithFailures.Result<PCollection<String>, Failure> result = wordPCollection
        .apply("FlatMap", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
            .exceptionsInto(TypeDescriptor.of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("step", exElt))
        );

    PCollection<String> output = result.output();
    PCollection<Failure> failures = result.failures();

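
One way to picture what `exceptionsInto` and `exceptionsVia` contribute: the mapping function runs inside a try/catch, and a caller-supplied handler turns the failing element and its exception into a failure object. The following is a minimal plain-Java sketch of that idea, assuming in-memory lists instead of PCollections. `MapWithFailuresSketch`, its `Result` class and the `map` helper are hypothetical names, not Beam API, and the sketch makes no claim about how Beam implements these transforms internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java analogue of a map step with an exception handler.
final class MapWithFailuresSketch {

    // Pairs successful outputs with failures, in the spirit of WithFailures.Result.
    static final class Result<O, F> {
        private final List<O> output;
        private final List<F> failures;

        Result(List<O> output, List<F> failures) {
            this.output = output;
            this.failures = failures;
        }

        List<O> output() {
            return output;
        }

        List<F> failures() {
            return failures;
        }
    }

    // Applies fn to each element; exceptionsVia converts (element, exception) into a failure.
    static <I, O, F> Result<O, F> map(List<I> input,
                                      Function<I, O> fn,
                                      BiFunction<I, Exception, F> exceptionsVia) {
        final List<O> output = new ArrayList<>();
        final List<F> failures = new ArrayList<>();
        for (I element : input) {
            try {
                output.add(fn.apply(element));
            } catch (Exception e) {
                failures.add(exceptionsVia.apply(element, e));
            }
        }
        return new Result<>(output, failures);
    }

    public static void main(String[] args) {
        final Result<Integer, String> result = map(
            Arrays.asList("a", "", "beam"),
            word -> 1 / word.length(),
            (word, e) -> "step failed on '" + word + "': " + e.getClass().getSimpleName());
        System.out.println(result.output());
        System.out.println(result.failures());
    }
}
```

Compared with a hand-written DoFn, the try/catch lives in one reusable place and only the failure conversion changes from step to step.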
Comparison between approaches
Usual Beam pipeline
In a usual Beam pipeline flow, steps are chained fluently:

    final PCollection<Integer> outputPCollection = inputPCollection
        .apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
        .apply("FlatMap", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
        .apply("Map ParDo", ParDo.of(new WordCountFn()));

Usual Beam pipeline with error handling
Here's the same flow with error handling in each step:

    WithFailures.Result<PCollection<String>, Failure> result1 = input
        .apply("Map", MapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> word + "Test")
            .exceptionsInto(TypeDescriptor.of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("step", exElt)));

    final PCollection<String> output1 = result1.output();
    final PCollection<Failure> failure1 = result1.failures();

    WithFailures.Result<PCollection<String>, Failure> result2 = output1
        .apply("FlatMap", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
            .exceptionsInto(TypeDescriptor.of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("step", exElt)));

    // Read the output and failures of the second step (result2, not result1).
    final PCollection<String> output2 = result2.output();
    final PCollection<Failure> failure2 = result2.failures();

    final WordCountFn wordCountFn = new WordCountFn();
    final PCollectionTuple result3 = output2
        .apply("Map ParDo", ParDo.of(wordCountFn)
            .withOutputTags(wordCountFn.getOutputTag(), TupleTagList.of(wordCountFn.getFailuresTag())));

    final PCollection<Integer> output3 = result3.get(wordCountFn.getOutputTag());
    final PCollection<Failure> failure3 = result3.get(wordCountFn.getFailuresTag());

    // Concatenate the failures of all the steps.
    final PCollection<Failure> allFailures = PCollectionList
        .of(failure1)
        .and(failure2)
        .and(failure3)
        .apply(Flatten.pCollections());

Problems with this approach:
- We lose the native fluent style of chained `apply` calls, because we have to handle the output and the errors for each step.
- For `MapElements` and `FlatMapElements`, we always have to add `exceptionsInto` and `exceptionsVia` (can be centralized).
- For each custom `DoFn`, we have to duplicate the `TupleTag` logic and the try/catch block (can be centralized).
- The code is verbose.
- There is no centralized code to concatenate the errors; we have to flatten all the failures ourselves (can be centralized).
Usual Beam pipeline with error handling using Asgarde
Here's the same flow with error handling, but using this library instead:

    final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
        .apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
        .apply("FlatMap", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
        .apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
        .getResult();

Some explanations:
- The `CollectionComposer` class centralizes all the error logic, composes the applies fluently and concatenates all the failures occurring in the flow.
- For `MapElements` and `FlatMapElements`, behind the scenes, the `apply` method adds `exceptionsInto` and `exceptionsVia` on the `Failure` object. You can also call `exceptionsInto` and `exceptionsVia` explicitly if you need custom logic based on the `Failure` object.
- The `MapElementFn` class is a custom `DoFn` class that internally wraps the shared `DoFn` logic, such as the try/catch block and the tuple tags. These concepts are detailed in the next sections.
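
To make the composer idea concrete, here is a minimal plain-Java sketch of a class that chains steps fluently while accumulating the failures of every step. It works on in-memory lists and is not Asgarde's `CollectionComposer`: `ComposerSketch`, `StepFailure` and their methods are hypothetical names chosen for illustration, and the real library operates on PCollections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a composer that chains steps and accumulates failures.
// It only mirrors the idea on in-memory lists; it is not Asgarde's implementation.
final class ComposerSketch<T> {

    // Minimal failure record: failing step name, input element as text, and the exception.
    static final class StepFailure {
        final String step;
        final String element;
        final Exception exception;

        StepFailure(String step, String element, Exception exception) {
            this.step = step;
            this.element = element;
            this.exception = exception;
        }
    }

    private final List<T> elements;
    private final List<StepFailure> failures;

    private ComposerSketch(List<T> elements, List<StepFailure> failures) {
        this.elements = elements;
        this.failures = failures;
    }

    static <T> ComposerSketch<T> of(List<T> input) {
        return new ComposerSketch<>(new ArrayList<>(input), new ArrayList<>());
    }

    // Applies one step: successes feed the next step, failures join the running list.
    <O> ComposerSketch<O> apply(String step, Function<? super T, ? extends O> fn) {
        final List<O> next = new ArrayList<>();
        final List<StepFailure> allFailures = new ArrayList<>(failures);
        for (T element : elements) {
            try {
                next.add(fn.apply(element));
            } catch (Exception e) {
                allFailures.add(new StepFailure(step, String.valueOf(element), e));
            }
        }
        return new ComposerSketch<>(next, allFailures);
    }

    List<T> output() {
        return elements;
    }

    List<StepFailure> failures() {
        return failures;
    }

    public static void main(String[] args) {
        final ComposerSketch<Integer> result = ComposerSketch.of(Arrays.asList("4", "x", "0"))
            .apply("Parse", Integer::parseInt)   // "x" fails here
            .apply("Invert", n -> 100 / n);      // 0 fails here
        System.out.println(result.output() + " with " + result.failures().size() + " failures");
    }
}
```

The call site keeps the fluent chain of a pipeline without error handling, while the try/catch and the concatenation of failures live in a single `apply` method.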
Purpose of the library
- Wrap all error handling logic in a composer class.
- Wrap `exceptionsInto` and `exceptionsVia` usage in the native Beam classes `MapEleme