Pipeline
Chain agents sequentially with typed data flow between immediate steps.
pipeline() chains agents sequentially — each step can access all previous outputs via a context accumulator. Use it for multi-step workflows like research-write-edit, or any process where each stage depends on earlier stages.
import { pipeline } from '@crux/ai'
import { agent } from '@crux/core/agent'
const result = await pipeline({
  steps: [
    { agent: researcher, name: 'research' },
    {
      agent: writer,
      name: 'write',
      input: (ctx) => ({ findings: ctx.research.output.sources, query: ctx.research.output.query }),
    },
    {
      agent: editor,
      name: 'edit',
      input: (ctx) => ({ draft: ctx.write.output.text }),
    },
  ],
  context: { query: 'Write about AI safety' },
  model: claude35,
})
result.finalOutput // editor's output
result.context // accumulated context from all steps
result.durationMs // total pipeline time
How it works
- Steps execute sequentially — step 2 waits for step 1 to finish
- The first step receives the pipeline's context (seed data)
- Each subsequent step's input function receives a context object containing all previous outputs plus the seed data
- The context accumulates: step 3 can access outputs from both step 1 and step 2
- Returns the final output, accumulated context, and total duration
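The rules above can be modeled in a few lines of plain TypeScript. This is a minimal sketch of the accumulation behavior as described, not the @crux/ai implementation: runSequential, SketchStep, and the run callback (a stand-in for executing an agent) are hypothetical names, and storing each result as { output } is inferred from the ctx.research.output access pattern in the example.

```typescript
// Hypothetical model of the accumulation rules; not the @crux/ai implementation.
type Ctx = Record<string, any>

interface SketchStep {
  name: string
  // Optional transform from the accumulated context to this step's input.
  input?: (ctx: Ctx) => unknown
  // Stand-in for running an agent: takes the input, resolves to an output.
  run: (input: any) => Promise<unknown>
}

async function runSequential(steps: SketchStep[], seed: Ctx) {
  const started = Date.now()
  const context: Ctx = { ...seed } // seed data is visible to every step
  let finalOutput: unknown = undefined

  for (const step of steps) {
    // Without an input function, the step receives a snapshot of the whole context.
    const input = step.input ? step.input(context) : { ...context }
    finalOutput = await step.run(input) // the next step waits for this one
    context[step.name] = { output: finalOutput } // keyed by step name
  }

  return { finalOutput, context, durationMs: Date.now() - started }
}

// Usage: the second step reads both the first step's output and the seed data.
const demo = runSequential(
  [
    { name: 'research', run: async (input) => ({ sources: ['a', 'b'], query: input.query }) },
    {
      name: 'write',
      input: (ctx) => ({ findings: ctx.research.output.sources, topic: ctx.query }),
      run: async (input) => ({ text: `${input.findings.length} sources on ${input.topic}` }),
    },
  ],
  { query: 'AI safety' },
)
```

Because the sketch writes every result back under the step's name, a later step sees both the seed key (ctx.query) and every earlier entry (ctx.research), which is the behavior the list describes.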
Context accumulator
The input function on each step receives the full accumulated context — all previous step outputs keyed by name, plus the original seed data:
{
  agent: writer,
  name: 'write',
  input: (ctx) => ({
    // ctx.research is the AgentResult from the research step
    findings: ctx.research.output.sources,
    // ctx.query comes from the seed context
    topic: ctx.query,
    wordCount: 1200,
  }),
}
If no input function is provided, the full context object is passed directly:
// No transform: writer receives the entire context object as input
{ agent: writer, name: 'write' }
Plain function steps
Steps can also be plain { name, fn } objects alongside agent steps. Use this for custom logic, API calls, or data transforms that don't need an LLM:
const result = await pipeline({
  steps: [
    { agent: researcher, name: 'research' },
    {
      name: 'enrich',
      fn: async (ctx) => {
        const sources = ctx.research.output.sources
        return { enriched: await fetchMetadata(sources) }
      },
    },
    {
      agent: writer,
      name: 'write',
      input: (ctx) => ({ findings: ctx.enrich.enriched }),
    },
  ],
  context: { query: 'AI safety' },
  model,
})
Plain function steps appear in devtools tracing but skip agent execution.
Per-step retry
Each step can have its own retry configuration with backoff and fallback:
const result = await pipeline({
  steps: [
    {
      agent: researcher,
      name: 'research',
      retry: {
        retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
        fallback: () => ({ sources: [], query: 'fallback' }),
      },
    },
    { agent: writer, name: 'write', input: (ctx) => ({ findings: ctx.research.output.sources }) },
  ],
  context: { query: 'AI safety' },
  model,
})
Without retry, failures propagate immediately (current behavior).
Crux policy-terminal errors are not retried by default and do not run execution fallback: GuardrailBlockedError, ConstraintViolationError, and ValidationExhaustedError. This keeps safety, constraint, and validation decisions from being bypassed by generic composition retry. Use shouldRetry only when you intentionally want to override that policy.
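To make the retry semantics concrete, here is a rough sketch of a retry loop with backoff, a fallback, and a policy-terminal short circuit. It is not Crux's code and rests on several assumptions: attempts counts total tries, exponential backoff doubles the base delay each time, a 'fixed' backoff option exists, and policy-terminal errors can be recognized by their name property. runWithRetry, backoffDelay, and the injectable sleep parameter are hypothetical, and shouldRetry is left out.

```typescript
// Hypothetical sketch of per-step retry; the schedule and error check are assumptions.
interface RetryOptions {
  attempts: number // assumed to mean total tries, not extra retries
  delay: number // base delay in milliseconds
  backoff: 'fixed' | 'exponential'
}

// Error names taken from the policy-terminal list above.
const POLICY_TERMINAL = new Set([
  'GuardrailBlockedError',
  'ConstraintViolationError',
  'ValidationExhaustedError',
])

// Delay before the retry that follows failed attempt `attemptIndex` (0-based).
function backoffDelay(opts: RetryOptions, attemptIndex: number): number {
  return opts.backoff === 'exponential' ? opts.delay * 2 ** attemptIndex : opts.delay
}

async function runWithRetry<T>(
  task: () => Promise<T>,
  opts: RetryOptions,
  fallback?: () => T,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < opts.attempts; attempt++) {
    try {
      return await task()
    } catch (err) {
      // Policy-terminal errors skip both retry and fallback.
      if (err instanceof Error && POLICY_TERMINAL.has(err.name)) throw err
      lastError = err
      const isLastAttempt = attempt === opts.attempts - 1
      if (!isLastAttempt) await sleep(backoffDelay(opts, attempt))
    }
  }
  // All attempts failed: use the fallback if one was given, otherwise rethrow.
  if (fallback) return fallback()
  throw lastError
}
```

Under these assumptions, { attempts: 3, delay: 1000, backoff: 'exponential' } waits 1000 ms and then 2000 ms between the three tries before the fallback runs.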
Error handling
If a step fails (and has no retry/fallback), the pipeline throws an error that includes the step name:
Pipeline step "write" failed: Model rate limited
The error wraps the original error message so you can identify which step failed.
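If you need the step name programmatically, one option is to parse it out of the message. The sketch below relies only on the message format shown above; failedStepName is a hypothetical helper, and whether the thrown error also exposes the step name as a structured field is not covered here.

```typescript
// Hypothetical helper: extracts the step name from the documented message format.
function failedStepName(err: unknown): string | undefined {
  if (!(err instanceof Error)) return undefined
  const match = /^Pipeline step "([^"]+)" failed: /.exec(err.message)
  return match?.[1]
}

// Usage with a hand-built error in the documented format.
const sample = new Error('Pipeline step "write" failed: Model rate limited')
failedStepName(sample) // 'write'
failedStepName(new Error('something else')) // undefined
```

In real code this would sit in the catch block around the await pipeline(...) call, for example to log which stage failed before rethrowing.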
Single-step pipelines
A pipeline with one step works — useful when you want the pipeline return shape ({ finalOutput, context, durationMs }) for consistency:
const result = await pipeline({
  steps: [{ agent: classifier, name: 'classify' }],
  context: { text: message },
  model,
})
Durability
pipeline() is an immediate composition: it runs its steps in the current execution and either completes or throws. For human-in-the-loop approval, scheduled work, serverless resumes, or Convex action boundaries, use flow() and put pipeline-like stages inside flow.step().
CreationTool .created capture
When an agent step uses a CreationTool (a tool that produces artifacts), the pipeline automatically captures the tool's .created values. Downstream steps can access them via ctx[stepName]._created[toolName].
import { pipeline } from '@crux/ai'
const result = await pipeline({
  steps: [
    {
      agent: plannerAgent, // uses a CreationTool called "createPlan"
      name: 'plan',
    },
    {
      agent: executor,
      name: 'execute',
      input: (ctx) => ({
        // Access the createPlan tool's .created values from the plan step
        plan: ctx.plan._created.createPlan,
      }),
    },
  ],
  context: { goal: 'migrate to cloud' },
  model,
})
Each agent step's context entry gains a _created record keyed by tool name. This is populated automatically; no extra wiring needed. If the agent didn't invoke any creation tools, _created is an empty object.
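Since _created can be an empty object, ctx[stepName]._created[toolName] may be undefined when the tool was never invoked. A small guard makes that case explicit. This is a sketch under assumptions: StepEntry and createdBy are hypothetical names, the entry shape is inferred from the description above, and the sample data (including modeling the captured values as an array) is invented for illustration.

```typescript
// Assumed shape of a step's context entry, based only on the description above.
interface StepEntry {
  output?: unknown
  _created?: Record<string, unknown>
}

// Returns the captured value, or undefined if the step or tool entry is absent.
function createdBy(ctx: Record<string, unknown>, stepName: string, toolName: string): unknown {
  const entry = ctx[stepName] as StepEntry | undefined
  return entry?._created?.[toolName]
}

// Invented sample context for illustration.
const sampleCtx = {
  goal: 'migrate to cloud',
  plan: { output: { summary: 'three phases' }, _created: { createPlan: [{ id: 'plan-1' }] } },
  review: { output: {}, _created: {} }, // agent step that invoked no creation tools
}

createdBy(sampleCtx, 'plan', 'createPlan') // the captured values
createdBy(sampleCtx, 'review', 'createPlan') // undefined
```

A downstream input function can then decide whether a missing value should fall back to a default or raise an error, instead of passing undefined to the next agent.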
Devtools
In the devtools timeline, pipeline compositions appear as sequential steps:
comp → pipeline 3 agents
comp ← researcher 1.2s
comp ← writer 3.4s
comp ← editor 1.5s
comp ← pipeline 3/3 ok 6.1s