Crux
API Reference@crux/core

Flows

Suspendable, resumable flows with named steps, signals, and devtools tracing.

import {
  flow,
  withFlow,
  signalFlow,
  cancelFlow,
  listFlows,
  createFlowId,
  FlowSuspendedError,
  FlowCancelledError,
  FlowExpiredError,
} from '@crux/core/flow'

Overview

Flows group generate() calls into structured pipelines with named steps, automatic devtools tracing, suspend/resume, and retry/fallback. A run opens a canonical flow.run span, and each executed flow.step() opens a canonical flow.step child. Generated spans inside a flow inherit the active observability context for structured pipeline grouping in devtools. Flows can nest; calling flow().run() inside another flow preserves the parent-child relationship in the graph.

Flows support suspend/resume for human-in-the-loop workflows. When flow.suspend() is called, the flow persists its state to the configured CruxStore and returns { status: 'suspended' }. The observability graph records a flow.suspension marker linked to the causing step, and RunDetail presents that marker beside the other flow steps so successful generations and completed steps do not remain visually stuck. Pass { resume: flowId } to .run() to resume a previously suspended flow.

flow(name, handler)

Define a named flow and return a frozen FlowHandle. Separates definition from execution: the handler is captured once, then .run() can be called repeatedly with different inputs and options.

ParameterTypeDescription
namestringHuman-readable flow name
handler(flow: FlowScope<TInput>) => Promise<T> | TFlow function with access to flow.step(), flow.suspend(), flow.cancel()

Returns: FlowHandle<T, TInput> -- frozen handle with .run(), .signal(), and .name

import { flow } from '@crux/core/flow'
import { generate } from '@crux/ai'

const researchFlow = flow('research', async (flow) => {
  const plan = await flow.step('plan', () => generate(planner, { model, input: { topic: flow.input.topic } }))
  return flow.step('search', () => generate(searcher, { model, input: { plan } }))
})

const result = await researchFlow.run({ input: { topic: 'TypeScript monorepos' } })
if (result.status === 'completed') {
  console.log(result.output)
}

withFlow(name, fn, options?)

Execute a function directly within a flow scope. Lower-level alternative to flow -- useful for one-off flows or when you need full control over options at the call site.

ParameterTypeDescription
namestringHuman-readable flow name
fn(flow: FlowScope<TInput>) => Promise<T> | TFlow function
optionsWithFlowOptions<TInput>?Flow ID, resume, goal, input

Returns: Promise<FlowResult<T>>

FlowHandle<T, TInput>

The frozen handle returned by flow().

Property / MethodTypeDescription
namestringThe flow's registered name
run(options?)Promise<FlowResult<T>>Execute the flow with optional input and runtime options
signal(flowId, signalName, payload?)Promise<void>Send a signal to a suspended instance. Delegates to signalFlow() internally.

FlowRunOptions<TInput>

Options passed to handle.run().

FieldTypeDescription
inputTInput?Typed input data, accessible as flow.input inside the flow
flowIdstring?Use a specific ID for cross-action correlation
parentFlowIdstring?Explicit parent flow ID for cross-action nesting
goalstring?Goal description for devtools display
resumestring?Resume a previously suspended flow by its flowId

FlowScope<TInput>

The scope object passed to the flow handler. Generic over TInput (defaults to void).

Property / MethodTypeDescription
flowIdstringThe flow's unique identifier
inputTInputTyped input data passed via run options
resultsRecord<string, unknown>Auto-populated after each step completes, keyed by step label
step(label, fn, options?)Promise<T>Execute a named step. fn can be () => T or (flow: FlowScope) => T
suspend(name, options?)Promise<T>Suspend the flow at a named point and wait for an external signal
waitUntil(name, conditionFn, options?)Promise<void>Suspend until a condition function returns true
cancel(reason?)neverCancel the flow with an optional reason

step(label, fn, options?)

Execute a named step within the flow. Opens a canonical flow.step span and restores observability context so generated child spans nest under the step. On resume, completed steps are replayed from cache (skip-replay) and are not re-emitted.

fn accepts both plain functions () => T and flow-aware functions (flow: FlowScope) => T. Flow-aware functions receive the scope automatically, enabling external step definitions that access flow.input, flow.results, etc.

suspend(name, options?)

Suspend the flow and wait for an external signal. Throws internally to unwind the call stack -- no code after suspend() executes in the current call. The current flow emits a flow.suspension observability marker and rolls the flow/run presentation status to suspended; the causing step can still end ok. On resume, the signal payload is returned (typed if schema is provided).

waitUntil(name, conditionFn, options?)

Suspend the flow until a condition function returns true. On each resume, re-evaluates the condition. If true, continues; if false, re-suspends.

