Workflow
Convex component for durably executing workflows.
Install / Use
/learn @get-convex/WorkflowREADME
Convex Durable Workflows
<!-- START: Include on https://convex.dev/components -->The Workflow component allows you to write durable functions: code that orchestrates potentially-long-lived operations reliably, even in the face of server restarts.
It can pause indefinitely while it waits for an asynchronous event or sleep for an arbitrary amount of time, without consuming any resources in the interim.
Step execution can be determined dynamically with branching, loops, try/catch and more, all via regular code. Local variables, for-loops, console logging, etc. work as you'd expect, even though under the hood the function will be suspended and re-hydrated between steps.
Steps are regular Convex queries, mutations, or actions, and can run
sequentially or in parallel (e.g. Promise.all), using output from previous
steps or local variables. The overall workflow and each step has type-safe and
runtime-validated arguments and return value.
Workflows can be canceled and restarted from an arbitrary step, allowing you to recover failed workflows after third party outages or fixing your code.
The status can be observed by many users simultaneously via regular reactive-by-default Convex queries, and the history of each step's execution is likewise live-updating.
Retry behavior for each action step is configurable, mutations have
exactly-once execution (ignoring rollbacks due to database conflicts, which
are automatically retried), and the overall workflow is guaranteed to run to
completion, with exactly-once execution of an onComplete handler.
Uses a Workpool under the hood to enable parallelism limits for steps, to avoid spikes of asynchronous work consuming too many resources.
import { WorkflowManager } from "@convex-dev/workflow";
import { components } from "./_generated/api";
export const workflow = new WorkflowManager(components.workflow);
export const userOnboarding = workflow.define({
args: {
userId: v.id("users"),
},
handler: async (step, args): Promise<void> => {
let result = await step.runAction(
internal.llm.generateCustomContent,
{ userId: args.userId },
// Retry this on transient errors with the default retry policy.
{ retry: true },
);
while (result.requiresRefinement) {
// Run a whole workflow as a single step.
result = await step.runWorkflow(internal.llm.refineContentWorkflow, {
userId: args.userId,
currentResult: result,
});
}
const email = await step.runMutation(
internal.emails.sendWelcomeEmail,
{ userId: args.userId, content: result.content, },
// Optimization: run the mutation synchronously from this transaction.
{ inline: true },
);
if (email.status === "needsVerification") {
// Waits until verification is completed asynchronously.
await step.awaitEvent({ name: "emailHasBeenVerified" });
}
for (let i = 0; i < 3; i++) {
const status = await step.runMutation(
internal.emails.sendFollowUpEmailMaybe,
{ userId: args.userId },
// Runs one day after the previous step.
{ runAfter: 24 * 60 * 60 * 1000 },
);
if (!status.ok) break;
}
},
});
How it works
The workflow tracks each step as it goes, executing steps asynchronously, and
resuming the workflow's handler where it left off when the step completes. If a
step fails, it will either retry based on the configured policy, or throw a
catch-able exception in the workflow handler, allowing graceful recovery.
While steps are executing, the workflow handler is not running. When it is time
to run the next step, it re-executes the code, deterministically replaying the
history until it finds the next step. The workflow itself is also run via the
Workpool, so exceptions thrown within the workflow will get delivered to the
onComplete handler.
Installation
First, add @convex-dev/workflow to your Convex project:
npm install @convex-dev/workflow
Then, install the component within your convex/convex.config.ts file:
// convex/convex.config.ts
import workflow from "@convex-dev/workflow/convex.config.js";
import { defineApp } from "convex/server";
const app = defineApp();
app.use(workflow);
export default app;
Finally, create a workflow manager within your convex/ folder, and point it to
the installed component:
// convex/index.ts
import { WorkflowManager } from "@convex-dev/workflow";
import { components } from "./_generated/api";
export const workflow = new WorkflowManager(components.workflow);
Usage
The first step is to define a workflow using workflow.define(). This function
is designed to feel like a Convex action but with a few restrictions:
- The workflow runs in the background, so the result can't be directly returned to whatever code starts it.
- The workflow must be deterministic, so it should implement most of its
logic by calling out to other Convex functions. We restrict access to some
non-deterministic functions like
fetch, env vars andcrypto. Others we patch, such asconsolefor logging,Math.random()(seeded PRNG) andDatefor time.
Note: To help avoid type cycles, always annotate the return type of the
handler with the return type of the workflow.
export const exampleWorkflow = workflow.define({
args: { exampleArg: v.string() },
returns: v.string(),
handler: async (step, args): Promise<string> => {
// ^ Specify the return type of the handler
const queryResult = await step.runQuery(
internal.example.exampleQuery,
args,
);
const actionResult = await step.runAction(
internal.example.exampleAction,
{ queryResult }, // pass in results from previous steps!
);
return actionResult;
},
});
export const exampleQuery = internalQuery({
args: { exampleArg: v.string() },
handler: async (ctx, args) => {
return `The query says... Hi ${args.exampleArg}!`;
},
});
export const exampleAction = internalAction({
args: { queryResult: v.string() },
handler: async (ctx, args) => {
return args.queryResult + " The action says... Hi back!";
},
});
Starting a workflow
Once you've defined a workflow, you can start it from a mutation or action using
workflow.start().
export const kickoffWorkflow = mutation({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ exampleArg: "James" },
);
},
});
Handling the workflow's result with onComplete
You can handle the workflow's result with onComplete. This is useful for
cleaning up any resources used by the workflow.
Note: when you return things from a workflow, you'll need to specify the return
type of your handler to break type cycles due to using internal.* functions
in the body, which then inform the type of the workflow, which is included in
the internal.* type.
You can also specify a returns validator to do runtime validation on the
return value. If it fails, your onComplete handler will be called with an
error instead of success. You can also do validation in the onComplete handler
to have more control over handling that situation.
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";
export const foo = mutation({
handler: async (ctx) => {
const name = "James";
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name },
{
onComplete: internal.example.handleOnComplete,
context: name, // can be anything
},
);
},
});
export const handleOnComplete = mutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(), // used to pass through data from the start site.
},
handler: async (ctx, args) => {
const name = (args.context as { name: string }).name;
if (args.result.kind === "success") {
const text = args.result.returnValue;
console.log(`${name} result: ${text}`);
} else if (args.result.kind === "error") {
console.error("Workflow failed", args.result.error);
} else if (args.result.kind === "canceled") {
console.log("Workflow canceled", args.context);
}
},
});
Running steps in parallel
You can run steps in parallel by calling step.runAction() multiple times in a
Promise.all() call.
export const exampleWorkflow = workflow.define({
args: { name: v.string() },
handler: async (step, args): Promise<void> => {
const [result1, result2] = await Promise.all([
step.runAction(internal.example.myAction, args),
step.runAction(internal.example.myAction, args),
]);
},
});
Note: The workflow will not proceed until all steps fired off at once have completed. Note: if you are starting many tasks at once, it will only start the first 10 (or maxParallelism) at once, to prevent one workflow from starving others. It will start the next batch when all 10 have finished.
Specifying retry behavior
Sometimes actions fail due to transient errors, whether it was an unreliable third-party API or a server restart. You can have the workflow automatically retry actions using best practices (exponential backoff & jitter). By default there are no retries on actions, and if the exception isn't caught in the workflow, it will call the onComplete as a failure. Note: all queries and mutations (including the workflow handler) are retried automatically by Convex on system errors, and no transaction will either partially commit or commit twice (regular Convex guarantees).
You can specify default retry behavior for all workflows on the WorkflowManager, or override it on
