Crux
GuidesPlans & Tasks

Recipes

End-to-end recipes for plans and task lists — full pipelines, thread-scoped task tracking, and human-in-the-loop approval.

Full Pipeline: Plan → Tasks → Parallel Workers

The most common pattern: a planner agent creates a plan, an orchestrator breaks it into tasks, and worker agents execute each task in parallel — all wired together with pipeline() and parallel().

plan-task-pipeline.ts
import { createPlanTool } from '@crux/core/plan'
import { tasklist } from '@crux/core/tasks'
import { prompt, agent } from '@crux/core'
import { generate, pipeline, parallel } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

const model = openai('gpt-4o')

// ── Step 1: Planner — LLM creates a structured plan ──────────────

const planTool = createPlanTool({
  template: `## Objective
[What we're creating and why]

## Target Audience
[Who this is for]

## Sections
1. [Section title — what it covers]

## Tone & Style
[Writing guidelines]`,
})

const planner = agent({
  id: 'planner',
  model,
  prompt: prompt({
    id: 'planner',
    system: `You are a content strategist. When given a topic, use the createPlan
tool to create a detailed, structured content plan. Be specific about each
section — the plan will be used by other agents to create the actual content.`,
    prompt: ({ input }) => `Create a content plan for: ${input.goal}`,
  }),
  tools: { createPlan: planTool },
})

// ── Step 2: Orchestrator — LLM breaks the plan into tasks ────────

const orchestratorPrompt = prompt({
  id: 'orchestrator',
  system: `You are a project manager. You've been given a content plan.
Use the addTask tool to create a task for each section in the plan.

For each task:
- Use a short, meaningful ID (e.g., 'write-intro', 'write-setup')
- Write a clear label describing the deliverable
- Include a description with specific guidance from the plan
- Assign to the 'writer' agent

Create one task per section. Do NOT create meta-tasks like "review" or "publish".`,
  prompt: ({ input }) => `Break this plan into tasks:\n\n${input.planContent}`,
})

// ── Step 3: Worker prompt — each worker gets plan + task context ──

const workerPrompt = prompt({
  id: 'worker',
  // use: [] is set dynamically per task (see step 3 below)
  system: `You are a technical writer. Complete the section described in your
assignment. Follow the plan for context, audience, and tone.

Guidelines:
- Call startTask when you begin writing
- Call reportProgress with a brief summary after each paragraph
- Call completeTask with your finished section when done
- Call failTask if you cannot complete the work (explain why)

Write in markdown. Be thorough but concise.`,
  prompt: () => 'Write the section described in your assignment.',
})

// ── Run the pipeline ─────────────────────────────────────────────

let taskHandle: Awaited<ReturnType<typeof tasklist>>

const result = await pipeline({
  context: { goal: 'Write a comprehensive guide about TypeScript monorepos' },
  steps: [
    // ╔════════════════════════════════════════════════╗
    // ║ Step 1: Planner creates the plan              ║
    // ║ The LLM calls createPlan tool automatically   ║
    // ║ planTool.created holds the PlanHandle after    ║
    // ╚════════════════════════════════════════════════╝
    { name: 'plan', agent: planner },

    // ╔════════════════════════════════════════════════╗
    // ║ Step 2: Orchestrator reads plan, creates tasks ║
    // ║ Uses fn step so we can create the task list    ║
    // ║ first, then give the LLM the addTask tool     ║
    // ╚════════════════════════════════════════════════╝
    {
      name: 'tasks',
      fn: async () => {
        const p = planTool.created!

        // Create a task list linked to the plan
        taskHandle = await tasklist({ planId: p.id })

        // Give the orchestrator only the addTask tool —
        // it shouldn't remove or discard, just create tasks
        const { addTask } = taskHandle.asTools()

        await generate(orchestratorPrompt, {
          model,
          input: { planContent: p.content },
          // Inject the full plan as context so the LLM can reference it
          use: [p.asContext()],
          tools: { addTask },
          // Allow multiple tool calls — the LLM creates several tasks
          maxSteps: 10,
        })

        const tasks = await taskHandle.getTasks()
        console.log(`Created ${tasks.length} tasks`)
        return { taskCount: tasks.length, taskListId: taskHandle.id }
      },
    },

    // ╔════════════════════════════════════════════════╗
    // ║ Step 3: Workers execute all tasks in parallel  ║
    // ║ Each worker gets the plan context + its own    ║
    // ║ task assignment + lifecycle tools              ║
    // ╚════════════════════════════════════════════════╝
    {
      name: 'execute',
      fn: async () => {
        const p = planTool.created!
        const tasks = await taskHandle.getTasks()

        // Create a worker agent for each task.
        // Each agent gets:
        //   - Plan context (the overall goal and structure)
        //   - Worker context (its specific assignment + guidelines)
        //   - Worker tools (startTask, reportProgress, completeTask, failTask)
        const workerAgents = Object.fromEntries(
          tasks.map((task) => {
            const worker = taskHandle.worker(task.id)

            return [
              task.id,
              agent({
                id: `worker-${task.id}`,
                model,
                prompt: prompt({
                  id: `task-${task.id}`,
                  // Both contexts compose in the system message:
                  // 1. Plan context: "## Plan: TypeScript Monorepos (v1) ..."
                  // 2. Worker context: "## Your Assignment\n**Task:** Write intro ..."
                  use: [p.asContext(), worker.asContext()],
                  system: workerPrompt.system,
                  prompt: workerPrompt.prompt,
                }),
                // Worker tools have taskId bound — the LLM just calls
                // startTask(), reportProgress(), completeTask() with no ID
                tools: worker.asTools(),
              }),
            ]
          }),
        )

        // Fan out all workers in parallel — they execute simultaneously
        const parallelResult = await parallel({
          context: {},
          agents: workerAgents,
          onError: 'continue', // Don't fail the whole pipeline if one task fails
        })

        console.log(`Completed ${Object.keys(parallelResult.results).length} tasks`)
        return parallelResult
      },
    },
  ],
})

