
Task worker pool

A task list as a queue, taskWorker per task, agent loop draining tasks in parallel with a live UI.

This recipe builds a worker pool: a task list holds the work, you spawn an agent per todo task using taskWorker, and the agent itself uses startTask / completeTask tools to drive its lifecycle. A simple drainer runs N tasks in parallel; the UI subscribes to live status.

Primitives used

  • tasklist() from @crux/core/tasks for the persistent task list (supports addTask / updateTask / getTasks)
  • taskWorker(taskListId, taskId) — binds an agent's prompt to a single assigned task, exposing asContext() + asTools()
  • useTaskList() / useTasks() from @crux/react for the live UI

When to reach for this pattern

  • You have many similar pieces of work to run (batch classification, batch summarization, multi-document indexing)
  • You want each task to survive restarts (tasklist is persistent)
  • You want a live UI showing per-task progress — done / in-progress / failed
  • You want the agent to drive its own lifecycle via tool calls (startTask, reportProgress, completeTask, failTask)
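The lifecycle those tool calls drive can be pictured as a small state machine over the four statuses this recipe uses (todo, in_progress, completed, failed). The transition table below is an assumption for illustration. The recipe does not say whether Crux rejects out-of-order calls, such as completeTask before startTask. A check like this could live in your own tool wrapper if you want that guarantee.

```typescript
type TaskStatus = 'todo' | 'in_progress' | 'completed' | 'failed'

// Hypothetical transition table: which statuses each status may move to.
// completed and failed are terminal in this sketch.
const allowed: Record<TaskStatus, readonly TaskStatus[]> = {
  todo: ['in_progress'],
  in_progress: ['completed', 'failed'],
  completed: [],
  failed: [],
}

// Returns the new status, or throws if the move is not in the table.
export function transition(from: TaskStatus, to: TaskStatus): TaskStatus {
  if (!allowed[from].includes(to)) {
    throw new Error(`illegal transition ${from} -> ${to}`)
  }
  return to
}
```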

Full code

Backend — define the work and a per-task agent

import { tasklist, taskWorker, createHandle } from '@crux/core/tasks'
import { prompt } from '@crux/core'
import { agent } from '@crux/core/agent'
import { generate } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
import { z } from 'zod'

// 1. Build the task list and seed it
export async function startBatch(documents: { id: string; text: string }[]) {
  const list = await tasklist({})
  for (const doc of documents) {
    await list.addTask({
      id: doc.id, // user-provided meaningful id
      label: `Summarize ${doc.id}`,
      description: doc.text.slice(0, 200),
    })
  }
  return list.id
}

// 2. The summarizer prompt — the worker context provides the assignment
const summarizePrompt = prompt({
  id: 'summarize-task',
  input: z.object({ text: z.string() }),
  system: `You are a summarization worker. Use your task tools to drive the lifecycle:
     1. Call startTask when you begin.
     2. Call completeTask with a summary when finished.
     3. Call failTask if you cannot complete the task.`,
  prompt: ({ input }) => `Summarize the following text:\n\n${input.text}`,
})

// 3. Drain the queue — one agent per todo task, N in parallel
export async function drainList(taskListId: string, documents: Map<string, string>, concurrency = 5) {
  const handle = createHandle(taskListId)
  const tasks = await handle.getTasks()
  const todos = tasks.filter((t) => t.status === 'todo')

  // Simple parallel drainer
  const queue = [...todos]
  const inflight = new Set<Promise<unknown>>()

  while (queue.length || inflight.size) {
    while (inflight.size < concurrency && queue.length) {
      const task = queue.shift()!
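      // Catch per-task errors so one failed generate() call doesn't reject
      // Promise.race below and abort the whole drain.
      const work: Promise<void> = runOneTask(taskListId, task.id, documents.get(task.id) ?? '')
        .catch((err) => {
          console.error(`task ${task.id} failed`, err)
        })
        .finally(() => inflight.delete(work))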
      inflight.add(work)
    }
    if (inflight.size) await Promise.race(inflight)
  }
}

async function runOneTask(taskListId: string, taskId: string, text: string) {
  const worker = taskWorker(taskListId, taskId)

  const summarizerAgent = agent({
    id: `worker-${taskId}`,
    prompt: summarizePrompt,
    tools: worker.asTools(), // startTask, reportProgress, completeTask, failTask
  })

  await generate(summarizerAgent.prompt, {
    model: openai('gpt-4o-mini'),
    input: { text },
    use: [worker.asContext()], // injects the assignment + guidelines
    stopWhen: { stepCountIs: 5 },
  })
}

createHandle(taskListId) is the rehydrate helper from @crux/core/tasks. The worker's asTools() exposes startTask / reportProgress / completeTask / failTask bound to this specific taskId — the agent doesn't need to pass IDs around.
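A tool can be "bound" to one taskId through a closure: each tool captures the id when it is created, so the model only supplies the payload. The sketch below illustrates that idea with a plain Map standing in for the persistent store. bindTaskTools, TaskRecord, and the field names are hypothetical and are not the actual implementation of asTools().

```typescript
type Status = 'todo' | 'in_progress' | 'completed' | 'failed'

// Hypothetical task shape for this sketch only.
interface TaskRecord {
  id: string
  status: Status
  progress?: string
  result?: string
  error?: string
}

// Stand-in for the persistent store: a Map keyed by task id.
type Store = Map<string, TaskRecord>

// Every returned tool closes over `taskId`, so callers never pass an id.
export function bindTaskTools(store: Store, taskId: string) {
  const update = (patch: Partial<TaskRecord>) => {
    const task = store.get(taskId)
    if (!task) throw new Error(`unknown task ${taskId}`)
    store.set(taskId, { ...task, ...patch })
  }
  return {
    startTask: () => update({ status: 'in_progress' }),
    reportProgress: (progress: string) => update({ progress }),
    completeTask: (result: string) => update({ status: 'completed', result }),
    failTask: (error: string) => update({ status: 'failed', error }),
  }
}
```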

Live UI

'use client'
import { useTaskList, useTasks } from '@crux/react'

export function BatchProgress({ taskListId }: { taskListId: string }) {
  const list = useTaskList(taskListId)
  const tasks = useTasks(taskListId)

  if (!list) return null

  const counts = {
    todo: tasks.filter((t) => t.status === 'todo').length,
    in_progress: tasks.filter((t) => t.status === 'in_progress').length,
    completed: tasks.filter((t) => t.status === 'completed').length,
    failed: tasks.filter((t) => t.status === 'failed').length,
  }

  return (
    <div>
      <p>
        {counts.completed} / {tasks.length} done • {counts.in_progress} running • {counts.failed} failed
      </p>
      <ul>
        {tasks.map((t) => (
          <li key={t.id} className={`task task-${t.status}`}>
            {t.label} — <code>{t.status}</code>
            {t.progress && <p className="muted">{t.progress}</p>}
            {t.status === 'failed' && t.error && <pre>{t.error}</pre>}
          </li>
        ))}
      </ul>
    </div>
  )
}
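The component computes counts with four separate filter() passes. A single pass gives the same numbers. The sketch below assumes the four statuses shown in the UI are the complete set. A status outside that set would need its own key. In the component you could wrap the call in useMemo keyed on tasks.

```typescript
type TaskStatus = 'todo' | 'in_progress' | 'completed' | 'failed'

// One pass over the tasks instead of four filter() calls.
export function countByStatus(tasks: { status: TaskStatus }[]): Record<TaskStatus, number> {
  const counts: Record<TaskStatus, number> = { todo: 0, in_progress: 0, completed: 0, failed: 0 }
  for (const t of tasks) counts[t.status]++
  return counts
}
```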

How it works

  1. The tasklist is the queue. Each task has id (user-provided meaningful string), label, optional description, and tracks its own status and result. Persisted in CruxStore.
  2. taskWorker(taskListId, taskId) is per-task. It returns an object with asContext() (injects the assignment into the system prompt at priority 95 by default) and asTools() (startTask / reportProgress / completeTask / failTask, all bound to this taskId — no IDs in tool inputs).
  3. The agent drives its own lifecycle. The system prompt instructs the model to call startTask first, then completeTask (with the summary as the result) when done, or failTask on errors. The task's status updates as the model calls tools.
  4. The drainer is just a small loop. Crux doesn't bake in a "worker pool runner" — you write the parallelism that fits your runtime (Promise pool, p-limit, BullMQ, Convex scheduler, etc.). The list itself is the source of truth.
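As one example of "the parallelism that fits your runtime", here is a plain-TypeScript pool with no Crux dependency. It uses a fixed number of lanes that pull ids from a shared cursor, instead of drainList's Set + Promise.race loop. Each job's rejection is recorded as an outcome rather than rejecting the whole pool. runPool and Outcome are hypothetical names. In this recipe the worker argument would wrap runOneTask.

```typescript
// Result of one job; outcomes are returned in completion order.
type Outcome<T> =
  | { id: string; ok: true; value: T }
  | { id: string; ok: false; error: unknown }

export async function runPool<T>(
  ids: string[],
  worker: (id: string) => Promise<T>,
  concurrency = 5,
): Promise<Outcome<T>[]> {
  const outcomes: Outcome<T>[] = []
  let next = 0 // index of the next id to hand out

  // Each lane keeps pulling ids until the shared cursor passes the end.
  // JS runs this synchronously between awaits, so `next++` is race-free.
  async function lane(): Promise<void> {
    while (next < ids.length) {
      const id = ids[next++]
      try {
        outcomes.push({ id, ok: true, value: await worker(id) })
      } catch (error) {
        outcomes.push({ id, ok: false, error })
      }
    }
  }

  // At least one lane, at most one lane per id.
  const laneCount = Math.max(1, Math.min(concurrency, ids.length))
  await Promise.all(Array.from({ length: laneCount }, () => lane()))
  return outcomes
}
```

Because the number of lanes equals the concurrency limit, the bound holds without tracking in-flight promises. The trade-off is that a lane only picks up new work after its current job settles, which is the same behavior the Set-based loop has.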

Variations

Different agents per task kind

If different tasks need different agents, dispatch on task metadata:

const handle = createHandle(taskListId)
const task = (await handle.getTasks()).find((t) => t.id === taskId)
const workerAgent = task?.assignee?.agent === 'classifier' ? classifierAgent : summarizerAgent

The assignee field on addTask (for example addTask({ ..., assignee: { agent: 'classifier' } })) is meant for this.
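A ternary works for two kinds but grows awkwardly past that. A lookup map keyed by assignee.agent with a fallback is one alternative. The sketch below is generic over the agent type. pickAgent and TaskWithAssignee are hypothetical names, and the task shape only assumes the optional assignee.agent field described above.

```typescript
// Only the fields this sketch reads; real tasks carry more.
interface TaskWithAssignee {
  id: string
  assignee?: { agent?: string }
}

// Returns the registered agent for the task's assignee, or the fallback
// when the task is missing, unassigned, or names an unregistered agent.
export function pickAgent<A>(
  task: TaskWithAssignee | undefined,
  registry: ReadonlyMap<string, A>,
  fallback: A,
): A {
  const key = task?.assignee?.agent
  return (key !== undefined ? registry.get(key) : undefined) ?? fallback
}
```

A Map avoids a subtle problem with plain-object registries: a key such as "toString" would match an inherited property.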

Pause and resume

tasklist({}) and addTask can be called from anywhere, including a webhook handler. The drainer can be re-invoked later, for example from a cron job, and only picks up tasks that are still todo.
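Because drainList filters on todo only, a task whose worker died mid-run (process crash, deploy) stays in_progress and is never retried. The sketch below shows one way a re-invoked drainer could also reclaim such tasks. It assumes a hypothetical updatedAt timestamp on each task, which the recipe does not mention. Reclaimed tasks would also need to be reset to todo, for instance via updateTask, before a worker picks them up again.

```typescript
type TaskStatus = 'todo' | 'in_progress' | 'completed' | 'failed'

interface QueuedTask {
  id: string
  status: TaskStatus
  updatedAt: number // hypothetical: ms timestamp of the last status change
}

// Ids a re-invoked drainer should run: every todo task, plus in_progress
// tasks not updated for longer than staleMs (likely orphaned by a crash).
export function pickRunnable(tasks: QueuedTask[], now: number, staleMs: number): string[] {
  return tasks
    .filter((t) => t.status === 'todo' || (t.status === 'in_progress' && now - t.updatedAt > staleMs))
    .map((t) => t.id)
}
```

Set staleMs comfortably above the longest expected run. Otherwise a task that is still legitimately running could be picked up twice.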

Cross-action (Convex)

Convex actions have a 5-minute timeout, which a single long-running drainer can exceed. Instead, schedule one action per task from a Crux-aware action via ctx.crux.scheduler.runAfter('run task', 0, internal.batch.runOne, { taskListId, taskId }). Each task then gets a fresh action timeout, and the hidden __crux envelope keeps devtools traces correlated.
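The fan-out itself is a loop that schedules one job per task id. The sketch below uses a hypothetical Scheduler interface loosely modeled on the runAfter call above. Its function reference is a plain string placeholder for internal.batch.runOne, and the real ctx.crux.scheduler signature may differ. The optional staggerMs spreads start times so a large batch does not fire every model call at once.

```typescript
// Hypothetical scheduler shape for this sketch; not the actual Crux or Convex type.
interface Scheduler {
  runAfter(label: string, delayMs: number, fn: string, args: Record<string, string>): Promise<void>
}

// Schedules one job per task; job i starts after i * staggerMs milliseconds.
// Returns the number of jobs scheduled.
export async function fanOut(
  scheduler: Scheduler,
  taskListId: string,
  taskIds: string[],
  staggerMs = 0,
): Promise<number> {
  for (let i = 0; i < taskIds.length; i++) {
    await scheduler.runAfter('run task', i * staggerMs, 'batch.runOne', { taskListId, taskId: taskIds[i] })
  }
  return taskIds.length
}
```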
