Crux
GuidesFlows

Agent primitives in flows

Compose flows with handoff, delegate, and blackboard — the building blocks that plug into steps for validation, sub-pipelines, and shared state.

Flows are the orchestration layer. Blackboard, Handoff, and Delegate are the building blocks that plug into steps. Used inside a flow, they give you validated data transfer between agents, callable sub-pipelines, and shared typed state across steps — all with the same retry, fallback, and tracing the rest of the flow gets.

Handoff Between Flows

A handoff validates and transforms data between agents. Use it to connect the output of one flow to the input of another — the handoff sits between the two flows as a validated bridge.

import { z } from 'zod'
import { withSession, flow, createSessionId } from '@crux/core'
import { handoff } from '@crux/core/agent'
import { generate, generateTextFn } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

// Schemas for the research agent's output
const ResearchOutput = z.object({
  findings: z.array(z.string()),
  sources: z.array(z.object({ url: z.string(), title: z.string() })),
})

// Schemas for what the writer agent expects
const WriterInput = z.object({
  keyPoints: z.array(z.string()),
})

// The handoff validates research output, extracts key points,
// and generates a prose summary the writer can reference
const researchToWriter = handoff({
  id: 'research-to-writer',
  inputSchema: ResearchOutput,
  outputSchema: WriterInput,
  transform: (input) => ({
    keyPoints: input.findings.slice(0, 5), // top 5 findings
  }),
  summarize: {
    generate: generateTextFn,
    model: openai('gpt-4o-mini'),
  },
})

const model = openai('gpt-4o')

const researchFlow = flow('research', async (flow) => {
  const plan = await flow.step('plan', () =>
    generate(researchPlanner, { model, input: { query: 'cloud migration strategies' } })
  )
  return flow.step('search', () =>
    generate(researchSearcher, { model, input: { plan: plan.object } })
  )
})

const writeFlow = flow('write', async (flow) => {
  await flow.step('draft', () =>
    generate(articleWriter, {
      model,
      input: {
        topic: 'cloud migration strategies',
        keyPoints: payload.data.keyPoints,
        researchSummary: payload.summary,
      },
    })
  )
})

await withSession(createSessionId(), async () => {
  // First flow: research
  const research = await researchFlow.run()

  // Handoff: validate research output → transform → generate summary
  const payload = await researchToWriter.prepare(research.object)
  // payload.data    → { keyPoints: string[] }  (validated + transformed)
  // payload.summary → string                   (LLM-generated prose summary)

  // Second flow: writing, with handoff context injected
  await writeFlow.run()
})

The research flow produces raw findings, the handoff ensures they match the writer's expectations (validating with Zod, transforming, optionally summarizing), and the write flow consumes clean data.

Delegate as a Step

A delegate wraps a subagent with handoff validation. Use .run() inside a step when one step of your flow should execute a full subagent pipeline — the delegate handles args validation, execution, and output transformation in one call.

import { z } from 'zod'
import { flow } from '@crux/core'
import { handoff, delegate } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

const model = openai('gpt-4o')

// Same handoff as above — validates/transforms research output for the writer
const researchToWriter = handoff({
  id: 'research-to-writer',
  inputSchema: z.object({
    findings: z.array(z.string()),
    sources: z.array(z.object({ url: z.string(), title: z.string() })),
  }),
  outputSchema: z.object({ keyPoints: z.array(z.string()) }),
  transform: (input) => ({ keyPoints: input.findings.slice(0, 5) }),
})

// The delegate wraps a multi-step research subagent:
// 1. Validates args (query string)
// 2. Runs the subagent (plan → search → synthesize)
// 3. Validates output through the handoff (findings → keyPoints)
const researchDelegate = delegate({
  id: 'delegate-research',
  argsSchema: z.object({ query: z.string() }),
  handoff: researchToWriter,
  execute: async (args) => {
    // This is the subagent — it can be as simple or complex as needed.
    // It must return data matching the handoff's inputSchema.
    const plan = await generate(researchPlanner, {
      model,
      input: { query: args.query },
    })
    const results = await generate(researchSearcher, {
      model,
      input: { plan: plan.object },
    })
    return results.object // { findings: string[], sources: { url, title }[] }
  },
})

const contentPipeline = flow('content-pipeline', async (flow) => {
  // Delegate runs the full research subagent, validates output through the handoff
  const research = await flow.step(
    'research',
    () => researchDelegate.run({ query: 'landing page best practices' }),
    { retry: { attempts: 2, delay: 2000 } },
  )
  // research.data → { keyPoints: string[] }  (handoff-transformed)

  await flow.step('write', () =>
    generate(articleWriter, {
      model,
      input: { keyPoints: research.data.keyPoints },
    })
  )
})

await contentPipeline.run()

The delegate's three-layer validation (args → subagent output → handoff transform) runs inside the step. If the subagent returns data that doesn't match the handoff's inputSchema, the step fails and retry kicks in automatically.

Blackboard Across Steps

A blackboard is shared typed state that multiple steps can read and write. Use it when steps need to coordinate beyond simple data threading — for example, when a later step's behavior depends on an earlier step's confidence score.

import { z } from 'zod'
import { flow } from '@crux/core'
import { blackboard } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

const model = openai('gpt-4o-mini')
const premiumModel = openai('gpt-4o')

const board = blackboard({
  id: 'research-state',
  schema: z.object({
    findings: z.array(z.string()),
    confidence: z.number(),
    needsMoreResearch: z.boolean(),
  }),
})

const adaptiveResearch = flow('adaptive-research', async (flow) => {
  // First pass: quick search with a cheap model
  await flow.step('initial-search', async () => {
    const result = await generate(researchSearcher, {
      model,
      input: { query: 'React server components best practices' },
    })
    await board.patch({
      findings: result.object.findings,
      confidence: result.object.confidence,
      needsMoreResearch: result.object.confidence < 0.7,
    })
  })

  // Conditional step — only runs if the first pass wasn't confident enough
  if (await board.get('needsMoreResearch')) {
    await flow.step('deep-search', async () => {
      const priorFindings = await board.get('findings')
      const result = await generate(deepResearchSearcher, {
        model: premiumModel,
        input: { query: 'React server components best practices', priorFindings },
      })
      await board.patch({
        findings: result.object.findings,
        confidence: 1.0,
        needsMoreResearch: false,
      })
    })
  }

  // Final step always runs — reads whatever findings are on the board
  return flow.step('synthesize', async () => {
    const findings = await board.get('findings')
    return generate(researchSynthesizer, { model, input: { findings } })
  })
})

await adaptiveResearch.run()

You can also inject the blackboard directly into a prompt with use: [board]. Crux will add the current board state as context and expose focused tools so the agent can read or update it during generation. Use board.asContext() only when you want context without tools.

All agent primitive events — blackboard updates, handoff preparations, delegate executions — are automatically correlated to the current flow step in devtools via observability context. No extra wiring needed.

On this page