SkillAgentSearch skills...

RouteMaster

RouteMaster is a .NET library for writing stateful workflows on top of a message bus. It exists to make the implementation of long running business processes easy in event driven systems. Find out more of the history at https://blog.mavnn.co.uk/routemaster-master-your-messaging-routes/

Install / Use

/learn @RouteMasterIntegration/RouteMaster
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

  • RouteMaster

** What is it?

RouteMaster is a .NET library for writing stateful workflows on top of a message bus. It exists to make the implementation of long running business processes easy in event driven systems.

Much of the thinking behind RouteMaster is inspired by the book [[http://www.enterpriseintegrationpatterns.com][Enterprise Integration Patterns]], and it works best when:

  • the most appropriate for integration style for your applications is [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/IntegrationStylesIntro.html][messaging]]
  • your messaging will be [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html][Publish - Subscribe]] (one to many) routed by data type (making all channels [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/DatatypeChannel.html][DataType Channels]]), optionally filtered by topic
  • the flow of information between your applications is sufficiently complex that you require [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html][Content Based Routers]], [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/DynamicRouter.html][Dynamic Routers]], or full blown stateful [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/ProcessManager.html][Process Managers]].

RouteMaster is currently at an Alpha state and is not ready for production usage

Would you like to see faster progress on RouteMaster? Contact us@mavnn.co.uk to discuss sponsorship.

** What does it look like?

Imagine a workflow for sending emails. It is triggered with an email address to send to and a template name to use.

#+BEGIN_SRC dot :file email_sender.svg :cmdline -Kdot -Tsvg digraph { node [shape=box,style="filled",color="#aaffaa"]

trigger [label="Trigger email send"] template [label="Request template"] getInfo [label="Request user info"] render [label="Render HTML"] send [label="Send Email"]

trigger -> template -> getInfo -> render -> send } #+END_SRC

#+RESULTS: [[file:email_sender.svg]]

There are microservices already written to perform each of these individual functions (there is a template store, a user info store, etc.).

We use the following .NET types as the message types available in our overall system; these are effectively the contract by which we communicate with our services. Here they are defined as F# records to capture the fact that they are [[https://en.wikipedia.org/wiki/Immutable_object][immutable]] [[https://stackoverflow.com/questions/4581579/value-objects-in-ddd-why-immutable][value types]], without needing to write additional equality checks:

#+BEGIN_SRC fsharp // For the template store service

// Request type GetTemplate = { cid : Guid templateId : string }

// Response type Template = { cid : Guid template : string }

// For user info store

// Request, keyed on email address type LookupUserInfo = { cid : Guid address : string }

// Response type UserInfo = { cid : Guid name : string }

// HTML template renderer

// Request with template and "user info" type RenderEmail = { cid : Guid template : string name : string }

// Response type EmailRendered = { cid : Guid content : string }

// Email sending service

// Request type SendEmail = { cid : Guid address : string content : string }

// Response type EmailSent = { cid : Guid success : bool } #+END_SRC

All of these types contain a =cid= field - a convention in our system for storing a [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html][Correlation Identifier]]. All of the applications within our system (or at least, all of the [[http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageTranslator.html][Message Translators]] exposing these messages) know to publish response messages including the same correlation identifier as received in the triggering message.

To run through this work flow, we'll need a stateful process manager; we'll need to retrieve the current template and user info, combine the two with the rendering service and then use the result (and our initial email address) to actually send our email.

Our state will need to look something like this:

#+BEGIN_SRC fsharp type SendEmailProcessState = { ToAddress : string Template : string option Content : string option UserInfo : string option } #+END_SRC

We'll always have the =ToAddress=, which is the information given to start the process off, but all of the other information will be filled in as we go through.

Now we need to define each step in our workflow. Firstly, let's create a function which takes our initial state and return an =Async<StepResult>= which:

  • Records the fact that we expect a =Template= message soon with a particular correlation ID
  • Requests that RouteMaster sends a =GetTemplate= message with the same correlation ID

