Flows
Suspendable, resumable flows with named steps, signals, and devtools tracing.
import {
  flow,
  withFlow,
  signalFlow,
  cancelFlow,
  listFlows,
  createFlowId,
  FlowSuspendedError,
  FlowCancelledError,
  FlowExpiredError,
} from '@crux/core/flow'
Overview
Flows group generate() calls into structured pipelines with named steps, automatic devtools tracing, suspend/resume, and retry/fallback. Each run opens a canonical flow.run span, and each executed flow.step() opens a canonical flow.step child span. Spans generated inside a flow inherit the active observability context, so devtools groups them into the structured pipeline. Flows can nest: calling flow().run() inside another flow preserves the parent-child relationship in the graph.
Flows support suspend/resume for human-in-the-loop workflows. When flow.suspend() is called, the flow persists its state to the configured CruxStore and the run returns { status: 'suspended' }. The observability graph records a flow.suspension marker linked to the causing step, and RunDetail presents that marker beside the other flow steps, so successful generations and completed steps do not appear stuck in progress. To resume a previously suspended flow, pass { resume: flowId } to .run().
flow(name, handler)
Define a named flow and return a frozen FlowHandle. Separates definition from execution: the handler is captured once, then .run() can be called repeatedly with different inputs and options.
| Parameter | Type | Description |
|---|---|---|
| name | `string` | Human-readable flow name |
| handler | `(flow: FlowScope<TInput>) => Promise<T> \| T` | Flow function with access to flow.step(), flow.suspend(), flow.cancel() |
Returns: FlowHandle<T, TInput> -- frozen handle with .run(), .signal(), and .name
import { flow } from '@crux/core/flow'
import { generate } from '@crux/ai'

// planner, searcher, and model are assumed to be defined elsewhere.
const researchFlow = flow('research', async (flow) => {
  // The output of the 'plan' step feeds the 'search' step.
  const plan = await flow.step('plan', () => generate(planner, { model, input: { topic: flow.input.topic } }))
  return flow.step('search', () => generate(searcher, { model, input: { plan } }))
})

const result = await researchFlow.run({ input: { topic: 'TypeScript monorepos' } })
if (result.status === 'completed') {
  console.log(result.output)
}
withFlow(name, fn, options?)
Execute a function directly within a flow scope. Lower-level alternative to flow -- useful for one-off flows or when you need full control over options at the call site.
| Parameter | Type | Description |
|---|---|---|
| name | `string` | Human-readable flow name |
| fn | `(flow: FlowScope<TInput>) => Promise<T> \| T` | Flow function |
| options | `WithFlowOptions<TInput>?` | Flow ID, resume, goal, input |
Returns: Promise<FlowResult<T>>
FlowHandle<T, TInput>
The frozen handle returned by flow().
| Property / Method | Type | Description |
|---|---|---|
| name | `string` | The flow's registered name |
| run(options?) | `Promise<FlowResult<T>>` | Execute the flow with optional input and runtime options |
| signal(flowId, signalName, payload?) | `Promise<void>` | Send a signal to a suspended instance. Delegates to signalFlow() internally. |
FlowRunOptions<TInput>
Options passed to handle.run().
| Field | Type | Description |
|---|---|---|
| input | `TInput?` | Typed input data, accessible as flow.input inside the flow |
| flowId | `string?` | Use a specific ID for cross-action correlation |
| parentFlowId | `string?` | Explicit parent flow ID for cross-action nesting |
| goal | `string?` | Goal description for devtools display |
| resume | `string?` | Resume a previously suspended flow by its flowId |
FlowScope<TInput>
The scope object passed to the flow handler. Generic over TInput (defaults to void).
| Property / Method | Type | Description |
|---|---|---|
| flowId | `string` | The flow's unique identifier |
| input | `TInput` | Typed input data passed via run options |
| results | `Record<string, unknown>` | Auto-populated after each step completes, keyed by step label |
| step(label, fn, options?) | `Promise<T>` | Execute a named step. fn can be () => T or (flow: FlowScope) => T |
| suspend(name, options?) | `Promise<T>` | Suspend the flow at a named point and wait for an external signal |
| waitUntil(name, conditionFn, options?) | `Promise<void>` | Suspend until a condition function returns true |
| cancel(reason?) | `never` | Cancel the flow with an optional reason |
step(label, fn, options?)
Execute a named step within the flow. Opens a canonical flow.step span and restores observability context so generated child spans nest under the step. On resume, completed steps are replayed from cache (skip-replay) and are not re-emitted.
fn accepts both plain functions () => T and flow-aware functions (flow: FlowScope) => T. Flow-aware functions receive the scope automatically, enabling external step definitions that access flow.input, flow.results, etc.
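The skip-replay behaviour described above can be pictured with a small standalone model. This is a sketch under stated assumptions, not the @crux/core implementation: makeStepRunner is a hypothetical helper, and the cache shape is borrowed from the completedSteps field of FlowSnapshot.

```typescript
// Hypothetical model of skip-replay; the real step() also opens spans and handles retries.
type CompletedSteps = Record<string, { output: unknown; durationMs: number }>

function makeStepRunner(completedSteps: CompletedSteps) {
  return async function step<T>(label: string, fn: () => Promise<T> | T): Promise<T> {
    const cached = completedSteps[label]
    if (cached !== undefined) {
      // Replay: hand back the stored output without calling fn again.
      return cached.output as T
    }
    const startedAt = Date.now()
    const output = await fn()
    // Record the result so a later resume can skip this step.
    completedSteps[label] = { output, durationMs: Date.now() - startedAt }
    return output
  }
}
```

Running the same label twice against one cache calls fn only once, which is why steps with side effects are not repeated when a suspended flow resumes.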
suspend(name, options?)
Suspend the flow and wait for an external signal. Throws internally to unwind the call stack -- no code after suspend() executes in the current call. The flow emits a flow.suspension observability marker and its flow/run presentation status becomes suspended; the step that caused the suspension can still end with status ok. On resume, suspend() returns the signal payload (typed if a schema is provided).
waitUntil(name, conditionFn, options?)
Suspend the flow until a condition function returns true. On each resume, re-evaluates the condition. If true, continues; if false, re-suspends.
SuspendOptions<T>
| Field | Type | Description |
|---|---|---|
| schema | `ZodType<T>?` | Zod schema for the expected signal payload. Validated on signal delivery. |
| timeout | `string?` | Timeout duration (e.g., '24h', '30m'). Flow expires if not signaled within this period. |
| onExpired | `(state) => void?` | Callback invoked when a flow is detected as expired on resume. |
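To make the timeout field concrete, here is a minimal sketch of how a duration string could be turned into milliseconds and checked on resume. parseDuration and isExpired are hypothetical helpers, not library exports. The unit set (s, m, h, d) and the rule that a flow expires once the elapsed time reaches the timeout are assumptions, since the reference only shows '24h' and '30m'.

```typescript
// Assumed unit table: the reference only gives 'h' and 'm' examples.
const UNIT_MS = { s: 1000, m: 60 * 1000, h: 60 * 60 * 1000, d: 24 * 60 * 60 * 1000 }

// Hypothetical helper: convert a duration string such as '24h' into milliseconds.
function parseDuration(text: string): number {
  const match = /^(\d+)([smhd])$/.exec(text)
  if (match === null) {
    throw new Error(`Unsupported duration: ${text}`)
  }
  const unit = match[2] as keyof typeof UNIT_MS
  return Number(match[1]) * UNIT_MS[unit]
}

// Hypothetical expiry check, as might run when a resume is attempted.
function isExpired(suspendedAtMs: number, timeout: string, nowMs: number): boolean {
  return nowMs - suspendedAtMs >= parseDuration(timeout)
}
```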
StepOptions
Retry and fallback options for a flow step.
| Field | Type | Description |
|---|---|---|
| retry.attempts | `number` | Max attempts (including the first) |
| retry.delay | `number?` | Base delay in ms (default: 1000) |
| retry.backoff | `'linear' \| 'exponential'?` | Backoff strategy |
| fallback | `() => Promise<T> \| T` | Fallback if all retries fail |
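The interaction between attempts, delay, backoff, and fallback can be sketched as a standalone loop. This is an illustration only: runWithRetry and delayBefore are hypothetical names, and the delay schedule (linear as delay × n, exponential as delay × 2^(n−1), constant when no strategy is set) is an assumption, because the reference does not define the formulas.

```typescript
type Backoff = 'linear' | 'exponential'

interface RetrySketch {
  attempts: number // max attempts, including the first
  delay?: number // base delay in ms
  backoff?: Backoff
}

// Assumed schedule for the wait before retry number n (n starts at 1).
function delayBefore(retryNumber: number, base: number, backoff?: Backoff): number {
  if (backoff === 'linear') return base * retryNumber
  if (backoff === 'exponential') return base * Math.pow(2, retryNumber - 1)
  return base
}

async function runWithRetry<T>(
  fn: () => Promise<T> | T,
  retry: RetrySketch,
  fallback?: () => Promise<T> | T,
  // Injectable sleep so the loop can be exercised without real waiting.
  sleep: (ms: number) => Promise<void> = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= retry.attempts; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      // Only wait if another attempt will follow.
      if (attempt < retry.attempts) {
        await sleep(delayBefore(attempt, retry.delay ?? 1000, retry.backoff))
      }
    }
  }
  // All attempts failed: use the fallback if one was given, else surface the last error.
  if (fallback !== undefined) return fallback()
  throw lastError
}
```

Because attempts counts the first call, { attempts: 3 } in this sketch means one initial call plus at most two retries.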
FlowResult<T>
Discriminated union on status:
| Variant | Fields | Description |
|---|---|---|
| completed | output: T, flowId | Flow finished normally |
| suspended | flowId, suspendedAt | Flow paused at a suspend point |
| cancelled | flowId, cancelReason? | Flow cancelled via flow.cancel() or cancelFlow() |
| expired | flowId, suspendedAt | Suspended flow timed out before a signal arrived |
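Because the union is discriminated on status, callers can handle every variant with an exhaustive switch. The sketch below uses a local mirror of the table (FlowResultSketch) rather than the library's own type, so the exact field types are assumptions. describeResult is a hypothetical helper.

```typescript
// Local mirror of the variants listed above; field types are assumptions.
type FlowResultSketch<T> =
  | { status: 'completed'; output: T; flowId: string }
  | { status: 'suspended'; flowId: string; suspendedAt: string }
  | { status: 'cancelled'; flowId: string; cancelReason?: string }
  | { status: 'expired'; flowId: string; suspendedAt: string }

function describeResult<T>(result: FlowResultSketch<T>): string {
  switch (result.status) {
    case 'completed':
      return `flow ${result.flowId} completed`
    case 'suspended':
      return `flow ${result.flowId} is waiting at "${result.suspendedAt}"`
    case 'cancelled':
      return `flow ${result.flowId} was cancelled: ${result.cancelReason ?? 'no reason given'}`
    case 'expired':
      return `flow ${result.flowId} expired at "${result.suspendedAt}"`
    default: {
      // Compile-time guard: adding a new variant makes this assignment fail.
      const unreachable: never = result
      return unreachable
    }
  }
}
```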
FlowSnapshot
Persisted flow state stored in CruxStore under the crux:flow:{flowId} key.
| Field | Type | Description |
|---|---|---|
| flowId | `string` | Flow identifier |
| name | `string` | Flow name |
| status | `string` | Current status |
| suspendedAt | `string` | Suspend point name |
| completedSteps | `Record<string, { output, durationMs }>` | Cached step results for replay |
| traceContext | `Record<string, string \| undefined>` | Trace context metadata |
| createdAt | `number` | Unix timestamp (ms) |
| updatedAt | `number` | Unix timestamp (ms) |
Lifecycle Functions
signalFlow(flowId, name, payload?)
Write a signal payload to the store at crux:signal:{flowId}:{name}. The signal is picked up on the next resume.
Low-level primitive. signalFlow() only writes the signal to the CruxStore -- it does NOT trigger the flow to resume. Prefer handle.signal(), which delegates to this internally. In Convex mutation-only modules, pair signalFlow() with ctx.scheduler.runAfter() in an app-local helper that also schedules the resume action.
| Parameter | Type | Description |
|---|---|---|
| flowId | `string` | The suspended flow's ID |
| name | `string` | Signal name (must match the suspend() point name) |
| payload | `JsonObject?` | Signal data, validated against the suspend's schema on resume |
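The write-only nature of signalFlow() is easier to see in a toy model. The key layout below comes from the text above. Everything else (MemoryStore, writeSignal, readSignal) is a hypothetical stand-in, and the real CruxStore interface may look different.

```typescript
// Key layout taken from the reference text.
function signalKey(flowId: string, name: string): string {
  return `crux:signal:${flowId}:${name}`
}

// Hypothetical in-memory stand-in for a CruxStore.
class MemoryStore {
  private readonly entries = new Map<string, unknown>()

  set(key: string, value: unknown): void {
    this.entries.set(key, value)
  }

  get(key: string): unknown {
    return this.entries.get(key)
  }
}

// Models signalFlow(): a store write and nothing more. No flow is resumed here.
function writeSignal(store: MemoryStore, flowId: string, name: string, payload: Record<string, unknown>): void {
  store.set(signalKey(flowId, name), payload)
}

// Models what a later resume might do: look for a signal matching the suspend point name.
function readSignal(store: MemoryStore, flowId: string, name: string): unknown {
  return store.get(signalKey(flowId, name))
}
```

In this model a signal written under a name that does not match the suspend point is simply never found, which mirrors the requirement that the signal name match the suspend() point name.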
cancelFlow(flowId, reason?)
Cancel a suspended flow by updating its stored snapshot status to 'cancelled'.
| Parameter | Type | Description |
|---|---|---|
| flowId | `string` | The flow's ID |
| reason | `string?` | Cancellation reason |
listFlows(options?)
Query the store for flow snapshots.
| Parameter | Type | Description |
|---|---|---|
| options.status | `string?` | Filter by flow status (e.g., 'suspended') |
Returns: Promise<FlowSummary[]> -- array of { flowId, name, status, suspendedAt, createdAt, updatedAt, timeoutAt? }
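One possible use of the returned summaries is a housekeeping pass that finds suspended flows past their deadline. The sketch below works on plain data shaped like the fields listed above. FlowSummarySketch and findOverdue are hypothetical, and treating timeoutAt as a Unix timestamp in milliseconds is an assumption.

```typescript
// Local mirror of the summary fields listed above; the field types are assumptions.
interface FlowSummarySketch {
  flowId: string
  name: string
  status: string
  suspendedAt?: string
  createdAt: number
  updatedAt: number
  timeoutAt?: number // assumed: Unix timestamp in ms
}

// Hypothetical helper: pick suspended flows whose deadline has already passed.
function findOverdue(summaries: FlowSummarySketch[], nowMs: number): FlowSummarySketch[] {
  return summaries.filter(
    (summary) => summary.status === 'suspended' && summary.timeoutAt !== undefined && summary.timeoutAt <= nowMs,
  )
}
```

The result of such a filter could then be passed, one flowId at a time, to cancelFlow() or to a resume call, depending on how the application wants to treat stale flows.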
createFlowId()
Generates a unique flow ID for cross-boundary correlation. Returns string.
Error Classes
These errors are used internally for control flow -- they are caught by withFlow() and converted to FlowResult variants. You should not normally need to catch them.
| Error | Description |
|---|---|
| FlowSuspendedError | Thrown by flow.suspend() to unwind the call stack |
| FlowCancelledError | Thrown by flow.cancel() to unwind the call stack |
| FlowExpiredError | Thrown when a suspended flow's timeout has been exceeded |
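The pattern of throwing to unwind and then converting the error into a result variant can be sketched in isolation. The classes and runScoped below are hypothetical stand-ins written for illustration. They do not reproduce the constructors or fields of the real error classes, and the expired case is omitted for brevity.

```typescript
// Stand-ins for the library's control-flow errors; names and fields are hypothetical.
class SuspendedSketch extends Error {
  suspendedAt: string
  constructor(suspendedAt: string) {
    super(`suspended at ${suspendedAt}`)
    this.suspendedAt = suspendedAt
  }
}

class CancelledSketch extends Error {
  reason: string | undefined
  constructor(reason?: string) {
    super(reason ?? 'cancelled')
    this.reason = reason
  }
}

type OutcomeSketch<T> =
  | { status: 'completed'; flowId: string; output: T }
  | { status: 'suspended'; flowId: string; suspendedAt: string }
  | { status: 'cancelled'; flowId: string; cancelReason: string | undefined }

async function runScoped<T>(flowId: string, fn: () => Promise<T> | T): Promise<OutcomeSketch<T>> {
  try {
    const output = await fn()
    return { status: 'completed', flowId, output }
  } catch (error) {
    // Control-flow errors become result variants instead of propagating.
    if (error instanceof SuspendedSketch) {
      return { status: 'suspended', flowId, suspendedAt: error.suspendedAt }
    }
    if (error instanceof CancelledSketch) {
      return { status: 'cancelled', flowId, cancelReason: error.reason }
    }
    throw error // anything else is a genuine failure
  }
}
```

This is also why wrapping suspend() or cancel() in a broad try/catch inside a handler is risky: swallowing the control-flow error would prevent the scope from ever seeing it.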
Suspend/Resume Example
import { flow } from '@crux/core/flow'
import { generate } from '@crux/ai'
import { z } from 'zod'

// writer, publisher, and model are assumed to be defined elsewhere.
const approvalFlow = flow('content-approval', async (flow) => {
  const draft = await flow.step('draft', () => generate(writer, { model, input: flow.input }))

  // Suspend and wait for human approval
  const approval = await flow.suspend('review', {
    schema: z.object({ approved: z.boolean(), feedback: z.string().optional() }),
    timeout: '24h',
    onExpired: ({ flowId }) => console.log(`Flow ${flowId} expired`),
  })

  if (!approval.approved) {
    // cancel() never returns, so nothing below runs for a rejected draft
    flow.cancel('Rejected by reviewer')
  }

  return flow.step('publish', () => generate(publisher, { model, input: { draft, feedback: approval.feedback } }))
})

// Start the flow
const result = await approvalFlow.run({ input: { topic: 'AI safety' } })
// result.status === 'suspended'

// Later, write the signal (this alone does not resume the flow)
await approvalFlow.signal(result.flowId, 'review', {
  approved: true,
  feedback: 'Great draft, just fix the intro.',
})

// Resume the flow
const final = await approvalFlow.run({ resume: result.flowId })
// final.status === 'completed'
Types
import type {
  FlowHandle,
  FlowRunOptions,
  FlowScope,
  FlowResult,
  FlowSnapshot,
  FlowSummary,
  ListFlowsOptions,
  SuspendOptions,
  StepOptions,
  WithFlowOptions,
} from '@crux/core/flow'
import { FlowSuspendedError, FlowCancelledError, FlowExpiredError } from '@crux/core/flow'
Related
- Guide: Flows overview
- Guide: Suspend and resume
- Guide: Typed input and step composition
- Guide: Agent primitives in flows
- Guide: Convex
- Cookbook: Plan with approval
- Cookbook: Long-running flow