SuspendOptions<T>

FieldTypeDescription
schemaZodType<T>?Zod schema for the expected signal payload. Validated on signal delivery.
timeoutstring?Timeout duration (e.g., '24h', '30m'). Flow expires if not signaled within this period.
onExpired(state) => void?Callback invoked when a flow is detected as expired on resume.

StepOptions

Retry and fallback options for a flow step.

FieldTypeDescription
retry.attemptsnumberMax attempts (including the first)
retry.delaynumber?Base delay in ms (default: 1000)
retry.backoff'linear' | 'exponential'?Backoff strategy
fallback() => Promise<T> | TFallback if all retries fail

FlowResult<T>

Discriminated union on status:

VariantFieldsDescription
completedoutput: T, flowIdFlow finished normally
suspendedflowId, suspendedAtFlow paused at a suspend point
cancelledflowId, cancelReason?Flow cancelled via flow.cancel() or cancelFlow()
expiredflowId, suspendedAtSuspended flow timed out before a signal arrived

FlowSnapshot

Persisted flow state stored in CruxStore under the crux:flow:{flowId} key.

FieldTypeDescription
flowIdstringFlow identifier
namestringFlow name
statusstringCurrent status
suspendedAtstringSuspend point name
completedStepsRecord<string, { output, durationMs }>Cached step results for replay
traceContextRecord<string, string | undefined>Trace context metadata
createdAtnumberUnix timestamp (ms)
updatedAtnumberUnix timestamp (ms)

Lifecycle Functions

signalFlow(flowId, name, payload?)

Low-level primitive. signalFlow() only writes the signal to the CruxStore -- it does NOT trigger the flow to resume. Prefer handle.signal() which delegates to this internally. In Convex mutation-only modules, pair signalFlow() with ctx.scheduler.runAfter() in an app-local helper that also schedules the resume action.

Write a signal payload to the store at crux:signal:{flowId}:{name}. The signal is picked up on the next resume.

ParameterTypeDescription
flowIdstringThe suspended flow's ID
namestringSignal name (must match the suspend() point name)
payloadJsonObject?Signal data, validated against the suspend's schema on resume

cancelFlow(flowId, reason?)

Cancel a suspended flow by updating its stored snapshot status to 'cancelled'.

ParameterTypeDescription
flowIdstringThe flow's ID
reasonstring?Cancellation reason

listFlows(options?)

Query the store for flow snapshots.

ParameterTypeDescription
options.statusstring?Filter by flow status (e.g., 'suspended')

Returns: Promise<FlowSummary[]> -- array of { flowId, name, status, suspendedAt, createdAt, updatedAt, timeoutAt? }

createFlowId()

Generates a unique flow ID for cross-boundary correlation. Returns string.

Error Classes

These errors are used internally for control flow -- they are caught by withFlow() and converted to FlowResult variants. You should not normally need to catch them.

ErrorDescription
FlowSuspendedErrorThrown by flow.suspend() to unwind the call stack
FlowCancelledErrorThrown by flow.cancel() to unwind the call stack
FlowExpiredErrorThrown when a suspended flow's timeout has been exceeded

Suspend/Resume Example

import { flow, signalFlow } from '@crux/core/flow'
import { z } from 'zod'

const approvalFlow = flow('content-approval', async (flow) => {
  const draft = await flow.step('draft', () => generate(writer, { model, input: flow.input }))

  // Suspend and wait for human approval
  const approval = await flow.suspend('review', {
    schema: z.object({ approved: z.boolean(), feedback: z.string().optional() }),
    timeout: '24h',
    onExpired: ({ flowId }) => console.log(`Flow ${flowId} expired`),
  })

  if (!approval.approved) {
    flow.cancel('Rejected by reviewer')
  }

  return flow.step('publish', () => generate(publisher, { model, input: { draft, feedback: approval.feedback } }))
})

// Start the flow
const result = await approvalFlow.run({ input: { topic: 'AI safety' } })
// result.status === 'suspended'

// Later, signal the flow to resume
await approvalFlow.signal(result.flowId, 'review', {
  approved: true,
  feedback: 'Great draft, just fix the intro.',
})

// Resume the flow
const final = await approvalFlow.run({ resume: result.flowId })
// final.status === 'completed'

Types

import type {
  FlowHandle,
  FlowRunOptions,
  FlowScope,
  FlowResult,
  FlowSnapshot,
  FlowSummary,
  ListFlowsOptions,
  SuspendOptions,
  StepOptions,
  WithFlowOptions,
} from '@crux/core/flow'

import { FlowSuspendedError, FlowCancelledError, FlowExpiredError } from '@crux/core/flow'

On this page