Task worker pool
A task list as a queue, taskWorker per task, agent loop draining tasks in parallel with a live UI.
This recipe builds a worker pool: a task list holds the work, you spawn an agent per todo task using taskWorker, and the agent itself uses startTask / completeTask tools to drive its lifecycle. A simple drainer runs N tasks in parallel; the UI subscribes to live status.
Primitives used
- tasklist() from @crux/core/tasks for the persistent task list (supports addTask / updateTask / getTasks)
- taskWorker(taskListId, taskId): binds an agent's prompt to a single assigned task, exposing asContext() + asTools()
- useTaskList() / useTasks() from @crux/react for the live UI
When to reach for this pattern
- You have many similar pieces of work to run (batch classification, batch summarization, multi-document indexing)
- You want each task to survive restarts: tasklist is persistent
- You want a live UI showing per-task progress: done / in-progress / failed
- You want the agent to drive its own lifecycle via tool calls (startTask, reportProgress, completeTask, failTask)
Full code
Backend — define the work and a per-task agent
import { tasklist, taskWorker, createHandle } from '@crux/core/tasks'
import { prompt } from '@crux/core'
import { agent } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
import { z } from 'zod'
// 1. Build the task list and seed it
export async function startBatch(documents: { id: string; text: string }[]) {
  const list = await tasklist({})
  for (const doc of documents) {
    await list.addTask({
      id: doc.id, // user-provided meaningful id
      label: `Summarize ${doc.id}`,
      description: doc.text.slice(0, 200),
    })
  }
  return list.id
}
// 2. The summarizer prompt — the worker context provides the assignment
const summarizePrompt = prompt({
  id: 'summarize-task',
  input: z.object({ text: z.string() }),
  system: `You are a summarization worker. Use your task tools to drive the lifecycle:
1. Call startTask when you begin.
2. Call completeTask with a summary when finished.
3. Call failTask if you cannot complete the task.`,
  prompt: ({ input }) => `Summarize the following text:\n\n${input.text}`,
})
// 3. Drain the queue — one agent per todo task, N in parallel
export async function drainList(taskListId: string, documents: Map<string, string>, concurrency = 5) {
  const handle = createHandle(taskListId)
  const tasks = await handle.getTasks()
  const todos = tasks.filter((t) => t.status === 'todo')
  // Simple parallel drainer: keep up to `concurrency` tasks in flight
  const queue = [...todos]
  const inflight = new Set<Promise<void>>()
  while (queue.length || inflight.size) {
    while (inflight.size < concurrency && queue.length) {
      const task = queue.shift()!
      // Explicit type so TypeScript accepts the self-reference inside finally()
      const work: Promise<void> = runOneTask(taskListId, task.id, documents.get(task.id) ?? '').finally(() => {
        inflight.delete(work)
      })
      inflight.add(work)
    }
    // Wait for any one task to settle; a rejection from runOneTask propagates from here
    if (inflight.size) await Promise.race(inflight)
  }
}
async function runOneTask(taskListId: string, taskId: string, text: string) {
  const worker = taskWorker(taskListId, taskId)
  const summarizerAgent = agent({
    id: `worker-${taskId}`,
    prompt: summarizePrompt,
    tools: worker.asTools(), // startTask, reportProgress, completeTask, failTask
  })
  // Use the agent instance created above, not the imported agent() factory
  await generate(summarizerAgent.prompt, {
    model: openai('gpt-4o-mini'),
    input: { text },
    use: [worker.asContext()], // injects the assignment + guidelines
    stopWhen: { stepCountIs: 5 },
  })
}
createHandle(taskListId) is the rehydrate helper from @crux/core/tasks. The worker's asTools() exposes startTask / reportProgress / completeTask / failTask bound to this specific taskId, so the agent doesn't need to pass IDs around.
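One way to picture "bound to this specific taskId" is a set of closures that capture the id when they are created. The sketch below is a toy model under that assumption, not Crux's implementation: TaskRecord, store, and bindTools are hypothetical names, and only the four tool names and the status values are taken from this recipe.

```typescript
// Toy model only: TaskRecord, store, and bindTools are hypothetical, not Crux APIs.
type Status = 'todo' | 'in_progress' | 'completed' | 'failed'

interface TaskRecord {
  id: string
  status: Status
  progress?: string
  result?: string
  error?: string
}

// In-memory stand-in for the persistent task list.
const store = new Map<string, TaskRecord>()

// Returns tools that close over taskId, so no tool input carries an id.
function bindTools(taskId: string) {
  const get = (): TaskRecord => {
    const task = store.get(taskId)
    if (!task) throw new Error(`Unknown task: ${taskId}`)
    return task
  }
  return {
    startTask(): void {
      get().status = 'in_progress'
    },
    reportProgress(message: string): void {
      get().progress = message
    },
    completeTask(result: string): void {
      const task = get()
      task.status = 'completed'
      task.result = result
    },
    failTask(error: string): void {
      const task = get()
      task.status = 'failed'
      task.error = error
    },
  }
}

// Usage: the caller never passes 'doc-1' again after binding.
store.set('doc-1', { id: 'doc-1', status: 'todo' })
const tools = bindTools('doc-1')
tools.startTask()
tools.completeTask('A short summary.')
```

Because the id is captured once, the model cannot update the wrong task by supplying a mistaken id in a tool call.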
Live UI
'use client'
import { useTaskList, useTasks } from '@crux/react'
export function BatchProgress({ taskListId }: { taskListId: string }) {
  const list = useTaskList(taskListId)
  const tasks = useTasks(taskListId)
  if (!list) return null
  const counts = {
    todo: tasks.filter((t) => t.status === 'todo').length,
    in_progress: tasks.filter((t) => t.status === 'in_progress').length,
    completed: tasks.filter((t) => t.status === 'completed').length,
    failed: tasks.filter((t) => t.status === 'failed').length,
  }
  return (
    <div>
      <p>
        {counts.completed} / {tasks.length} done • {counts.in_progress} running • {counts.failed} failed
      </p>
      <ul>
        {tasks.map((t) => (
          <li key={t.id} className={`task task-${t.status}`}>
            {t.label} — <code>{t.status}</code>
            {t.progress && <p className="muted">{t.progress}</p>}
            {t.status === 'failed' && t.error && <pre>{t.error}</pre>}
          </li>
        ))}
      </ul>
    </div>
  )
}
How it works
- The tasklist is the queue. Each task has id (user-provided meaningful string), label, optional description, and tracks its own status and result. Persisted in CruxStore.
- taskWorker(taskListId, taskId) is per-task. It returns an object with asContext() (injects the assignment into the system prompt at priority 95 by default) and asTools() (startTask / reportProgress / completeTask / failTask, all bound to this taskId, so there are no IDs in tool inputs).
- The agent drives its own lifecycle. The system prompt instructs the model to call startTask first, then completeTask (with the summary as the result) when done, or failTask on errors. The task's status updates as the model calls tools.
- The drainer is just a small loop. Crux doesn't bake in a "worker pool runner"; you write the parallelism that fits your runtime (Promise pool, p-limit, BullMQ, Convex scheduler, etc.). The list itself is the source of truth.
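As an illustration of the "small loop" point, the pool below is a generic sketch that is independent of Crux. runPool and Outcome are hypothetical names. Unlike the drainer in the full code, it records a rejected worker as a failed outcome instead of letting the rejection stop the loop, which may or may not be the behavior you want.

```typescript
// Hypothetical helper: a fixed number of "lanes" pull items until none remain.
type Outcome<T> = { ok: true; value: T } | { ok: false; error: unknown }

async function runPool<I, O>(
  items: readonly I[],
  concurrency: number,
  worker: (item: I) => Promise<O>,
): Promise<Outcome<O>[]> {
  const results = new Array<Outcome<O>>(items.length)
  let next = 0

  // Each lane claims the next unclaimed index; JS is single-threaded, so next++ is safe.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++
      try {
        results[index] = { ok: true, value: await worker(items[index] as I) }
      } catch (error) {
        // Record the failure and keep going so one bad item does not stop the pool.
        results[index] = { ok: false, error }
      }
    }
  }

  const laneCount = Math.max(1, Math.min(concurrency, items.length))
  await Promise.all(Array.from({ length: laneCount }, () => lane()))
  return results
}
```

In this recipe the worker argument would be something like (task) => runOneTask(taskListId, task.id, text), with the todo tasks as items. Results come back in input order regardless of completion order.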
Variations
Different agents per task kind
If different tasks need different agents, dispatch on task metadata:
const task = await handle.getTasks().then((ts) => ts.find((t) => t.id === taskId))
// Named selectedAgent to avoid shadowing the imported agent() factory
const selectedAgent = task?.assignee?.agent === 'classifier' ? classifierAgent : summarizerAgent
The assignee field on addTask({ assignee: { agent: 'classifier' } }) is meant for this.
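With more than two kinds, the ternary can become a lookup table. The following is a minimal sketch under assumptions: TaskLike is a structural stand-in for the task shape, pickAgent is a hypothetical helper, and plain strings stand in for real agent objects so that the example runs without Crux.

```typescript
// Structural stand-in: only the fields used for dispatch.
interface TaskLike {
  id: string
  assignee?: { agent?: string }
}

// Hypothetical helper: look up the agent registered for a task's kind,
// falling back when the task is missing, unassigned, or of an unknown kind.
function pickAgent<A>(task: TaskLike | undefined, registry: ReadonlyMap<string, A>, fallback: A): A {
  const kind = task?.assignee?.agent
  if (kind === undefined) return fallback
  return registry.get(kind) ?? fallback
}

// Strings stand in for agent instances here.
const registry = new Map<string, string>([
  ['classifier', 'classifierAgent'],
  ['summarizer', 'summarizerAgent'],
])

const chosen = pickAgent({ id: 'doc-1', assignee: { agent: 'classifier' } }, registry, 'summarizerAgent')
```

Adding a new kind then means adding one registry entry rather than another branch.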
Pause and resume
tasklist({}) plus addTask can be called from anywhere — including a webhook handler. The drainer can be re-invoked from a cron job; only todo tasks get picked up.
Cross-action (Convex)
Convex actions have a 5-minute timeout. Schedule one drainer action per N tasks via ctx.crux.scheduler.runAfter('run task', 0, internal.batch.runOne, { taskListId, taskId }) from a Crux-aware action — each task gets a fresh action timeout and the hidden __crux envelope keeps devtools traces correlated.
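The fan-out step can be sketched without any Convex or Crux imports by injecting the scheduling call. In the sketch below, chunk and scheduleBatches are hypothetical helpers, and schedule is a placeholder for whatever enqueues a fresh action in your runtime (for example, a wrapper around the runAfter call shown above). With a batch size of 1, each scheduled call carries a single task.

```typescript
// Split items into consecutive batches of at most `size`.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) throw new RangeError('size must be a positive integer')
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) batches.push(items.slice(i, i + size))
  return batches
}

// `schedule` is an injected placeholder, not a Crux or Convex API.
async function scheduleBatches(
  taskListId: string,
  taskIds: readonly string[],
  batchSize: number,
  schedule: (args: { taskListId: string; taskIds: string[] }) => Promise<void>,
): Promise<number> {
  const batches = chunk(taskIds, batchSize)
  // One scheduled call per batch, so each batch runs under its own action timeout.
  for (const batch of batches) await schedule({ taskListId, taskIds: batch })
  return batches.length
}
```

Choosing the batch size is a trade-off: smaller batches give each task more headroom under the timeout, while larger ones mean fewer scheduled actions.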