Server-Side Integration
SSE handlers and AI SDK stream writers for delivering plan and task updates to clients.
The Problem
Plan and task updates happen on the server, inside agent tool calls, pipeline steps, and background jobs. The client needs to receive these updates in real time to keep the UI reactive, so you need a transport layer between server-side CruxStore mutations and client-side hooks.
See the Quick Start for CruxStore setup (inMemoryCruxStore() for dev, cruxConvexStore() for production).
SSE Handler
cruxSSEHandler() creates a Server-Sent Events endpoint that subscribes to CruxStore changes and streams them to connected clients. Works with any framework that handles (request: Request) => Response.
import { cruxSSEHandler } from '@crux/react/server'
const handler = cruxSSEHandler({
  store,
  prefix: '', // filter by key prefix, or '' for all events
})
export async function GET(request: Request) {
  return handler(request)
}
The handler sends data-crux events matching the wire format:
event: data-crux
data: {"entity":"plan","key":"plan-123","value":{"title":"My Plan","version":2},"event":"updated"}
Prefix Filtering
Use the prefix option to scope which events are sent to a given client:
// Only send events for a specific session
cruxSSEHandler({ store, prefix: `session:${sessionId}:` })
Client Setup
Connect with createSSETransport:
import { CruxProvider, createSSETransport } from '@crux/react'
const transport = createSSETransport('/api/crux/sse')
function App() {
  return (
    <CruxProvider transport={transport}>
      <YourApp />
    </CruxProvider>
  )
}
AI SDK Stream Writer
createCruxStreamWriter() injects plan and task updates into an active AI SDK UIMessageStream. Updates are sent as data-crux data parts alongside the LLM's text output.
import { createCruxStreamWriter } from '@crux/ai/stream'
export async function POST(request: Request) {
  return createUIMessageStreamResponse({
    execute: async (writer) => {
      // cruxWriter subscribes to store changes and writes them as data parts
      const cruxWriter = createCruxStreamWriter(writer, store)
      try {
        // Run your agent; any plan/task mutations are streamed to the client
        await generate(prompt, { model, input })
      } finally {
        // Always release the store subscription, even if the agent throws
        cruxWriter.close()
      }
    },
  })
}
Client Setup
On the client, use createStreamTransport() to consume data-crux parts from the stream:
import { useState } from 'react'
import { CruxProvider } from '@crux/react'
import { createStreamTransport } from '@crux/ai/stream'
import { useChat } from 'ai/react'
function ChatWithTasks() {
  // Create the transport once so the provider keeps the same instance across renders
  const [transport] = useState(() => createStreamTransport())
  const { messages } = useChat({
    api: '/api/chat',
    onData: (part) => transport.ingest(part), // connects stream to transport
  })
  return (
    <CruxProvider transport={transport}>
      <Messages messages={messages} />
      <TaskProgress taskListId={currentTaskListId} />
    </CruxProvider>
  )
}
Wire Format
Both SSE and stream transports use the same data-crux wire format:
interface CruxDataPart {
  entity: 'plan' | 'tasklist' | 'task'
  key: string
  value: Record<string, unknown>
  event: 'created' | 'updated' | 'completed' | 'discarded' | 'added' | 'removed'
}
Debouncing
Progress updates (reportProgress) are rate-limited so that high-frequency updates do not flood the transport. The default debounce window depends on the transport: SSE uses a 100ms window, while the stream writer relies on the stream's own buffering.
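One way to picture this rate limiting is a small coalescing buffer that keeps only the latest update per key and releases it once the window has elapsed. The sketch below is illustrative only and is not Crux's implementation: ProgressUpdate, createProgressCoalescer, push, and flush are hypothetical names, and timestamps are passed in explicitly so the behavior is easy to follow.

```typescript
// Hypothetical shape of a progress update; only `key` matters for coalescing.
interface ProgressUpdate {
  key: string
  value: Record<string, unknown>
}

interface Coalescer {
  push(update: ProgressUpdate, nowMs: number): void
  flush(nowMs: number): ProgressUpdate[]
}

// Assumed default of 100ms, mirroring the SSE window mentioned above.
function createProgressCoalescer(windowMs = 100): Coalescer {
  const pending = new Map<string, { update: ProgressUpdate; firstSeenMs: number }>()
  return {
    // Keep the newest value for a key, but remember when the key was first buffered.
    push(update, nowMs) {
      const existing = pending.get(update.key)
      pending.set(update.key, {
        update,
        firstSeenMs: existing ? existing.firstSeenMs : nowMs,
      })
    },
    // Return (and forget) every update whose window has fully elapsed.
    flush(nowMs) {
      const ready: ProgressUpdate[] = []
      for (const [key, entry] of pending) {
        if (nowMs - entry.firstSeenMs >= windowMs) {
          ready.push(entry.update)
          pending.delete(key)
        }
      }
      return ready
    },
  }
}
```

In a real handler, flush would typically run on a timer and each returned update would be written to the transport. Intermediate values that arrive inside the window are dropped on purpose, since only the latest progress matters to the UI.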
When using the AI SDK stream transport, data-crux parts are only available during an active stream. For persistent reactivity between streams, use SSE or Convex transport instead.
Full Stream Example
Server and client wired together for a content writing pipeline:
import { createCruxStreamWriter } from '@crux/ai/stream'
import { createPlanTool, planAgent } from '@crux/core/plan'
import { tasklist, taskWorker } from '@crux/core/tasks'
export async function POST(request: Request) {
  const { topic } = await request.json()
  return createUIMessageStreamResponse({
    execute: async (writer) => {
      const cruxWriter = createCruxStreamWriter(writer, store)
      try {
        // Phase 1: Create plan
        const planTool = createPlanTool()
        await generate(
          prompt({ system: 'Create a plan.', tools: { createPlan: planTool } }),
          { model, input: { topic } },
        )
        // Phase 2: Create tasks and execute
        const handle = await tasklist({ planId: planTool.created!.id })
        await handle.addTask({ id: 'intro', label: 'Write intro' })
        await handle.addTask({ id: 'body', label: 'Write body' })
        // Named `agent` so it does not shadow the imported planAgent function
        const agent = planAgent(planTool.created!.id)
        for (const task of await handle.getTasks()) {
          const worker = taskWorker(handle.id, task.id)
          await generate(
            prompt({
              use: [agent.asContext(), worker.asContext()],
              tools: worker.asTools(),
              system: 'Write the section.',
            }),
            { model, input: {} },
          )
        }
      } finally {
        // Always release the store subscription, even if a phase throws
        cruxWriter.close()
      }
    },
  })
}
import { CruxProvider } from '@crux/react'
import { createStreamTransport } from '@crux/ai/stream'
import { usePlan, useTaskList, useTasks } from '@crux/react'
import { useState } from 'react'
function WritePage() {
  // Create the transport once so it survives re-renders
  const [transport] = useState(() => createStreamTransport())
  // Start the writing process; the response stream feeds the transport
  const startWriting = async (topic: string) => {
    const response = await fetch('/api/write', {
      method: 'POST',
      body: JSON.stringify({ topic }),
    })
    // Feed stream data into transport for reactive hooks
    const reader = response.body?.getReader()
    if (reader) {
      // Reuse one decoder in streaming mode so multi-byte characters
      // split across chunks are decoded correctly
      const decoder = new TextDecoder()
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        transport.ingest(decoder.decode(value, { stream: true }))
      }
    }
  }
  return (
    <CruxProvider transport={transport}>
      <WriteProgress />
    </CruxProvider>
  )
}
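For clients that cannot use the provided transports, the data-crux events shown in the SSE Handler section can also be decoded by hand. The sketch below is a minimal illustration under that assumption: parseCruxFrame and isCruxDataPart are hypothetical helpers, not part of any @crux package, and CruxDataPart is copied from the Wire Format section. It takes the text of one SSE frame (the lines between two blank lines) and returns a validated part or null.

```typescript
// Shape copied from the Wire Format section.
interface CruxDataPart {
  entity: 'plan' | 'tasklist' | 'task'
  key: string
  value: Record<string, unknown>
  event: 'created' | 'updated' | 'completed' | 'discarded' | 'added' | 'removed'
}

const ENTITIES: string[] = ['plan', 'tasklist', 'task']
const EVENTS: string[] = ['created', 'updated', 'completed', 'discarded', 'added', 'removed']

// Hypothetical helper: runtime check that an unknown value matches CruxDataPart.
function isCruxDataPart(input: unknown): input is CruxDataPart {
  if (typeof input !== 'object' || input === null) return false
  const part = input as Record<string, unknown>
  return (
    typeof part.entity === 'string' && ENTITIES.includes(part.entity) &&
    typeof part.key === 'string' &&
    typeof part.value === 'object' && part.value !== null && !Array.isArray(part.value) &&
    typeof part.event === 'string' && EVENTS.includes(part.event)
  )
}

// Hypothetical helper: parse one SSE frame into a CruxDataPart, or null if it
// is not a well-formed data-crux event.
function parseCruxFrame(frame: string): CruxDataPart | null {
  let eventName = 'message' // SSE default when no "event:" field is present
  const dataLines: string[] = []
  for (const line of frame.split(/\r?\n/)) {
    if (line.startsWith('event:')) {
      eventName = line.slice('event:'.length).trim()
    } else if (line.startsWith('data:')) {
      // Per the SSE format, a single leading space after the colon is stripped.
      dataLines.push(line.slice('data:'.length).replace(/^ /, ''))
    }
  }
  if (eventName !== 'data-crux' || dataLines.length === 0) return null
  try {
    const parsed: unknown = JSON.parse(dataLines.join('\n'))
    return isCruxDataPart(parsed) ? parsed : null
  } catch {
    return null // malformed JSON payload
  }
}
```

Validating at the boundary means downstream hooks never see a payload with an unknown entity or event name, so format drift between server and client shows up as dropped frames instead of runtime errors deep in the UI.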