Recipes
End-to-end recipes for plans and task lists — full pipelines, thread-scoped task tracking, and human-in-the-loop approval.
Full Pipeline: Plan → Tasks → Parallel Workers
The most common pattern: a planner agent creates a plan, an orchestrator breaks it into tasks, and worker agents execute each task in parallel — all wired together with pipeline() and parallel().
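Before the library version, it may help to see the bare shape of the pattern. The sketch below is library-free and makes no claims about how pipeline() or parallel() are implemented. Every name in it (SketchPlan, SketchTask, makePlan, splitIntoTasks, runWorker, runSketchPipeline) is hypothetical, and the LLM calls are replaced by fixed return values. It shows three stages run in order, with the last stage fanned out so that one failing worker does not discard the others, which is similar in spirit to onError: 'continue'.

```typescript
// Hypothetical shapes for illustration; the real library's plan and task types may differ.
interface SketchPlan {
  goal: string
  sections: string[]
}

interface SketchTask {
  id: string
  label: string
}

type WorkerOutcome =
  | { id: string; status: 'completed'; result: string }
  | { id: string; status: 'failed'; error: string }

// Stage 1: produce a plan (fixed data here; an LLM call in practice).
async function makePlan(goal: string): Promise<SketchPlan> {
  return { goal, sections: ['intro', 'setup', 'tooling'] }
}

// Stage 2: break the plan into one task per section.
async function splitIntoTasks(plan: SketchPlan): Promise<SketchTask[]> {
  return plan.sections.map((section) => ({
    id: `write-${section}`,
    label: `Write the ${section} section of "${plan.goal}"`,
  }))
}

// Stage 3 worker: fails on purpose for one task to show error isolation.
async function runWorker(task: SketchTask): Promise<string> {
  if (task.id === 'write-tooling') throw new Error('no tooling notes available')
  return `Draft for ${task.id}`
}

async function runSketchPipeline(goal: string): Promise<WorkerOutcome[]> {
  const plan = await makePlan(goal)
  const tasks = await splitIntoTasks(plan)
  // Promise.allSettled waits for every worker, so a single rejection
  // does not throw away the results of the workers that succeeded.
  const settled = await Promise.allSettled(tasks.map((task) => runWorker(task)))
  return settled.map((outcome, i): WorkerOutcome =>
    outcome.status === 'fulfilled'
      ? { id: tasks[i].id, status: 'completed', result: outcome.value }
      : {
          id: tasks[i].id,
          status: 'failed',
          error: outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason),
        },
  )
}

runSketchPipeline('TypeScript monorepos').then((outcomes) => {
  for (const o of outcomes) console.log(`${o.id}: ${o.status}`)
})
```

With the fixed data above, write-intro and write-setup complete and write-tooling is reported as failed. The listing that follows builds the same three stages with the library's own pipeline() and parallel().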
import { createPlanTool } from '@crux/core/plan'
import { tasklist } from '@crux/core/tasks'
import { prompt, agent } from '@crux/core'
import { generate, pipeline, parallel } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
const model = openai('gpt-4o')
// ── Step 1: Planner — LLM creates a structured plan ──────────────
const planTool = createPlanTool({
  template: `## Objective
[What we're creating and why]
## Target Audience
[Who this is for]
## Sections
1. [Section title: what it covers]
## Tone & Style
[Writing guidelines]`,
})
const planner = agent({
  id: 'planner',
  model,
  prompt: prompt({
    id: 'planner',
    system: `You are a content strategist. When given a topic, use the createPlan
tool to create a detailed, structured content plan. Be specific about each
section, because the plan will be used by other agents to create the actual content.`,
    prompt: ({ input }) => `Create a content plan for: ${input.goal}`,
  }),
  tools: { createPlan: planTool },
})
// ── Step 2: Orchestrator — LLM breaks the plan into tasks ────────
const orchestratorPrompt = prompt({
  id: 'orchestrator',
  system: `You are a project manager. You've been given a content plan.
Use the addTask tool to create a task for each section in the plan.
For each task:
- Use a short, meaningful ID (e.g., 'write-intro', 'write-setup')
- Write a clear label describing the deliverable
- Include a description with specific guidance from the plan
- Assign to the 'writer' agent
Create one task per section. Do NOT create meta-tasks like "review" or "publish".`,
  prompt: ({ input }) => `Break this plan into tasks:\n\n${input.planContent}`,
})
// ── Step 3: Worker prompt — each worker gets plan + task context ──
const workerPrompt = prompt({
  id: 'worker',
  // `use` is not set here; it is supplied per task when the worker agents
  // are built (see the 'execute' step of the pipeline below)
  system: `You are a technical writer. Complete the section described in your
assignment. Follow the plan for context, audience, and tone.
Guidelines:
- Call startTask when you begin writing
- Call reportProgress with a brief summary after each paragraph
- Call completeTask with your finished section when done
- Call failTask if you cannot complete the work (explain why)
Write in markdown. Be thorough but concise.`,
  prompt: () => 'Write the section described in your assignment.',
})
// ── Run the pipeline ─────────────────────────────────────────────
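// Assigned inside the 'tasks' step. The definite-assignment assertion (!) lets
// TypeScript accept the reads of taskHandle after the pipeline has finished.
let taskHandle!: Awaited<ReturnType<typeof tasklist>>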
const result = await pipeline({
  context: { goal: 'Write a comprehensive guide about TypeScript monorepos' },
  steps: [
    // ╔════════════════════════════════════════════════╗
    // ║ Step 1: Planner creates the plan               ║
    // ║ The LLM calls createPlan tool automatically    ║
    // ║ planTool.created holds the PlanHandle after    ║
    // ╚════════════════════════════════════════════════╝
    { name: 'plan', agent: planner },
    // ╔════════════════════════════════════════════════╗
    // ║ Step 2: Orchestrator reads plan, creates tasks ║
    // ║ Uses fn step so we can create the task list    ║
    // ║ first, then give the LLM the addTask tool      ║
    // ╚════════════════════════════════════════════════╝
    {
      name: 'tasks',
      fn: async () => {
        const p = planTool.created!
        // Create a task list linked to the plan
        taskHandle = await tasklist({ planId: p.id })
        // Give the orchestrator only the addTask tool:
        // it shouldn't remove or discard, just create tasks
        const { addTask } = taskHandle.asTools()
        await generate(orchestratorPrompt, {
          model,
          input: { planContent: p.content },
          // Inject the full plan as context so the LLM can reference it
          use: [p.asContext()],
          tools: { addTask },
          // Allow multiple tool calls, since the LLM creates several tasks
          maxSteps: 10,
        })
        const tasks = await taskHandle.getTasks()
        console.log(`Created ${tasks.length} tasks`)
        return { taskCount: tasks.length, taskListId: taskHandle.id }
      },
    },
    // ╔════════════════════════════════════════════════╗
    // ║ Step 3: Workers execute all tasks in parallel  ║
    // ║ Each worker gets the plan context + its own    ║
    // ║ task assignment + lifecycle tools              ║
    // ╚════════════════════════════════════════════════╝
    {
      name: 'execute',
      fn: async () => {
        const p = planTool.created!
        const tasks = await taskHandle.getTasks()
        // Create a worker agent for each task.
        // Each agent gets:
        // - Plan context (the overall goal and structure)
        // - Worker context (its specific assignment + guidelines)
        // - Worker tools (startTask, reportProgress, completeTask, failTask)
        const workerAgents = Object.fromEntries(
          tasks.map((task) => {
            const worker = taskHandle.worker(task.id)
            return [
              task.id,
              agent({
                id: `worker-${task.id}`,
                model,
                prompt: prompt({
                  id: `task-${task.id}`,
                  // Both contexts compose in the system message:
                  // 1. Plan context: "## Plan: TypeScript Monorepos (v1) ..."
                  // 2. Worker context: "## Your Assignment\n**Task:** Write intro ..."
                  use: [p.asContext(), worker.asContext()],
                  system: workerPrompt.system,
                  prompt: workerPrompt.prompt,
                }),
                // Worker tools have taskId bound, so the LLM just calls
                // startTask(), reportProgress(), completeTask() with no ID
                tools: worker.asTools(),
              }),
            ]
          }),
        )
        // Fan out all workers in parallel; they execute simultaneously
        const parallelResult = await parallel({
          context: {},
          agents: workerAgents,
          onError: 'continue', // Don't fail the whole pipeline if one task fails
        })
        console.log(`Completed ${Object.keys(parallelResult.results).length} tasks`)
        return parallelResult
      },
    },
  ],
})
// ── After the pipeline ───────────────────────────────────────────
const finalStatus = await taskHandle.getStatus()
console.log(`Pipeline complete — task list status: ${finalStatus}`)
// → 'completed' (auto-derived from all tasks being completed)
// Access individual task results
const tasks = await taskHandle.getTasks()
for (const task of tasks) {
  console.log(`${task.id}: ${task.status}`)
  if (task.result) console.log(` Result: ${JSON.stringify(task.result).slice(0, 100)}...`)
  if (task.error) console.log(` Error: ${task.error}`)
}
How data flows between steps:
- Step 1 (planner): The LLM calls createPlan → planTool.created holds the PlanHandle
- Step 2 (orchestrator): Reads planTool.created via closure → creates task list → LLM calls addTask multiple times
- Step 3 (workers): Reads planTool.created and taskHandle via closures → fans out workers with parallel()
No manual ID extraction from tool calls. No shared state object. Just closures over the creation tools.
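The closure hand-off described above can be sketched without the library. Everything in this example is a hypothetical stand-in: CreationTool, makeCreationTool, requireCreated, and runSteps are illustrative names, not part of @crux/core. The tool remembers what it created, later steps read that value through a closure, and a small guard replaces the non-null assertion so that a missing plan produces a clear error instead of an undefined access.

```typescript
// Hypothetical stand-in for a creation tool: it remembers what it created.
interface CreationTool<T> {
  created: T | undefined
  execute(input: string): T
}

function makeCreationTool<T>(build: (input: string) => T): CreationTool<T> {
  const tool: CreationTool<T> = {
    created: undefined,
    execute(input) {
      const value = build(input)
      tool.created = value // later steps read this through a closure
      return value
    },
  }
  return tool
}

// Guarded accessor: fails loudly if an earlier step never ran the tool.
function requireCreated<T>(tool: CreationTool<T>, name: string): T {
  if (tool.created === undefined) {
    throw new Error(`${name} has not created anything yet`)
  }
  return tool.created
}

// Minimal sequential runner; steps share state only through closures.
type Step = { name: string; fn: () => unknown }

function runSteps(steps: Step[]): Record<string, unknown> {
  const results: Record<string, unknown> = {}
  for (const step of steps) results[step.name] = step.fn()
  return results
}

const demoPlanTool = makeCreationTool((goal) => ({ id: 'plan-1', content: `Plan for ${goal}` }))
let demoTaskIds: string[] = []

const demoResults = runSteps([
  // Step 1 runs the tool, which records the created plan on itself.
  { name: 'plan', fn: () => demoPlanTool.execute('monorepo guide') },
  {
    // Step 2 reads the plan via closure and derives task IDs from it.
    name: 'tasks',
    fn: () => {
      const p = requireCreated(demoPlanTool, 'demoPlanTool')
      demoTaskIds = ['write-intro', 'write-setup'].map((id) => `${p.id}:${id}`)
      return demoTaskIds.length
    },
  },
  // Step 3 reads the task IDs that step 2 left behind.
  { name: 'execute', fn: () => demoTaskIds.map((id) => `${id} done`) },
])

console.log(demoResults.tasks)
```

In a real pipeline the model may finish without ever calling the creation tool, so a guard of this kind is one way to turn that case into a readable failure.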
Variation: flow instead of pipeline
The same pattern works with flow() — the difference is that you call generate() directly instead of defining agents:
import { generate } from '@crux/ai'
import { flow } from '@crux/core'
// createPlanTool, tasklist, prompt, model, and orchestratorPrompt come from the pipeline
// example above. plannerPrompt is assumed to be a planner prompt({ ... }) defined elsewhere.
const contentPipeline = flow('content-pipeline', async (flow) => {
  // Step 1: Plan
  const planTool = createPlanTool({ template: '...' })
  await flow.step('plan', () => generate(plannerPrompt, { model, tools: { createPlan: planTool } }))
  // Step 2: Tasks
  const p = planTool.created!
  const handle = await tasklist({ planId: p.id })
  const { addTask } = handle.asTools()
  await flow.step('tasks', () =>
    generate(orchestratorPrompt, {
      model,
      input: { planContent: p.content },
      use: [p.asContext()],
      tools: { addTask },
      maxSteps: 10,
    }),
  )
  // Step 3: Parallel workers
  await flow.step('execute', async () => {
    const tasks = await handle.getTasks()
    // Promise.all rejects as soon as one worker rejects;
    // Promise.allSettled would let the remaining workers finish
    await Promise.all(
      tasks.map(async (task) => {
        const worker = handle.worker(task.id)
        await generate(
          prompt({
            use: [p.asContext(), worker.asContext()],
            system: 'Write your assigned section. Use the task tools to track progress.',
            prompt: () => 'Execute your assignment.',
          }),
          { model, tools: worker.asTools() },
        )
      }),
    )
  })
})
await contentPipeline.run()
Task Lists Without Plans
Track work for a conversation thread without any plan association:
import { tasklist, taskListAgent } from '@crux/core/tasks'
import { prompt } from '@crux/core'
import { generate } from '@crux/ai'
// model is the same model instance as in the pipeline example above
const handle = await tasklist({
  metadata: { threadId: 'thread-abc', userRequest: 'Set up my new project' },
})
await handle.addTask({ id: 'scaffold', label: 'Create project structure' })
await handle.addTask({ id: 'deps', label: 'Install dependencies' })
await handle.addTask({ id: 'config', label: 'Configure TypeScript and ESLint' })
await handle.addTask({ id: 'test', label: 'Set up test framework' })
// Give the agent management tools
const taskAgent = taskListAgent(handle.id)
await generate(
  prompt({
    use: [taskAgent.asContext()],
    tools: taskAgent.asTools(),
    system: 'Complete each task in order. Use the task tools to track your progress.',
  }),
  { model, input: {} },
)
Approval Workflow
Use plan metadata to implement human-in-the-loop approval before execution:
import { plan, updatePlan, getPlan } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'
// Agent creates a plan
const p = await plan({
  title: 'Blog Post: AI Agent Patterns',
  content: '## Goal\nWrite a technical blog post about...',
  metadata: { approval: 'pending' },
})
// ... UI shows plan for review (see reactive-ui.mdx) ...
// Human approves
await updatePlan(p.id, {
  metadata: {
    approval: 'approved',
    approvedBy: 'henri',
    approvedAt: Date.now(),
    approvedVersion: p.version,
  },
})
// Agent checks approval and version before executing
const current = await getPlan(p.id)
if (current?.metadata?.approval !== 'approved') {
  throw new Error('Plan not approved')
}
if (current.version !== current.metadata.approvedVersion) {
  throw new Error('Plan modified after approval; needs re-review')
}
// Proceed with task creation and execution...
const handle = await tasklist({ planId: p.id })
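The two checks in this recipe can also be factored into a pure function, which makes the gate easy to unit-test without a plan store. This is a sketch under stated assumptions: PlanSnapshot, ApprovalMetadata, GateResult, and checkApproval are hypothetical names, the version is assumed to be a number, and only the metadata fields used in the recipe are modelled.

```typescript
// Hypothetical shapes, modelled on the fields used in the approval recipe.
interface ApprovalMetadata {
  approval?: 'pending' | 'approved'
  approvedBy?: string
  approvedAt?: number
  approvedVersion?: number
}

interface PlanSnapshot {
  id: string
  version: number // assumed numeric; the real library may use another type
  metadata?: ApprovalMetadata
}

type GateResult = { ok: true } | { ok: false; reason: string }

// Pure check: no I/O, so it can be tested without fetching a plan.
function checkApproval(plan: PlanSnapshot | undefined): GateResult {
  if (!plan) return { ok: false, reason: 'Plan not found' }
  if (plan.metadata?.approval !== 'approved') {
    return { ok: false, reason: 'Plan not approved' }
  }
  // The approval only covers the version the reviewer actually saw.
  if (plan.metadata?.approvedVersion !== plan.version) {
    return { ok: false, reason: 'Plan modified after approval' }
  }
  return { ok: true }
}

const pendingPlan: PlanSnapshot = { id: 'p1', version: 1, metadata: { approval: 'pending' } }
const approvedPlan: PlanSnapshot = {
  id: 'p1',
  version: 1,
  metadata: { approval: 'approved', approvedBy: 'henri', approvedVersion: 1 },
}
// Same approval record, but the plan was edited afterwards.
const editedPlan: PlanSnapshot = { ...approvedPlan, version: 2 }

console.log(checkApproval(pendingPlan))
console.log(checkApproval(approvedPlan))
console.log(checkApproval(editedPlan))
```

Returning a result object instead of throwing lets the caller decide whether to raise an error, re-request review, or surface the reason in a UI. The version comparison assumes that recording the approval in metadata does not itself bump the plan's version; if it does in your setup, store the version returned by the update instead.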
Related
- Best practices — heuristics for naming, scoping, and tool selection that apply across these recipes
- Plans, Tasks, Workers — the underlying primitives