Typed input and step composition
Type the flow's input, share it with steps via flow.input, extract steps into reusable functions, and compose flows by nesting.
flow accepts two type parameters — T for the return type and TInput for typed flow input. This lets you pass structured data into a flow at call time and access it from any step via flow.input, without threading it through every closure.
import { flow } from '@crux/core'
interface PipelineInput {
query: string
maxResults: number
}
const researchFlow = flow<GenerateResult, PipelineInput>('research', async (flow) => {
// flow.input is typed as PipelineInput
const plan = await flow.step('plan', () => generate(planner, { model, input: { query: flow.input.query } }))
return flow.step('search', () =>
generate(searcher, {
model,
input: { plan: plan.object, limit: flow.input.maxResults },
}),
)
})
const result = await researchFlow.run({
input: { query: 'cloud migration', maxResults: 10 },
})External Step Functions (Auto-Pass)
When a step function accepts a (flow: FlowScope) parameter, flow.step() passes the flow scope automatically. This lets you extract steps into standalone functions that can access flow.input and flow.results without manual wiring:
// External step function — receives flow automatically
async function planStep(flow: FlowScope<PipelineInput>) {
return generate(planner, { model, input: { query: flow.input.query } })
}
// Auto-pass: flow.step detects the parameter and passes flow
await flow.step('plan', planStep)
// Compare with the manual wrapper form:
await flow.step('plan', () => planStep(customArgs))The auto-pass pattern works when your step function signature is (flow: FlowScope) => T. If your function needs additional arguments, use the wrapper form () => stepFn(args).
flow.results — Cross-Step Access
After each step completes, its return value is stored in flow.results keyed by step name. This is useful when an external step function needs access to earlier step outputs:
async function searchStep(flow: FlowScope<PipelineInput>) {
// Access the plan step's output via flow.results
const plan = flow.results['plan'] as PlanOutput
return generate(searcher, { model, input: { plan, limit: flow.input.maxResults } })
}
const researchFlow = flow<SearchResult, PipelineInput>('research', async (flow) => {
await flow.step('plan', planStep)
return flow.step('search', searchStep)
})
const result = await researchFlow.run({
input: { query: 'cloud migration', maxResults: 10 },
})Prefer return-value assignment as your primary typed path. Assigning const plan = await flow.step('plan', ...) gives you full type inference. Use flow.results as an escape hatch for external step functions that can't receive earlier step outputs as parameters — its values are typed as Record<string, unknown>, so you'll need type assertions.
Nested Flows
Flows can nest inside other flows. Running flow().run() inside another flow creates a parent-child relationship in the observability graph automatically.
import { flow } from '@crux/core'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
const model = openai('gpt-4o')
const innerResearch = flow('research', async (innerFlow) => {
const sources = await innerFlow.step('find-sources', () =>
generate(sourceFinder, { model, input: { plan: plan.object } }),
)
return innerFlow.step('synthesize', () => generate(synthesizer, { model, input: { sources: sources.object } }))
})
const contentPipeline = flow('content-pipeline', async (flow) => {
const plan = await flow.step('plan', () => generate(contentPlanner, { model, input: { topic: 'cloud migration' } }))
// Inner flow runs as a nested pipeline inside a step
const research = await flow.step('research', () => innerResearch.run())
return flow.step('write', () => generate(articleWriter, { model, input: { research: research.object } }))
})
await contentPipeline.run()In devtools, nested flows render as a tree:
Flow "content-pipeline"
├── step:plan → trace-001
├── step:research
│ └── Flow "research" ← nested, parentFlowId points to parent
│ ├── step:find-sources → trace-002
│ └── step:synthesize → trace-003
└── step:write → trace-004This is useful when a step delegates to a complete sub-pipeline — the inner flow has its own retry, fallback, and tracing, while the parent flow sees it as a single step.
Related
- Agent primitives in flows — using
handoff,delegate, andblackboardinside steps - Convex flows — typed input across action boundaries
- API reference —
FlowScope,FlowHandle,FlowResult
Suspend and resume
Pause flows for human approval or external events, persist state, and resume execution — possibly in a different process.
Agent primitives in flows
Compose flows with handoff, delegate, and blackboard — the building blocks that plug into steps for validation, sub-pipelines, and shared state.