Server-Side Integration

SSE handlers and AI SDK stream writers for delivering plan and task updates to clients.

The Problem

Plan and task updates happen on the server: inside agent tool calls, pipeline steps, and background jobs. The client must receive them in real time to keep the UI reactive, so you need a transport layer between server-side CruxStore mutations and client-side hooks.

See the Quick Start for CruxStore setup (inMemoryCruxStore() for dev, cruxConvexStore() for production).

SSE Handler

cruxSSEHandler() creates a Server-Sent Events endpoint that subscribes to CruxStore changes and streams them to connected clients. It works with any framework whose route handlers take the form (request: Request) => Response.

app/api/crux/sse/route.ts
import { cruxSSEHandler } from '@crux/react/server'

// `store` is your CruxStore instance (see the Quick Start for setup)
const handler = cruxSSEHandler({
  store,
  prefix: '', // filter by key prefix, or '' for all events
})

export async function GET(request: Request) {
  return handler(request)
}

The handler sends data-crux events matching the wire format:

event: data-crux
data: {"entity":"plan","key":"plan-123","value":{"title":"My Plan","version":2},"event":"updated"}
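For readers unfamiliar with SSE framing, the sketch below shows how one such frame could be produced and read back. formatSSEEvent and parseSSEFrame are hypothetical helper names used for illustration only, not Crux exports; cruxSSEHandler() and createSSETransport do this work for you.

```typescript
// Hypothetical helpers for illustration only; not part of the Crux API.

// Serialize a payload as one SSE frame: an event line, a data line, then a blank line.
function formatSSEEvent(eventName: string, payload: unknown): string {
  return `event: ${eventName}\ndata: ${JSON.stringify(payload)}\n\n`
}

// Parse one SSE frame into its event name and JSON-decoded data.
// Returns null when the frame carries no data lines (for example, a comment frame).
function parseSSEFrame(frame: string): { event: string; data: unknown } | null {
  let eventName = 'message' // SSE default when no event line is present
  const dataLines: string[] = []
  for (const line of frame.split('\n')) {
    if (line.startsWith('event:')) eventName = line.slice('event:'.length).trim()
    else if (line.startsWith('data:')) dataLines.push(line.slice('data:'.length).trimStart())
  }
  if (dataLines.length === 0) return null
  return { event: eventName, data: JSON.parse(dataLines.join('\n')) }
}

// Round-trip the example payload shown above.
const exampleFrame = formatSSEEvent('data-crux', {
  entity: 'plan',
  key: 'plan-123',
  value: { title: 'My Plan', version: 2 },
  event: 'updated',
})
const parsedFrame = parseSSEFrame(exampleFrame)
```

The blank line at the end of each frame is what tells an SSE client that the event is complete.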

Prefix Filtering

Use the prefix option to scope which events are sent to a given client:

// Only send events for a specific session
cruxSSEHandler({ store, prefix: `session:${sessionId}:` })
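The trailing colon in the prefix above matters if scoping is a plain string-prefix match. The sketch below assumes exactly that; matchesPrefix and the key layout session:<id>:<entity-key> are illustrative assumptions, not confirmed Crux behavior.

```typescript
// Hypothetical illustration of prefix scoping; not the library's implementation.
function matchesPrefix(key: string, prefix: string): boolean {
  // Every string starts with '', so an empty prefix lets all keys through.
  return key.startsWith(prefix)
}

// Assumed key layout, for illustration only.
const allKeys = [
  'session:1:plan-123',
  'session:10:plan-456',
  'session:1:tasklist-9',
]

// With the trailing colon, session 1 does not also match session 10.
const scopedKeys = allKeys.filter((key) => matchesPrefix(key, 'session:1:'))

// Without it, keys from both sessions pass the filter.
const leakyKeys = allKeys.filter((key) => matchesPrefix(key, 'session:1'))
```

Ending the prefix with the same delimiter the keys use keeps one session's events from reaching another session's clients.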

Client Setup

Connect with createSSETransport:

import { CruxProvider, createSSETransport } from '@crux/react'

const transport = createSSETransport('/api/crux/sse')

function App() {
  return (
    <CruxProvider transport={transport}>
      <YourApp />
    </CruxProvider>
  )
}

AI SDK Stream Writer

createCruxStreamWriter() injects plan and task updates into an active AI SDK UIMessageStream. Updates are sent as data-crux data parts alongside the LLM's text output.

app/api/chat/route.ts
import { createCruxStreamWriter } from '@crux/ai/stream'

export async function POST(request: Request) {
  return createUIMessageStreamResponse({
    execute: async (writer) => {
      const cruxWriter = createCruxStreamWriter(writer, store)

      try {
        // cruxWriter subscribes to store changes and writes data parts.
        // Run your agent: any plan/task mutations are streamed to the client.
        await generate(prompt, { model, input })
      } finally {
        // Stop forwarding even if the agent run throws
        cruxWriter.close()
      }
    },
  })
}
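Conceptually, the writer is a bridge between a store subscription and the stream. The sketch below illustrates that lifecycle under simplified assumptions: DataPartWriter, ChangeSource, and bridgeStoreToStream are hypothetical stand-ins, not the actual Crux or AI SDK types.

```typescript
// Hypothetical minimal interfaces; the real writer and store types are richer.
interface DataPartWriter {
  write(part: { type: string; data: unknown }): void
}
interface ChangeSource {
  // Registers a listener and returns a function that removes it.
  subscribe(listener: (change: unknown) => void): () => void
}

// Forward every store change to the stream as a data-crux part.
// Returns a handle whose close() stops forwarding.
function bridgeStoreToStream(writer: DataPartWriter, source: ChangeSource) {
  const unsubscribe = source.subscribe((change) => {
    writer.write({ type: 'data-crux', data: change })
  })
  let closed = false
  return {
    close(): void {
      if (closed) return // safe to call more than once
      closed = true
      unsubscribe()
    },
  }
}
```

Seen this way, close() is what removes the subscription: skipping it would leave a listener attached to the store after the response has ended.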

Client Setup

On the client, use createStreamTransport() to consume data-crux parts from the stream:

chat-with-tasks.tsx
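import { useMemo } from 'react'
import { CruxProvider } from '@crux/react'
import { createStreamTransport } from '@crux/ai/stream'
import { useChat } from 'ai/react'

function ChatWithTasks() {
  // Create the transport once so ingested updates survive re-renders
  const transport = useMemo(() => createStreamTransport(), [])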

  const { messages } = useChat({
    api: '/api/chat',
    onData: (part) => transport.ingest(part), // connects stream to transport
  })

  return (
    <CruxProvider transport={transport}>
      <Messages messages={messages} />
      <TaskProgress taskListId={currentTaskListId} />
    </CruxProvider>
  )
}
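To show what ingesting a part might involve, here is a minimal sketch of an ingest-style transport: it ignores parts that are not data-crux, remembers the latest value per key, and notifies listeners. createMiniTransport and its types are hypothetical and say nothing about how createStreamTransport() is actually built.

```typescript
// Hypothetical sketch of an ingest-style transport; not Crux's implementation.
interface IncomingPart {
  type: string
  data?: unknown
}
interface CruxUpdate {
  key: string
  value: Record<string, unknown>
}

function createMiniTransport() {
  const latest = new Map<string, Record<string, unknown>>()
  const listeners = new Set<(update: CruxUpdate) => void>()
  return {
    // Accept any stream part; ignore everything that is not a data-crux part.
    ingest(part: IncomingPart): void {
      if (part.type !== 'data-crux') return
      const update = part.data as CruxUpdate
      latest.set(update.key, update.value)
      listeners.forEach((listener) => listener(update))
    },
    // Read the most recent value for a key, if one has arrived.
    get(key: string): Record<string, unknown> | undefined {
      return latest.get(key)
    },
    // Register a listener; the returned function removes it.
    subscribe(listener: (update: CruxUpdate) => void): () => void {
      listeners.add(listener)
      return () => {
        listeners.delete(listener)
      }
    },
  }
}
```

Because the state lives inside the transport object, the transport has to keep the same identity across renders for hooks to see earlier updates.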

Wire Format

Both SSE and stream transports use the same data-crux wire format:

interface CruxDataPart {
  entity: 'plan' | 'tasklist' | 'task'
  key: string
  value: Record<string, unknown>
  event: 'created' | 'updated' | 'completed' | 'discarded' | 'added' | 'removed'
}
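Since parts arrive as untyped JSON, a runtime check can be useful before treating a payload as a CruxDataPart. The guard below is a sketch built only from the interface above; isCruxDataPart is a hypothetical name, and Crux may ship its own validation.

```typescript
interface CruxDataPart {
  entity: 'plan' | 'tasklist' | 'task'
  key: string
  value: Record<string, unknown>
  event: 'created' | 'updated' | 'completed' | 'discarded' | 'added' | 'removed'
}

const KNOWN_ENTITIES = ['plan', 'tasklist', 'task']
const KNOWN_EVENTS = ['created', 'updated', 'completed', 'discarded', 'added', 'removed']

// Hypothetical runtime check that narrows unknown input to CruxDataPart.
function isCruxDataPart(input: unknown): input is CruxDataPart {
  if (typeof input !== 'object' || input === null) return false
  const candidate = input as Record<string, unknown>
  return (
    typeof candidate.entity === 'string' &&
    KNOWN_ENTITIES.indexOf(candidate.entity) !== -1 &&
    typeof candidate.key === 'string' &&
    typeof candidate.value === 'object' &&
    candidate.value !== null &&
    !Array.isArray(candidate.value) &&
    typeof candidate.event === 'string' &&
    KNOWN_EVENTS.indexOf(candidate.event) !== -1
  )
}

// Example: validate a payload decoded from the wire.
const decodedPart: unknown = JSON.parse(
  '{"entity":"plan","key":"plan-123","value":{"title":"My Plan","version":2},"event":"updated"}',
)
const decodedIsValid = isCruxDataPart(decodedPart)
```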

Debouncing

Progress updates (reportProgress) are rate-limited so that high-frequency updates do not flood the transport. The default window is transport-specific: SSE uses a 100ms debounce window, while the stream writer relies on the stream's own buffering.
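One way to picture this rate limiting is per-key coalescing: the first update goes out immediately, and later updates inside the window replace each other until the window elapses. The sketch below takes the current time as a parameter so the behavior is easy to follow; createProgressCoalescer is hypothetical, and Crux's internal strategy may differ.

```typescript
// Hypothetical sketch of per-key rate limiting; Crux's internals may differ.
interface ProgressUpdate {
  key: string
  progress: number
}

function createProgressCoalescer(windowMs: number) {
  const lastSentAt = new Map<string, number>()
  const pending = new Map<string, ProgressUpdate>()
  return {
    // Offer an update at time `now` (ms). Returns it if it can go out immediately;
    // otherwise buffers it, replacing any older buffered update, and returns null.
    offer(update: ProgressUpdate, now: number): ProgressUpdate | null {
      const last = lastSentAt.get(update.key)
      if (last === undefined || now - last >= windowMs) {
        lastSentAt.set(update.key, now)
        pending.delete(update.key)
        return update
      }
      pending.set(update.key, update)
      return null
    },
    // Return buffered updates whose window has elapsed, marking them as sent.
    flush(now: number): ProgressUpdate[] {
      const ready: ProgressUpdate[] = []
      pending.forEach((update, key) => {
        const last = lastSentAt.get(key)
        if (last === undefined || now - last >= windowMs) {
          ready.push(update)
          lastSentAt.set(key, now)
          pending.delete(key)
        }
      })
      return ready
    },
  }
}

// Three rapid updates for one task within a 100ms window.
const coalescer = createProgressCoalescer(100)
const sentAtZero = coalescer.offer({ key: 'task-1', progress: 0.1 }, 0)
const sentAtThirty = coalescer.offer({ key: 'task-1', progress: 0.2 }, 30)
const sentAtSixty = coalescer.offer({ key: 'task-1', progress: 0.5 }, 60)
const flushedAtHundred = coalescer.flush(100)
```

Here only the first update (0.1) and the latest buffered one (0.5) reach the transport; the intermediate 0.2 is dropped. In real code the timestamps would come from Date.now() and flush would run on a timer.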

When using the AI SDK stream transport, data-crux parts are only available during an active stream. For persistent reactivity between streams, use the SSE or Convex transport instead.

Full Stream Example

Server and client wired together for a content writing pipeline:

app/api/write/route.ts
import { createCruxStreamWriter } from '@crux/ai/stream'
import { createPlanTool, planAgent } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'

export async function POST(request: Request) {
  const { topic } = await request.json()

  return createUIMessageStreamResponse({
    execute: async (writer) => {
      const cruxWriter = createCruxStreamWriter(writer, store)

      // Phase 1: Create plan
      const planTool = createPlanTool()
      await generate(
        prompt({ system: 'Create a plan.', tools: { createPlan: planTool } }),
        { model, input: { topic } },
      )

      // Phase 2: Create tasks and execute
      const handle = await tasklist({ planId: planTool.created!.id })
      await handle.addTask({ id: 'intro', label: 'Write intro' })
      await handle.addTask({ id: 'body', label: 'Write body' })

      // Named `agent` so it does not shadow the imported planAgent factory
      const agent = planAgent(planTool.created!.id)
      for (const task of await handle.getTasks()) {
        const worker = taskWorker(handle.id, task.id)
        await generate(
          prompt({
            use: [agent.asContext(), worker.asContext()],
            tools: worker.asTools(),
            system: 'Write the section.',
          }),
          { model, input: {} },
        )
      }

      cruxWriter.close()
    },
  })
}

write-page.tsx
import { useMemo } from 'react'
import { CruxProvider, usePlan, useTaskList, useTasks } from '@crux/react'
import { createStreamTransport } from '@crux/ai/stream'

function WritePage() {
  // Create the transport once so ingested updates survive re-renders
  const transport = useMemo(() => createStreamTransport(), [])

  // Start the writing process — response stream feeds the transport
  const startWriting = async (topic: string) => {
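    const response = await fetch('/api/write', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ topic }),
    })
    // Feed stream data into transport for reactive hooks
    const reader = response.body?.getReader()
    if (reader) {
      // Reuse one decoder so multi-byte characters split across chunks decode correctly
      const decoder = new TextDecoder()
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        transport.ingest(decoder.decode(value, { stream: true }))
      }
    }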
  }

  return (
    <CruxProvider transport={transport}>
      <WriteProgress />
    </CruxProvider>
  )
}
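A network chunk can end in the middle of a frame, and it is not stated here whether transport.ingest buffers partial input. If you need to split the stream yourself, the sketch below shows one approach, assuming frames are separated by a blank line as in SSE. createFrameSplitter is a hypothetical helper, not a Crux export.

```typescript
// Hypothetical helper; assumes frames are separated by a blank line ("\n\n").
function createFrameSplitter() {
  let buffer = ''
  return {
    // Append a decoded chunk and return every frame completed so far.
    push(chunk: string): string[] {
      buffer += chunk
      const pieces = buffer.split('\n\n')
      // The last piece is either '' or an incomplete frame; keep it for the next chunk.
      buffer = pieces.pop() ?? ''
      return pieces.filter((piece) => piece.length > 0)
    },
  }
}

// Two frames arriving across three unevenly split chunks.
const splitter = createFrameSplitter()
const afterFirstChunk = splitter.push('data: {"a"')
const afterSecondChunk = splitter.push(':1}\n\ndata: {"b":2}\n\nda')
const afterThirdChunk = splitter.push('ta: 3\n\n')
```

Each completed frame can then be parsed and handed to the transport one at a time, regardless of where the network happened to cut the chunks.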
