Flows
Compose generate() calls into structured pipelines with named steps, automatic retries, fallbacks, and full observability.
A single generate() call handles one prompt. Real applications chain multiple calls into pipelines — research then synthesize, plan then execute, draft then validate. flow makes those pipelines a first-class primitive: every step has a name, every retry is declarative, and every trace is grouped automatically.
```ts
import { flow } from '@crux/core'
import { generate } from '@crux/ai'

// planner, searcher, synthesizer, model, and query are defined elsewhere
const researchFlow = flow('research', async (flow) => {
  const plan = await flow.step('plan', () => generate(planner, { model, input: { query } }))
  const results = await flow.step('search', () => generate(searcher, { model, input: { plan: plan.object } }))
  return flow.step('synthesize', () => generate(synthesizer, { model, input: { results: results.object } }))
})

await researchFlow.run()
```

Before reaching for flow, check whether a composition pattern already covers your use case.
pipeline() handles sequential agent chains with typed context accumulation,
parallel() handles concurrent execution with named results,
consensus() handles voting, and
swarm() handles dynamic LLM-decided routing. These are simpler, require less
code, and come with pattern-aware devtools rendering. Reach for flow when you need per-step retries/fallbacks,
suspend/resume, custom orchestration logic that doesn't fit a composition pattern, or step-level tracing
inside an agent's tool.
When You Need This
Use flow when you see these signals in your code:
- Chaining outputs: the result of one generate() call feeds into the next as input
- Manual try/catch for retries: you're wrapping generate() in retry loops yourself
- Human-in-the-loop gates: your pipeline needs to pause for approval, review, or external input before continuing
- Cross-action tracing: your pipeline spans multiple Convex actions, Lambda invocations, or API routes
- Eval↔runtime comparison: you want to compare production step performance against eval baselines in devtools
If you're making a single generate() call, you don't need a flow — just call generate() directly. Flows are for multi-step pipelines where the steps have meaningful names and data dependencies.
What Flows Give You
- Named steps: label each stage of your pipeline and thread data between steps naturally
- Automatic retries: retry failed steps with configurable backoff, no manual try/catch
- Fallbacks: switch to a cheaper model or alternative strategy when retries are exhausted
- Suspend/resume: pause for human approval, external events, or async dependencies, then resume later, even in a different process
- Structured tracing: every generate() inside a step is tagged with flowId, stepId, and stepLabel automatically
- Devtools grouping: the Timeline groups traces by flow and step instead of guessing from timing
- Quality↔runtime comparison: flow step labels show up as typed steps signals in Quality evaluations, so test assertions and production traces speak the same names
- Cross-boundary: works across Convex actions, microservices, or any multi-process setup by passing IDs
- Define once, run many: flow() returns a frozen handle; call .run() repeatedly with different inputs
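The structured tracing and devtools grouping bullets rest on one idea: every trace carries explicit tags, so grouping is a lookup rather than a guess from timing. The sketch below is a hypothetical illustration, not Crux's devtools code. The TraceRecord type, its model field, and groupTraces are made up for the example; only the flowId, stepId, and stepLabel tag names come from the list.

```ts
// Hypothetical trace record: only the three tag names come from the docs.
interface TraceRecord {
  flowId: string
  stepId: string
  stepLabel: string
  model: string // illustrative payload field, not a documented Crux field
}

type StepGroups = Map<string, TraceRecord[]>

// Nest traces as flowId -> stepLabel -> traces, preserving arrival order.
function groupTraces(traces: TraceRecord[]): Map<string, StepGroups> {
  const byFlow = new Map<string, StepGroups>()
  for (const trace of traces) {
    let bySteps = byFlow.get(trace.flowId)
    if (bySteps === undefined) {
      bySteps = new Map<string, TraceRecord[]>()
      byFlow.set(trace.flowId, bySteps)
    }
    const group = bySteps.get(trace.stepLabel) ?? []
    group.push(trace)
    bySteps.set(trace.stepLabel, group)
  }
  return byFlow
}

// Two flows whose traces arrive interleaved in time.
const traces: TraceRecord[] = [
  { flowId: 'flow-1', stepId: 'step-1', stepLabel: 'plan', model: 'primary' },
  { flowId: 'flow-2', stepId: 'step-3', stepLabel: 'plan', model: 'primary' },
  { flowId: 'flow-1', stepId: 'step-2', stepLabel: 'search', model: 'primary' },
  { flowId: 'flow-1', stepId: 'step-2', stepLabel: 'search', model: 'cheap' }, // fallback call, same step
]

const grouped = groupTraces(traces)
console.log(grouped.get('flow-1')?.get('search')?.length) // 2
```

Because the fallback call shares its step's tags, it lands in the same group as the primary attempt instead of looking like a separate stage.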
Basic Usage
```ts
import { flow } from '@crux/core'
import { generate } from '@crux/ai'

// analyzer, editor, model, and document are defined elsewhere
const editPipeline = flow('edit-pipeline', async (flow) => {
  const analysis = await flow.step('analyze', () => generate(analyzer, { model, input: { document } }))
  return flow.step('edit', () => generate(editor, { model, input: { analysis: analysis.object, document } }))
})

const result = await editPipeline.run()
```

flow returns a FlowHandle whose .run() method resolves to a FlowResult wrapping whatever the handler returns. Each flow.step() resolves to the return value of its function, so you can use it to thread data between steps.
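As a rough mental model of these handle and step semantics, here is a self-contained sketch. It is not Crux's implementation: miniFlow, MiniScope, and MiniResult are hypothetical names, and plain async functions stand in for generate() calls. It shows a frozen handle defined once, a fresh step log for each .run(), and steps that resolve to their function's value.

```ts
// Hypothetical stand-ins: MiniScope, MiniResult and miniFlow are not Crux APIs.
interface MiniScope<I> {
  input: I
  step<T>(label: string, fn: () => Promise<T>): Promise<T>
}

interface MiniResult<T> {
  output: T // whatever the handler returned
  steps: string[] // labels of completed steps, in completion order
}

function miniFlow<I, T>(name: string, handler: (scope: MiniScope<I>) => Promise<T>) {
  // Freeze the handle so it can be defined once at module level and shared safely.
  return Object.freeze({
    name,
    run(input: I): Promise<MiniResult<T>> {
      const steps: string[] = [] // fresh per run, so runs never share state
      const scope: MiniScope<I> = {
        input,
        step<S>(label: string, fn: () => Promise<S>): Promise<S> {
          // Resolve to the step function's own value so callers can thread data onward.
          return fn().then((value) => {
            steps.push(label)
            return value
          })
        },
      }
      return handler(scope).then((output) => ({ output, steps }))
    },
  })
}

type Doc = { document: string }

const editPipeline = miniFlow('edit-pipeline', async (flow: MiniScope<Doc>) => {
  // The first step's return value feeds the second step.
  const analysis = await flow.step('analyze', async () => ({ words: flow.input.document.split(' ').length }))
  return flow.step('edit', async () => `reviewed ${analysis.words} words`)
})

// Define once, run many: each run gets its own input and its own step log.
editPipeline.run({ document: 'one two three' }).then((result) => {
  console.log(result.output) // reviewed 3 words
  console.log(result.steps) // [ 'analyze', 'edit' ]
})
```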
Sessions
withSession groups traces by user interaction. flow structures them into pipelines within that session.
```ts
import { withSession, flow, createSessionId } from '@crux/core'
import { generate } from '@crux/ai'

const researchFlow = flow('research', async (flow) => {
  await flow.step('plan', () => generate(planner, { model, input }))
  await flow.step('search', () => generate(searcher, { model, input }))
})

await withSession(createSessionId(), () => researchFlow.run())
```

Sessions and flows compose in any order. Multiple flows can run within one session:

```ts
await withSession(sessionId, async () => {
  await researchFlow.run()
  await writeFlow.run()
})
```

Retry and Fallback
LLM calls fail — rate limits, timeouts, malformed output. Instead of wrapping every step in try/catch, declare retry and fallback behavior:
```ts
await flow.step('search', () => generate(searcher, { model: primaryModel, input }), {
  retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
  fallback: () => generate(searcher, { model: cheapModel, input }),
})
```

This makes up to 3 attempts (the first call plus 2 retries) with exponential backoff, waiting 1s and then 2s between them, and falls back to a cheaper model if every attempt fails. The step still resolves to a single result, so callers don't need to know about the retry logic.
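A small helper makes the delay schedule concrete. It assumes 'exponential' doubles the base delay on each retry, which reproduces the 1s, 2s delays for 3 attempts, and that 'linear' adds one base delay per retry. The linear formula is a guess, and retryDelays is a hypothetical helper, not a Crux export.

```ts
type Backoff = 'linear' | 'exponential'

// Delays (ms) waited between attempts: attempts - 1 entries, because the first attempt runs immediately.
// Assumption: 'exponential' doubles the base delay each retry and 'linear' grows it by one
// base delay per retry; check the API reference for Crux's exact formula.
function retryDelays(attempts: number, delay = 1000, backoff: Backoff = 'linear'): number[] {
  const delays: number[] = []
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(backoff === 'exponential' ? delay * 2 ** (retry - 1) : delay * retry)
  }
  return delays
}

console.log(retryDelays(3, 1000, 'exponential')) // [ 1000, 2000 ]: the 1s, 2s schedule
console.log(retryDelays(4, 1000, 'exponential')) // [ 1000, 2000, 4000 ]
console.log(retryDelays(4, 1000, 'linear')) // [ 1000, 2000, 3000 ], under the assumption above
console.log(retryDelays(1)) // []: a single attempt never waits
```

Counting delays as attempts minus one is what makes retry.attempts read as "max attempts including the first".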
| Option | Type | Default | Description |
|---|---|---|---|
| retry.attempts | number | 1 | Max attempts, including the first |
| retry.delay | number | 1000 | Base delay between retries (ms) |
| retry.backoff | 'linear' \| 'exponential' | 'linear' | Delay multiplier strategy |
| fallback | () => T | none | Runs if all retries fail |
You can use retry alone, fallback alone, or both together. Without either, errors propagate normally.
Crux policy-terminal errors are not retried by default and do not run execution fallback: GuardrailBlockedError, ConstraintViolationError, and ValidationExhaustedError. This keeps safety, constraint, and validation decisions from being bypassed by a generic fallback. Use shouldRetry only when you intentionally want to override that policy.
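The combined semantics of retry, fallback, and policy-terminal errors can be sketched as a plain function. This is a hypothetical approximation, not Crux's code: runStep, StepOptions, and the local GuardrailBlockedError class are illustrative stand-ins, errors are matched by name only, the backoff formula is assumed, and the shouldRetry override is not modeled.

```ts
// Hypothetical stand-in for one of the policy-terminal errors named above.
class GuardrailBlockedError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'GuardrailBlockedError'
  }
}

// This sketch matches policy-terminal errors by name only.
const POLICY_TERMINAL = new Set(['GuardrailBlockedError', 'ConstraintViolationError', 'ValidationExhaustedError'])

function isPolicyTerminal(err: unknown): boolean {
  return err instanceof Error && POLICY_TERMINAL.has(err.name)
}

interface StepOptions<T> {
  retry?: { attempts?: number; delay?: number; backoff?: 'linear' | 'exponential' }
  fallback?: () => Promise<T>
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function runStep<T>(fn: () => Promise<T>, options: StepOptions<T> = {}): Promise<T> {
  const attempts = options.retry?.attempts ?? 1
  const delay = options.retry?.delay ?? 1000
  const backoff = options.retry?.backoff ?? 'linear'
  let lastError: unknown
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn()
    } catch (err) {
      // Policy-terminal errors skip both the remaining retries and the fallback.
      if (isPolicyTerminal(err)) throw err
      lastError = err
      if (attempt < attempts) {
        await sleep(backoff === 'exponential' ? delay * 2 ** (attempt - 1) : delay * attempt)
      }
    }
  }
  // Retries exhausted: use the fallback if there is one, otherwise rethrow the last error.
  if (options.fallback) return options.fallback()
  throw lastError
}

// Usage: a call that is rate limited twice, then succeeds on the third attempt.
let calls = 0
runStep(
  async () => {
    calls++
    if (calls < 3) throw new Error('rate limited')
    return 'ok'
  },
  { retry: { attempts: 3, delay: 10, backoff: 'exponential' } },
).then((value) => console.log(value, calls)) // ok 3
```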
```ts
// Retry only: fail hard after 3 attempts
await flow.step('validate', () => generate(validator, { model, input }), { retry: { attempts: 3, delay: 500 } })

// Fallback only: no retry, just switch strategy on the first failure
await flow.step('summarize', () => generate(summarizer, { model: premiumModel, input }), {
  fallback: () => generate(summarizer, { model: cheapModel, input }),
})
```

Parallel Flows
Run independent pipelines concurrently:
```ts
const topicResearch = flow('research', async (flow) => {
  // The query arrives per run via flow.input; the caller's q is not in scope here
  const plan = await flow.step('plan', () => generate(planner, { model, input: { query: flow.input.query } }))
  await flow.step('search', () => generate(searcher, { model, input: { plan: plan.object } }))
})

await withSession(sessionId, () => Promise.all(queries.map((q) => topicResearch.run({ input: { query: q } }))))
```

Each .run() call gets its own flowId and groups independently in devtools.
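Why each run groups independently can be sketched with a counter-based id. Crux's real flowId format and generation are not described on this page, so runResearch, Trace, and the counter are hypothetical. The id is minted synchronously when a run starts, so even though the concurrent runs' steps interleave in time, every trace carries the id of the run that produced it.

```ts
// Hypothetical: a counter stands in for Crux's real flowId generation.
let nextFlowId = 0

interface Trace {
  flowId: string
  step: string
  query: string
}

async function runResearch(query: string, traces: Trace[]): Promise<string> {
  // Minted synchronously when the run starts, before the first await.
  const flowId = `flow-${++nextFlowId}`
  for (const step of ['plan', 'search']) {
    await Promise.resolve() // stand-in for an awaited generate() call
    traces.push({ flowId, step, query })
  }
  return flowId
}

const traces: Trace[] = []
const queries = ['pricing', 'competitors', 'launch']

Promise.all(queries.map((q) => runResearch(q, traces))).then((flowIds) => {
  console.log(flowIds) // [ 'flow-1', 'flow-2', 'flow-3' ]
  // Steps interleave in time (plan, plan, plan, search, search, search),
  // but filtering by flowId recovers each run's own steps.
  console.log(traces.filter((t) => t.flowId === 'flow-2').map((t) => t.step)) // [ 'plan', 'search' ]
})
```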
Evaluating Flows
Point a Quality evaluation's task at the flow handle; step labels become
trace-backed steps signals you can assert on:
```ts
import { flow } from '@crux/core'
import { generate } from '@crux/ai'
import { evaluate } from '@crux/core/quality'

const researchFlow = flow('research', async (flow) => {
  await flow.step('plan', () => generate(planner, { model, input }))
  await flow.step('search', () => generate(searcher, { model, input }))
})

export default evaluate('research.pipeline', {
  task: researchFlow,
  data: [{ input: { query: 'launch plan' } }],
  expect: (ctx) => {
    ctx.expect.steps.toHaveSucceeded('plan')
    ctx.expect.steps.toHaveOrder('plan', 'search')
  },
})
```

See the Quality guide for variants, gates, and replay.
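For intuition about what the two step assertions check, here is a hypothetical sketch over a recorded list of step signals. StepSignal, hasSucceeded, and hasOrder are illustrative names, not the Quality API, and the real matchers may define ordering differently (for example by completion rather than start time).

```ts
// Hypothetical step signal shape; Crux's recorded fields are not documented on this page.
interface StepSignal {
  label: string
  status: 'succeeded' | 'failed'
  startedAt: number // ms timestamp
}

function hasSucceeded(steps: StepSignal[], label: string): boolean {
  return steps.some((s) => s.label === label && s.status === 'succeeded')
}

// True when every label appears and their first occurrences start in the given order.
function hasOrder(steps: StepSignal[], ...labels: string[]): boolean {
  const sorted = [...steps].sort((a, b) => a.startedAt - b.startedAt)
  let previous = -1
  for (const label of labels) {
    const index = sorted.findIndex((s) => s.label === label)
    if (index === -1 || index <= previous) return false
    previous = index
  }
  return true
}

// Signals may be recorded out of order; hasOrder sorts by start time first.
const recorded: StepSignal[] = [
  { label: 'search', status: 'succeeded', startedAt: 120 },
  { label: 'plan', status: 'succeeded', startedAt: 0 },
]

console.log(hasSucceeded(recorded, 'plan')) // true
console.log(hasOrder(recorded, 'plan', 'search')) // true
console.log(hasOrder(recorded, 'search', 'plan')) // false
```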
Migration from withFlow
flow replaces the previous withFlow(name, fn, options) API. The migration is mechanical — define the flow once at module level, call .run() at each call site:
```ts
// Before
import { withFlow } from '@crux/core'

const result = await withFlow(
  'pipeline',
  async (flow) => {
    /* steps */
  },
  { input, resume },
)
```

```ts
// After
import { flow } from '@crux/core'

const pipeline = flow('pipeline', async (flow) => {
  /* steps */
})

const result = await pipeline.run({ input, resume })
```

The other notable change is signaling: prefer pipeline.signal(flowId, name, payload) on the handle. The bare signalFlow() export is now a low-level primitive that only writes the signal payload; see Suspend and resume for the full picture.
Where to Next
- Suspend and resume: pause flows for approval or external events; resume in a different process.
- Typed input and step composition: flow.input, external step functions, flow.results, and nested flows.
- Agent primitives in flows: compose flows with handoff, delegate, and blackboard.
- Convex: cross-action pipelines, resume signaling, trace propagation.
- API reference: flow, FlowHandle, FlowScope, signalFlow, cancelFlow, and more.