Pipeline & Flow Patterns

Orchestrating plans and tasks in pipelines, flows, and parallel fan-out patterns.

The Problem

The plan-task-worker lifecycle spans multiple steps: create a plan, create tasks, dispatch workers, collect results. Wiring these steps together means passing data between them: the plan ID from step 1 feeds step 2, and the task list handle feeds step 3. Without structure, this turns into a tangle of global variables and ad-hoc closures.

The .created Pattern

Creation tools (createPlanTool, createTaskListTool) capture the entity they create on a .created property. This enables clean inter-step data flow:

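const planTool = createPlanTool()
// plannerPrompt is assumed to expose planTool as one of its tools
await generate(plannerPrompt, { model, input })

// After the LLM calls the tool, .created holds the PlanHandle.
// The `!` assumes the call happened; add a guard if the model may skip the tool.
const planId = planTool.created!.id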

The onCreated callback provides an alternative for immediate reactions:

let planId: string | undefined // stays undefined until the tool has been called
const planTool = createPlanTool({
  onCreated: (handle) => {
    planId = handle.id
  },
})
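
To make the two capture styles concrete, here is a minimal sketch of how a creation tool could record its result on .created and also fire onCreated. The names makeCreationTool, Handle, and CreationTool are hypothetical stand-ins written for this illustration; this is not the Crux implementation, only one plausible shape for it.

```typescript
// Hypothetical stand-in for a creation tool; not the Crux implementation.
interface Handle {
  id: string
}

interface CreationTool<Args> {
  created: Handle | undefined
  execute(args: Args): Handle
}

function makeCreationTool<Args>(
  create: (args: Args) => Handle,
  options: { onCreated?: (handle: Handle) => void } = {},
): CreationTool<Args> {
  const tool: CreationTool<Args> = {
    created: undefined,
    execute(args) {
      const handle = create(args)
      tool.created = handle // later steps read this after the call
      options.onCreated?.(handle) // immediate reaction, if configured
      return handle
    },
  }
  return tool
}

// Simulate the model invoking the tool once.
const seen: string[] = []
const demoTool = makeCreationTool(
  (args: { title: string }) => ({ id: `plan-${args.title}` }),
  { onCreated: (handle) => seen.push(handle.id) },
)

const before = demoTool.created // nothing has been created yet
demoTool.execute({ title: 'docs' })
const after = demoTool.created?.id
```

In this sketch the property stays undefined until the tool runs, which is why reading it with a non-null assertion is only safe after the generating step has completed.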

Pipeline Pattern

Use pipeline() from your SDK adapter to chain agent steps with typed data flow:

content-pipeline.ts
import { pipeline, generate } from '@crux/ai'
import { createPlanTool, planAgent } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'
import { openai } from '@ai-sdk/openai'

const model = openai('gpt-4o') // or any AI SDK model

const planTool = createPlanTool({
  template: '## Goal\n[objective]\n\n## Sections\n1. [section]',
})

const result = await pipeline({
  steps: [
    {
      agent: planner,
      name: 'plan',
      // planner (defined elsewhere) has planTool as its createPlan tool, so this step creates the plan
    },
    {
      agent: orchestrator,
      name: 'orchestrate',
      input: async () => {
        // Access the plan created in the previous step.
        // Named `plan` so it does not shadow the imported planAgent() factory.
        const plan = planAgent(planTool.created!.id)
        const handle = await tasklist({ planId: planTool.created!.id })
        return { planAgent: plan, handle }
      },
    },
  ],
  context: { topic: 'TypeScript monorepos' },
  model,
})
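
The reason the second step can read planTool.created safely is that its input callback is evaluated lazily, after the first step has finished. The sketch below illustrates that ordering with a hypothetical mini-runner; runSteps, Step, and holder are names invented for this example and do not reflect the actual @crux/ai pipeline() internals.

```typescript
// Hypothetical mini-runner; names and shapes are illustrative, not the @crux/ai API.
type Step = {
  name: string
  run: (input: unknown) => Promise<unknown>
  input?: () => Promise<unknown> | unknown // resolved lazily, just before the step runs
}

async function runSteps(steps: Step[], context: unknown): Promise<Record<string, unknown>> {
  const outputs: Record<string, unknown> = {}
  let previous: unknown = context
  for (const step of steps) {
    // Without an input callback, a step receives the previous step's output.
    const input = step.input ? await step.input() : previous
    previous = await step.run(input)
    outputs[step.name] = previous
  }
  return outputs
}

// Shared holder playing the role of planTool.created.
const holder: { created?: { id: string } } = {}

const pipelineDemo = runSteps(
  [
    {
      name: 'plan',
      run: async () => {
        holder.created = { id: 'plan-1' }
        return 'planned'
      },
    },
    {
      name: 'orchestrate',
      // Safe to read here: this callback runs only after 'plan' has finished.
      input: () => ({ planId: holder.created!.id }),
      run: async (input) => `tasks for ${(input as { planId: string }).planId}`,
    },
  ],
  { topic: 'TypeScript monorepos' },
)
```

Had the second step's input been a plain object built when the steps array was defined, holder.created would still have been undefined at that point; deferring the read to a callback avoids that.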

See Pipeline deep dive for full pipeline documentation.

Flow Pattern

flow() provides named steps with retries, fallbacks, and devtools tracing. Define the flow once at module level and call .run() to execute:

content-flow.ts
import { flow } from '@crux/core'
import { generate } from '@crux/ai'
// Aliased so the `planAgent` instance below does not shadow the factory
import { createPlanTool, planAgent as createPlanAgent } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'

// `prompt`, `model`, and `topic` are assumed to be defined elsewhere in this module

const contentWriting = flow('content-writing', async (flow) => {
  // Step 1: Create plan
  const planTool = createPlanTool({
    template: '## Goal\n[objective]\n\n## Sections\n1. [section]',
  })

  await flow.step('plan', () =>
    generate(prompt({ system: 'Create a plan.', tools: { createPlan: planTool } }), { model, input: { topic } }),
  )

  const planAgent = createPlanAgent(planTool.created!.id)

  // Step 2: Create task list
  const handle = await flow.step('setup-tasks', async () => {
    const h = await tasklist({ planId: planTool.created!.id })
    await h.addTask({ id: 'intro', label: 'Write introduction' })
    await h.addTask({ id: 'body', label: 'Write main body' })
    await h.addTask({ id: 'conclusion', label: 'Write conclusion' })
    return h
  })

  // Step 3: Execute tasks (with per-step retry)
  for (const task of await handle.getTasks()) {
    await flow.step(
      `write-${task.id}`,
      () => {
        const worker = taskWorker(handle.id, task.id)
        return generate(
          prompt({
            use: [planAgent.asContext(), worker.asContext()],
            tools: worker.asTools(),
            system: 'Write the section.',
          }),
          { model, input: {} },
        )
      },
      { retry: { attempts: 2, backoff: 'exponential' } },
    )
  }
})

await contentWriting.run()
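
For readers unfamiliar with per-step retry, the following is a rough sketch of what a retry with exponential backoff typically does. The helper withRetry, its RetryOptions shape, and the baseDelayMs parameter are assumptions made for illustration; in particular, whether Crux counts attempts as total tries or as extra retries is not stated above, and this sketch assumes total tries.

```typescript
// Hypothetical helper; Crux's own retry semantics may differ.
interface RetryOptions {
  attempts: number // assumed here to mean total tries, including the first
  baseDelayMs: number
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function withRetry<T>(fn: () => Promise<T>, { attempts, baseDelayMs }: RetryOptions): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      if (attempt < attempts - 1) {
        // Delay doubles after each failure: 1x, 2x, 4x, ...
        await sleep(baseDelayMs * 2 ** attempt)
      }
    }
  }
  // Every attempt failed; surface the most recent error.
  throw lastError
}

// A step that fails once, then succeeds.
let calls = 0
const retryDemo = withRetry(
  async () => {
    calls += 1
    if (calls === 1) throw new Error('transient failure')
    return 'section written'
  },
  { attempts: 2, baseDelayMs: 10 },
)
```

Because each section is wrapped in its own step, a transient failure in one section is retried without re-running sections that already succeeded.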

See Flows deep dive for full flow documentation.

Convex apps: Use flow() from @crux/convex/server. It exposes .action, .handler, .args, and .signal(ctx, actionRef, ...) while preserving Crux observability across action boundaries.

Parallel Fan-Out

Use Promise.all or parallel() to run workers concurrently:

parallel-workers.ts
import { parallel, generate } from '@crux/ai'
import { taskWorker } from '@crux/core/tasks'

// `handle` and `planAgent` are the task list handle and plan agent instance created earlier
const tasks = await handle.getTasks()

// Simple Promise.all
await Promise.all(
  tasks.map(async (task) => {
    const worker = taskWorker(handle.id, task.id)
    await generate(
      prompt({
        use: [planAgent.asContext(), worker.asContext()],
        tools: worker.asTools(),
        system: 'Complete your assigned task.',
      }),
      { model, input: {} },
    )
  }),
)
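
One caveat with a bare Promise.all is that it rejects as soon as any worker rejects, so the results of the other workers are lost to the caller. A possible way around this, sketched below, is to catch inside each mapped function and return an outcome value instead. The names runWorker, Outcome, and fanOut are hypothetical; runWorker stands in for the generate() call and fails on purpose for one task.

```typescript
// Hypothetical worker standing in for the generate() call; 'body' fails on purpose.
async function runWorker(taskId: string): Promise<string> {
  if (taskId === 'body') throw new Error('model timeout')
  return `done:${taskId}`
}

type Outcome =
  | { id: string; ok: true; value: string }
  | { id: string; ok: false; error: unknown }

async function fanOut(taskIds: string[]): Promise<Outcome[]> {
  return Promise.all(
    taskIds.map(async (id): Promise<Outcome> => {
      try {
        return { id, ok: true, value: await runWorker(id) }
      } catch (error) {
        // Convert the rejection into a value so Promise.all never rejects.
        return { id, ok: false, error }
      }
    }),
  )
}

const fanOutDemo = fanOut(['intro', 'body', 'conclusion'])
```

The caller can then inspect the outcomes, keep the successful sections, and re-dispatch only the task IDs whose outcome has ok set to false.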

For typed results from parallel agents, use parallel():

import { parallel } from '@crux/ai'

const { results } = await parallel({
  agents: {
    research: researchAgent,
    outline: outlineAgent,
    keywords: keywordAgent,
  },
  context: { topic },
  model,
})

// Typed access to each agent's output
results.research.output // typed from researchAgent's output schema
results.outline.output // typed from outlineAgent's output schema

See Parallel deep dive for full documentation.

Devtools Integration

Plans and tasks created inside flows and pipelines are automatically traced:

  • Each flow.step() creates an OpenTelemetry span
  • Plan creation and task mutations within a step are nested under that span
  • The devtools dashboard shows the full hierarchy: flow > step > plan/task events

This means you can see exactly which step created a plan, which step added tasks, and which worker completed each task.

Plan and task events include trace correlation metadata automatically. You don't need to configure anything beyond enabling devtools.
