
Research pipeline

A planner → searcher → writer pipeline with typed handoffs. Each agent's output becomes the next agent's input.

This recipe builds a research pipeline: a planner produces a list of search queries, a searcher gathers results, and a writer composes a final answer with citations. Each step's output is typed and accumulates into the context for the next.

Primitives used

  • agent() for the three agents
  • createPipeline() from @crux/core/agent for sequential composition
  • prompt() with structured output for typed handoffs

When to reach for this pattern

  • The work has a clear sequential shape — A must finish before B starts
  • Each step's output is structured and feeds the next as typed input
  • You want per-step tracing in devtools (each agent shows up as a labeled step)

Full code

lib/research/agents.ts

import { agent } from '@crux/core/agent'
import { prompt } from '@crux/core'
import { z } from 'zod'

const PlanSchema = z.object({
  queries: z.array(z.string()).min(1).max(5),
  rationale: z.string(),
})

const planner = agent({
  id: 'planner',
  prompt: prompt({
    id: 'research-plan',
    input: z.object({ topic: z.string() }),
    output: PlanSchema,
    system: 'Plan research. Generate 1-5 focused search queries for the topic.',
    prompt: ({ input }) => `Topic: ${input.topic}`,
  }),
})

const SearchResultsSchema = z.object({
  results: z.array(z.object({
    query: z.string(),
    findings: z.array(z.object({
      url: z.string(),
      excerpt: z.string(),
    })),
  })),
})

const searcher = agent({
  id: 'searcher',
  prompt: prompt({
    id: 'research-search',
    input: PlanSchema,
    output: SearchResultsSchema,
    system: 'Execute the queries and return findings with sources.',
    prompt: ({ input }) =>
      `Run these queries:\n${input.queries.map((q, i) => `${i + 1}. ${q}`).join('\n')}`,
  }),
  // webSearchTool is not defined in this file; import or define your own web-search tool first.
  tools: { searchWeb: webSearchTool },
})

const writer = agent({
  id: 'writer',
  prompt: prompt({
    id: 'research-write',
    input: SearchResultsSchema,
    output: z.object({ answer: z.string(), citations: z.array(z.string()) }),
    system: 'Write a concise answer with [n] citations referencing the findings.',
    prompt: ({ input }) =>
      input.results.map((r) =>
        `Q: ${r.query}\n${r.findings.map((f) => `- ${f.excerpt} (${f.url})`).join('\n')}`
      ).join('\n\n'),
  }),
})

export { planner, searcher, writer }

Run the pipeline

import { createPipeline } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

import { planner, searcher, writer } from './agents'

const pipeline = createPipeline((agent, opts) => generate(agent.prompt, opts))

const result = await pipeline({
  context: { topic: 'agentic workflows in 2026' },
  model: openai('gpt-4o'),
  steps: [
    { name: 'plan', agent: planner, input: (ctx) => ({ topic: ctx.topic }) },
    { name: 'search', agent: searcher, input: (ctx) => ctx.plan },
    { name: 'write', agent: writer, input: (ctx) => ctx.search },
  ],
})

result.status       // 'completed' | 'suspended' | 'expired'
if (result.status === 'completed') {
  result.context    // { topic, plan, search, write } — every step's output keyed by name
  result.finalOutput // the last step's output (write)
}

How it works

  1. Each agent has a typed input/output schema. The planner's output conforms to PlanSchema, which the searcher accepts as its input; the writer in turn accepts the searcher's SearchResultsSchema. Because adjacent schemas line up, a mismatched handoff surfaces as a type error when you compose the steps, not as a failure at runtime.
  2. createPipeline() chains them by name. Each step's output is stored in the accumulating context under its name. The next step's input callback receives the full accumulated context (seed + every prior step's output).
  3. Tools are per-agent. Only the searcher has searchWeb; the planner can't accidentally call it.
  4. Devtools sees every step. When wrapped in a flow or withSession(), each step gets a labeled entry in the timeline.
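
To make the accumulating context in step 2 concrete, here is a minimal sketch of a runner that stores each step's output under the step's name. It is a hand-rolled illustration, not Crux's createPipeline(): the names Context, Step, runPipeline, and demoSteps are hypothetical, and the stub agents return canned data instead of calling a model.

```typescript
// Illustrative sketch only: a hand-rolled runner, not Crux's createPipeline().
type Context = Record<string, unknown>

interface Step {
  name: string
  // Builds this step's input from the seed plus every prior step's output.
  input: (ctx: Context) => unknown
  // Hypothetical stand-in for "run the agent on this input".
  run: (input: unknown) => Promise<unknown>
}

async function runPipeline(seed: Context, steps: Step[]) {
  let context: Context = { ...seed }
  let finalOutput: unknown = undefined
  for (const step of steps) {
    const output = await step.run(step.input(context))
    // Store the output under the step's name so later steps can read it.
    context = { ...context, [step.name]: output }
    finalOutput = output
  }
  return { status: 'completed' as const, context, finalOutput }
}

// Stub "agents" that return canned data instead of calling a model.
const demoSteps: Step[] = [
  {
    name: 'plan',
    input: (ctx) => ({ topic: ctx.topic }),
    run: async (input) => ({
      queries: [`${(input as { topic: string }).topic} overview`],
    }),
  },
  {
    name: 'search',
    input: (ctx) => ctx.plan,
    run: async (input) => ({
      hits: (input as { queries: string[] }).queries.length,
    }),
  },
]
```

Calling runPipeline({ topic: 'agents' }, demoSteps) resolves to a context holding the seed plus one key per step, with finalOutput set to the last step's output. The sketch uses unknown and casts for brevity; the typed handoffs described above are what remove the need for those casts in the real recipe.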

Variations

Add a fact-check step

Slot a verifier between searcher and writer:

const verifier = agent({
  id: 'verifier',
  prompt: prompt({
    input: SearchResultsSchema,
    output: SearchResultsSchema, // same shape — drops unverified findings
    // ...
  }),
})

await pipeline({
  context: { topic: '...' },
  model: openai('gpt-4o'),
  steps: [
    { name: 'plan', agent: planner, input: (ctx) => ({ topic: ctx.topic }) },
    { name: 'search', agent: searcher, input: (ctx) => ctx.plan },
    { name: 'verify', agent: verifier, input: (ctx) => ctx.search },
    { name: 'write', agent: writer, input: (ctx) => ctx.verify },
  ],
})

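This works because the verifier's input and output both use SearchResultsSchema. A step can be slotted in wherever its input schema matches the upstream output schema and its output schema matches the downstream input schema.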
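
The same-shape contract can be illustrated without a model call. The sketch below is a deterministic stand-in for the verifier, assuming plain interfaces that mirror SearchResultsSchema; isVerified and dropUnverified are hypothetical names, and the https check is only a placeholder for whatever verification the real agent performs.

```typescript
// Plain interfaces mirroring the shape of SearchResultsSchema in the recipe.
interface Finding {
  url: string
  excerpt: string
}
interface SearchResults {
  results: { query: string; findings: Finding[] }[]
}

// Hypothetical placeholder check: https URL and a non-empty excerpt.
function isVerified(f: Finding): boolean {
  return f.url.startsWith('https://') && f.excerpt.trim().length > 0
}

// Same type in, same type out, so it can sit between searcher and writer.
// Returns a new object; the input is left untouched.
function dropUnverified(
  input: SearchResults,
  keep: (f: Finding) => boolean = isVerified,
): SearchResults {
  return {
    results: input.results.map((r) => ({
      query: r.query,
      findings: r.findings.filter(keep),
    })),
  }
}

const sample: SearchResults = {
  results: [
    {
      query: 'agentic workflows',
      findings: [
        { url: 'https://example.com/a', excerpt: 'A usable finding.' },
        { url: 'http://example.com/b', excerpt: 'Dropped: not https.' },
        { url: 'https://example.com/c', excerpt: '   ' },
      ],
    },
  ],
}

const cleaned = dropUnverified(sample)
```

Because the return type equals the parameter type, the writer can consume cleaned exactly as it would consume the searcher's raw output.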

Wrap in a flow for resumability

If the pipeline is long-running and may exceed your function timeout, wrap it in a flow. Each agent step becomes a flow.step() with retry support, and the whole flow can suspend between steps and resume later.
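
The idea behind suspending between steps can be sketched as checkpointing: persist the context after every step and skip steps whose output is already saved. This is not Crux's flow API. runResumable, ResumableStep, the in-memory checkpoints map, and the maxSteps budget (a stand-in for a function timeout) are all assumptions made for illustration, and retries are left out.

```typescript
// Illustrative sketch only: checkpoint-and-resume, not Crux's flow API.
type Ctx = Record<string, unknown>

interface ResumableStep {
  name: string
  run: (ctx: Ctx) => Promise<unknown>
}

// Hypothetical in-memory store; a real deployment would need durable storage.
const checkpoints = new Map<string, Ctx>()

async function runResumable(
  runId: string,
  seed: Ctx,
  steps: ResumableStep[],
  maxSteps: number = Infinity, // stand-in for a per-invocation time budget
) {
  let ctx: Ctx = checkpoints.get(runId) ?? { ...seed }
  let executed = 0
  for (const step of steps) {
    // Already finished in an earlier invocation. This assumes step names
    // never collide with keys in the seed context.
    if (step.name in ctx) continue
    if (executed >= maxSteps) {
      return { status: 'suspended' as const, context: ctx }
    }
    ctx = { ...ctx, [step.name]: await step.run(ctx) }
    checkpoints.set(runId, ctx) // persist after every completed step
    executed++
  }
  return { status: 'completed' as const, context: ctx }
}
```

Calling runResumable twice with the same runId continues from the saved context, so a step that already completed is not executed again.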