#+BEGIN_SRC fsharp let startSendEmailRoute ttl timeout receivedTemplate initialState = async { let getTemplate = { cid = Guid.NewGuid() templateId = "My template" } let cid = getTemplate.cid.ToString() |> CorrelationId return StepResult.pipeline ttl timeout getTemplate cid receivedTemplate } #+END_SRC

But wait! That function takes four arguments - what are the other three?

=tll= ("time to live") is a simple =TimeSpan=. To avoid issues with stale messages and unbounded backlogs, RouteMaster requires that all messages sent and all expected responses have a time limit. For a simple "pipeline" step like this (sends one message, expects one response) the time to live of the message and how long we'll wait for the expected result are defined to be equal.

We cannot define the =timeout= and =receivedTemplate= steps within the function, as the steps to continue a workflow must be "registered" before being used. So for now we'll leave them as function parameters to be passed in later.

Next, we'll be receiving a =Template= message; we need a =Step= which knows how to extract the correlation ID from the message, and what logic to invoke when we receive one we've been expecting.

#+BEGIN_SRC fsharp let receivedTemplate timeout receivedUserInfo = let extract (t : Template) = t.cid.ToString() |> CorrelationId |> Some let invoke (access : StateAccess<_>) (template : Template) = async { let state = access.Update (fun state -> { state with Template = Some template.template }) match state with | Some { ToAddress = a } -> let lookupUserInfo = { cid = Guid.NewGuid() address = a } let cid = lookupUserInfo.cid.ToString() |> CorrelationId return StepResult.pipeline ttl timeout lookupUserInfo cid receivedUserInfo | _ -> printfn "Failed to retrieve state!" return StepResult.cancel } Step.create (StepName "template received") extract invoke #+END_SRC

We'll need a more steps to cover each of the stages of the process, and finally we'll add a timeout step which will receive a =TimeoutMessage= if any step along the way times out. Let's put those together:

#+BEGIN_SRC fsharp let receivedUserInfo receivedEmailRendered timeout = let extract (u : UserInfo) = u.cid.ToString() |> CorrelationId |> Some let invoke (access : StateAccess<_>) (u : UserInfo) = async { let state = access.Update id match state with | Some { Template = Some t } -> let renderEmail = { cid = Guid.NewGuid() template = t name = u.name } let cid = renderEmail.cid.ToString() |> CorrelationId return StepResult.pipeline ttl timeout renderEmail cid receivedEmailRendered | _ -> printfn "Failed to retrieve state!" return StepResult.cancel } Step.create (StepName "user info received") extract invoke

let receivedEmailRendered receivedEmailSent timeout = let extract (er : EmailRendered) = er.cid.ToString() |> CorrelationId |> Some let invoke (access : StateAccess<_>) (er : EmailRendered) = async { let state = access.Update id match state with | Some { ToAddress = a } -> let sendEmail = { cid = Guid.NewGuid() address = a content = er.content } let cid = sendEmail.cid.ToString() |> CorrelationId return StepResult.pipeline ttl timeout sendEmail cid receivedEmailSent | _ -> printfn "Failed to retrieve state!" return StepResult.cancel } Step.create (StepName "an email was rendered") extract invoke

let receivedEmailSent = Step.create (StepName "anEmailWasSent") (fun (es : EmailSent) -> es.cid.ToString() |> CorrelationId |> Some) (fun access (_ : EmailSent) -> async { printfn "Yay! Record I'm done somewhere" printfn "The console sounds a great place!" return StepResult.cancel })

let receivedTimeout = Step.createTimeout (StepName "timeout") (fun _ _ -> async { printfn "I should probably tell someone this happened." printfn "But I'm only demo code." return StepResult.cancel }) #+END_SRC

Now we have all of the steps required to build our "Route". To actually connect everything up (persistent storage, connect to the message bus, etc) we

View on GitHub
GitHub Stars18
CategoryContent
Updated5y ago
Forks0

Languages

F#

Security Score

75/100

Audited on Sep 23, 2020

No findings