Crux
GuidesCompositions

Pipeline

Chain agents sequentially with typed data flow between immediate steps.

pipeline() chains agents sequentially — each step can access all previous outputs via a context accumulator. Use it for multi-step workflows like research-write-edit, or any process where each stage depends on earlier stages.

research-pipeline.ts
import { pipeline } from '@crux/ai'
import { agent } from '@crux/core/agent'

const result = await pipeline({
  steps: [
    { agent: researcher, name: 'research' },
    {
      agent: writer,
      name: 'write',
      input: (ctx) => ({ findings: ctx.research.output.sources, query: ctx.research.output.query }),
    },
    {
      agent: editor,
      name: 'edit',
      input: (ctx) => ({ draft: ctx.write.output.text }),
    },
  ],
  context: { query: 'Write about AI safety' },
  model: claude35,
})

result.finalOutput  // editor's output
result.context      // accumulated context from all steps
result.durationMs   // total pipeline time

How it works

  1. Steps execute sequentially — step 2 waits for step 1 to finish
  2. The first step receives the pipeline's context (seed data)
  3. Each subsequent step's input function receives a context object containing all previous outputs plus the seed data
  4. The context accumulates — step 3 can access outputs from both step 1 and step 2
  5. Returns the final output, accumulated context, and total duration

Context accumulator

The input function on each step receives the full accumulated context — all previous step outputs keyed by name, plus the original seed data:

{
  agent: writer,
  name: 'write',
  input: (ctx) => ({
    // ctx.research is the AgentResult from the research step
    findings: ctx.research.output.sources,
    // ctx.query comes from the seed context
    topic: ctx.query,
    wordCount: 1200,
  }),
}

If no input function is provided, the full context object is passed directly:

// No transform — writer receives the entire context object as input
{ agent: writer, name: 'write' }

Plain function steps

Steps can also be plain { name, fn } objects alongside agent steps. Use this for custom logic, API calls, or data transforms that don't need an LLM:

const result = await pipeline({
  steps: [
    { agent: researcher, name: 'research' },
    {
      name: 'enrich',
      fn: async (ctx) => {
        const sources = ctx.research.output.sources
        return { enriched: await fetchMetadata(sources) }
      },
    },
    {
      agent: writer,
      name: 'write',
      input: (ctx) => ({ findings: ctx.enrich.enriched }),
    },
  ],
  context: { query: 'AI safety' },
  model,
})

Plain function steps appear in devtools tracing but skip agent execution.

Per-step retry

Each step can have its own retry configuration with backoff and fallback:

resilient-pipeline.ts
const result = await pipeline({
  steps: [
    {
      agent: researcher,
      name: 'research',
      retry: {
        retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
        fallback: () => ({ sources: [], query: 'fallback' }),
      },
    },
    { agent: writer, name: 'write', input: (ctx) => ({ findings: ctx.research.output.sources }) },
  ],
  context: { query: 'AI safety' },
  model,
})

Without retry, failures propagate immediately (current behavior).

Crux policy-terminal errors are not retried by default and do not run execution fallback: GuardrailBlockedError, ConstraintViolationError, and ValidationExhaustedError. This keeps safety, constraint, and validation decisions from being bypassed by generic composition retry. Use shouldRetry only when you intentionally want to override that policy.

Error handling

If a step fails (and has no retry/fallback), the pipeline throws an error that includes the step name:

Pipeline step "write" failed: Model rate limited

The error wraps the original error message so you can identify which step failed.

Single-step pipelines

A pipeline with one step works — useful when you want the pipeline return shape ({ finalOutput, context, durationMs }) for consistency:

const result = await pipeline({
  steps: [{ agent: classifier, name: 'classify' }],
  context: { text: message },
  model,
})

Durability

pipeline() is an immediate composition: it runs its steps in the current execution and either completes or throws. For human-in-the-loop approval, scheduled work, serverless resumes, or Convex action boundaries, use flow() and put pipeline-like stages inside flow.step().

CreationTool .created capture

When an agent step uses a CreationTool (a tool that produces artifacts), the pipeline automatically captures the tool's .created values. Downstream steps can access them via ctx[stepName]._created[toolName].

pipeline-with-creation.ts
import { pipeline } from '@crux/ai'

const result = await pipeline({
  steps: [
    {
      agent: plannerAgent,  // uses a CreationTool called "createPlan"
      name: 'plan',
    },
    {
      agent: executor,
      name: 'execute',
      input: (ctx) => ({
        // Access the createPlan tool's .created values from the plan step
        plan: ctx.plan._created.createPlan,
      }),
    },
  ],
  context: { goal: 'migrate to cloud' },
  model,
})

Each agent step's context entry gains a _created record keyed by tool name. This is populated automatically — no extra wiring needed. If the agent didn't invoke any creation tools, _created is an empty object.

Devtools

In the devtools timeline, pipeline compositions appear as sequential steps:

comp  → pipeline  3 agents
comp  ← researcher  1.2s
comp  ← writer  3.4s
comp  ← editor  1.5s
comp  ← pipeline  3/3 ok  6.1s

On this page