Research pipeline
A planner → searcher → writer pipeline with typed handoffs. Each agent's output becomes the next agent's input.
This recipe builds a research pipeline: a planner produces a list of search queries, a searcher gathers results, and a writer composes a final answer with citations. Each step's output is typed and accumulates into the context for the next.
Primitives used
- agent() for the three agents
- createPipeline() from @crux/core/agent for sequential composition
- prompt() with structured output for typed handoffs
When to reach for this pattern
- The work has a clear sequential shape — A must finish before B starts
- Each step's output is structured and feeds the next as typed input
- You want per-step tracing in devtools (each agent shows up as a labeled step)
Full code
lib/research/agents.ts
import { agent } from '@crux/core/agent'
import { prompt } from '@crux/core'
import { z } from 'zod'

// Output of the planner and input of the searcher.
const PlanSchema = z.object({
  queries: z.array(z.string()).min(1).max(5),
  rationale: z.string(),
})

const planner = agent({
  id: 'planner',
  prompt: prompt({
    id: 'research-plan',
    input: z.object({ topic: z.string() }),
    output: PlanSchema,
    system: 'Plan research. Generate 1-5 focused search queries for the topic.',
    prompt: ({ input }) => `Topic: ${input.topic}`,
  }),
})

// Output of the searcher and input of the writer.
const SearchResultsSchema = z.object({
  results: z.array(z.object({
    query: z.string(),
    findings: z.array(z.object({
      url: z.string(),
      excerpt: z.string(),
    })),
  })),
})

const searcher = agent({
  id: 'searcher',
  prompt: prompt({
    id: 'research-search',
    input: PlanSchema,
    output: SearchResultsSchema,
    system: 'Execute the queries and return findings with sources.',
    prompt: ({ input }) =>
      `Run these queries:\n${input.queries.map((q, i) => `${i + 1}. ${q}`).join('\n')}`,
  }),
  // webSearchTool is not defined in this recipe; import or define your own
  // web search tool before using this file.
  tools: { searchWeb: webSearchTool },
})

const writer = agent({
  id: 'writer',
  prompt: prompt({
    id: 'research-write',
    input: SearchResultsSchema,
    output: z.object({ answer: z.string(), citations: z.array(z.string()) }),
    system: 'Write a concise answer with [n] citations referencing the findings.',
    // Render each query followed by its findings as a bulleted list.
    prompt: ({ input }) =>
      input.results.map((r) =>
        `Q: ${r.query}\n${r.findings.map((f) => `- ${f.excerpt} (${f.url})`).join('\n')}`
      ).join('\n\n'),
  }),
})

export { planner, searcher, writer }
Run the pipeline
import { createPipeline } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
import { planner, searcher, writer } from './agents'

const pipeline = createPipeline((agent, opts) => generate(agent.prompt, opts))

const result = await pipeline({
  context: { topic: 'agentic workflows in 2026' },
  model: openai('gpt-4o'),
  steps: [
    { name: 'plan', agent: planner, input: (ctx) => ({ topic: ctx.topic }) },
    { name: 'search', agent: searcher, input: (ctx) => ctx.plan },
    { name: 'write', agent: writer, input: (ctx) => ctx.search },
  ],
})

result.status // 'completed' | 'suspended' | 'expired'

if (result.status === 'completed') {
  result.context // { topic, plan, search, write }: every step's output keyed by name
  result.finalOutput // the last step's output (write)
}
How it works
- Each agent has a typed input/output schema. The planner outputs a PlanSchema. The searcher accepts a PlanSchema as input. The writer accepts the searcher's SearchResultsSchema. Type errors surface at compose time, not runtime.
- createPipeline() chains them by name. Each step's output is stored in the accumulating context under its name. The next step's input callback receives the full accumulated context (seed + every prior step's output).
- Tools are per-agent. Only the searcher has searchWeb; the planner can't accidentally call it.
- Devtools sees every step. When wrapped in a flow or withSession(), each step gets a labeled entry in the timeline.
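The accumulation rule described above can be modeled in a few lines of plain TypeScript. This is a rough sketch of the idea only, not the @crux/core implementation: runSteps, Step, and Context are hypothetical names, and the step functions are synchronous stand-ins for real (asynchronous) agent calls.

```typescript
// Hypothetical model of a named-step pipeline; not the @crux/core source.
type Context = Record<string, unknown>

interface Step {
  name: string
  // Receives the seed plus every earlier step's output.
  input: (ctx: Context) => unknown
  // Stand-in for running an agent; a real agent call would be async.
  run: (input: unknown) => unknown
}

function runSteps(
  seed: Context,
  steps: Step[],
): { context: Context; finalOutput: unknown } {
  let context: Context = { ...seed }
  let finalOutput: unknown = undefined
  for (const step of steps) {
    const output = step.run(step.input(context))
    // Store the output under the step's name so later steps can read it.
    context = { ...context, [step.name]: output }
    finalOutput = output
  }
  return { context, finalOutput }
}

const demo = runSteps({ topic: 'agentic workflows' }, [
  {
    name: 'plan',
    input: (ctx) => ctx.topic,
    run: (topic) => ({ queries: [`${String(topic)} overview`] }),
  },
  {
    name: 'search',
    input: (ctx) => ctx.plan,
    run: (plan) => ({ results: (plan as { queries: string[] }).queries.length }),
  },
])

console.log(Object.keys(demo.context)) // the keys topic, plan, search
```

Because each output is keyed by step name, a later step can read any earlier output, not just the one immediately before it.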
Variations
Add a fact-check step
Slot a verifier between searcher and writer:
const verifier = agent({
  id: 'verifier',
  prompt: prompt({
    input: SearchResultsSchema,
    output: SearchResultsSchema, // same shape: drops unverified findings
    // ...
  }),
})

await pipeline({
  context: { topic: '...' },
  model: openai('gpt-4o'),
  steps: [
    { name: 'plan', agent: planner, input: (ctx) => ({ topic: ctx.topic }) },
    { name: 'search', agent: searcher, input: (ctx) => ctx.plan },
    { name: 'verify', agent: verifier, input: (ctx) => ctx.search },
    { name: 'write', agent: writer, input: (ctx) => ctx.verify },
  ],
})
Adjacent steps type-check as long as each step's output schema matches the next step's input schema.
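To illustrate why a same-shape step slots in without touching its neighbors, here is a small sketch in plain TypeScript. The interfaces mirror SearchResultsSchema from the recipe; dropUnverified and the IsVerified predicate are hypothetical stand-ins, since the real verifier is an agent whose prompt is elided above.

```typescript
// Plain TypeScript types mirroring SearchResultsSchema from the recipe.
interface Finding {
  url: string
  excerpt: string
}

interface SearchResults {
  results: { query: string; findings: Finding[] }[]
}

// Hypothetical check; the real verifier would be an agent, not a predicate.
type IsVerified = (finding: Finding) => boolean

// Input and output share one type, so the downstream writer is unaffected.
function dropUnverified(input: SearchResults, isVerified: IsVerified): SearchResults {
  return {
    results: input.results.map((r) => ({
      query: r.query,
      findings: r.findings.filter(isVerified),
    })),
  }
}

const sample: SearchResults = {
  results: [
    {
      query: 'agentic workflows',
      findings: [
        { url: 'https://example.com/a', excerpt: 'Sourced claim.' },
        { url: '', excerpt: 'Claim with no source.' },
      ],
    },
  ],
}

// Illustrative rule only: keep findings that carry a non-empty URL.
const verified = dropUnverified(sample, (f) => f.url.length > 0)
console.log(verified.results.map((r) => r.findings.length)) // [ 1 ]
```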
Wrap in a flow for resumability
If the pipeline is long-running and may exceed your function timeout, wrap it in flow. Each agent step becomes a flow.step() with retry support, and the whole flow can suspend between steps.
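The suspend-and-resume behavior can be pictured as checkpointing the accumulated context after every step. The sketch below is a conceptual model under stated assumptions, not the flow API: runResumable, FlowStep, the in-memory checkpoints map, and the budget parameter (a stand-in for a function timeout) are all hypothetical.

```typescript
// Hypothetical checkpointing model; not the flow or flow.step() API.
type FlowContext = Record<string, unknown>

interface FlowStep {
  name: string
  run: (ctx: FlowContext) => unknown
}

// Stand-in for durable storage, keyed by run id.
const checkpoints = new Map<string, FlowContext>()

function runResumable(runId: string, seed: FlowContext, steps: FlowStep[], budget: number) {
  // Resume from the last checkpoint if one exists, otherwise start from the seed.
  let context: FlowContext = checkpoints.get(runId) ?? { ...seed }
  let executed = 0
  for (const step of steps) {
    // Finished in an earlier invocation (assumes step names never collide with seed keys).
    if (step.name in context) continue
    // Out of budget: stop here and let a later invocation continue.
    if (executed === budget) return { status: 'suspended' as const, context }
    context = { ...context, [step.name]: step.run(context) }
    checkpoints.set(runId, context) // persist after every completed step
    executed++
  }
  return { status: 'completed' as const, context }
}

const calls: string[] = []
const flowSteps: FlowStep[] = [
  { name: 'plan', run: () => { calls.push('plan'); return 'p' } },
  { name: 'search', run: () => { calls.push('search'); return 's' } },
  { name: 'write', run: () => { calls.push('write'); return 'w' } },
]

// The first invocation runs two steps and suspends; the second runs only 'write'.
const first = runResumable('run-1', { topic: 't' }, flowSteps, 2)
const second = runResumable('run-1', { topic: 't' }, flowSteps, 2)
console.log(first.status, second.status, calls)
```

Completed steps are never re-run on resume, so model calls from before the suspension are not repeated.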