// ── After the pipeline ───────────────────────────────────────────

const finalStatus = await taskHandle.getStatus()
console.log(`Pipeline complete — task list status: ${finalStatus}`)
// → 'completed' (auto-derived from all tasks being completed)

// Access individual task results
const tasks = await taskHandle.getTasks()
for (const task of tasks) {
  console.log(`${task.id}: ${task.status}`)
  if (task.result) console.log(`  Result: ${JSON.stringify(task.result).slice(0, 100)}...`)
  if (task.error) console.log(`  Error: ${task.error}`)
}

How data flows between steps:

  • Step 1 (planner): The LLM calls createPlanplanTool.created holds the PlanHandle
  • Step 2 (orchestrator): Reads planTool.created via closure → creates task list → LLM calls addTask multiple times
  • Step 3 (workers): Reads planTool.created and taskHandle via closures → fans out workers with parallel()

No manual ID extraction from tool calls. No shared state object. Just closures over the creation tools.

Variation: flow instead of pipeline

The same pattern works with flow() — the difference is that you call generate() directly instead of defining agents:

plan-task-flow.ts
import { generate } from '@crux/ai'
import { flow } from '@crux/core'

const contentPipeline = flow('content-pipeline', async (flow) => {
  // Step 1: Plan
  const planTool = createPlanTool({ template: '...' })
  await flow.step('plan', () => generate(plannerPrompt, { model, tools: { createPlan: planTool } }))

  // Step 2: Tasks
  const p = planTool.created!
  const handle = await tasklist({ planId: p.id })
  const { addTask } = handle.asTools()

  await flow.step('tasks', () =>
    generate(orchestratorPrompt, {
      model,
      input: { planContent: p.content },
      use: [p.asContext()],
      tools: { addTask },
      maxSteps: 10,
    }),
  )

  // Step 3: Parallel workers
  await flow.step('execute', async () => {
    const tasks = await handle.getTasks()
    await Promise.all(
      tasks.map(async (task) => {
        const worker = handle.worker(task.id)
        await generate(
          prompt({
            use: [p.asContext(), worker.asContext()],
            system: 'Write your assigned section. Use the task tools to track progress.',
            prompt: () => 'Execute your assignment.',
          }),
          { model, tools: worker.asTools() },
        )
      }),
    )
  })
})

await contentPipeline.run()

Task Lists Without Plans

Track work for a conversation thread without any plan association:

thread-tasks.ts
import { tasklist, taskListAgent } from '@crux/core/tasks'

const handle = await tasklist({
  metadata: { threadId: 'thread-abc', userRequest: 'Set up my new project' },
})

await handle.addTask({ id: 'scaffold', label: 'Create project structure' })
await handle.addTask({ id: 'deps', label: 'Install dependencies' })
await handle.addTask({ id: 'config', label: 'Configure TypeScript and ESLint' })
await handle.addTask({ id: 'test', label: 'Set up test framework' })

// Give the agent management tools
const taskAgent = taskListAgent(handle.id)

await generate(
  prompt({
    use: [taskAgent.asContext()],
    tools: taskAgent.asTools(),
    system: 'Complete each task in order. Use the task tools to track your progress.',
  }),
  { model, input: {} },
)

Approval Workflow

Use plan metadata to implement human-in-the-loop approval before execution:

approval-flow.ts
import { plan, updatePlan, getPlan } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'

// Agent creates a plan
const p = await plan({
  title: 'Blog Post: AI Agent Patterns',
  content: '## Goal\nWrite a technical blog post about...',
  metadata: { approval: 'pending' },
})

// ... UI shows plan for review (see reactive-ui.mdx) ...

// Human approves
await updatePlan(p.id, {
  metadata: {
    approval: 'approved',
    approvedBy: 'henri',
    approvedAt: Date.now(),
    approvedVersion: p.version,
  },
})

// Agent checks approval and version before executing
const current = await getPlan(p.id)
if (current?.metadata?.approval !== 'approved') {
  throw new Error('Plan not approved')
}
if (current.version !== current.metadata.approvedVersion) {
  throw new Error('Plan modified after approval — needs re-review')
}

// Proceed with task creation and execution...
const handle = await tasklist({ planId: p.id })
// ...
  • Best practices — heuristics for naming, scoping, and tool selection that apply across these recipes
  • Plans, Tasks, Workers — the underlying primitives

On this page