Syncing a Corpus

Keep an indexed retrieval corpus up to date without rewriting every source.

Use corpus() when indexing runs more than once. For a one-off write, call the indexer directly.

indexer.indexDocuments() -> direct write
corpus.sync()            -> repeated ingestion job

A corpus wraps an indexer with a source ledger. The ledger stores source hashes, index fingerprints, status, chunk counts, pipeline stages, errors, and timestamps.

Basic Sync

import { corpus, indexer } from '@crux/core/indexing'
import { inMemoryDataStore, inMemoryVectorStore } from '@crux/core/storage'
import { filesSource } from '@crux/ingest/files'

const data = inMemoryDataStore()
const vectors = inMemoryVectorStore()

const docsIndexer = indexer({
  id: 'docs',
  namespace: 'product-docs',
  data,
  vectors,
  // `dense` and `sparse` are assumed to be defined earlier (not shown here).
  dense,
  sparse,
})

const docsCorpus = corpus({
  id: 'docs',
  namespace: 'product-docs',
  data,
  indexer: docsIndexer,
})

const source = filesSource(
  { directory: './docs', recursive: true },
  { namespace: 'product-docs' },
)

const result = await docsCorpus.sync(source.load(), {
  sourceSet: 'complete',
  stale: 'delete',
})

console.log(result)

Typical result:

const result = {
  added: 3,
  changed: 1,
  unchanged: 42,
  stale: 2,
  deleted: 2,
  failed: 0,
  chunkCount: 84,
}

Partial Updates

Use partial syncs for webhooks, CMS callbacks, file watchers, or any job that only sees some sources.

await docsCorpus.sync(changedFiles.load(), {
  sourceSet: 'partial',
})

Partial syncs can add and update sources. They do not delete missing sources because Crux cannot know whether those sources are actually gone.

Complete Syncs

Use complete syncs when the input is the full authoritative source set.

await docsCorpus.sync(allDocs.load(), {
  sourceSet: 'complete',
  stale: 'delete',
})

stale: 'delete' requires sourceSet: 'complete'. Combining it with a partial sync fails fast:

await docsCorpus.sync(changedFiles.load(), {
  sourceSet: 'partial',
  stale: 'delete',
})

Crux throws instead of guessing which missing sources are actually gone.
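
The rule can be sketched in a few lines. This is an illustration, not Crux's implementation: planStale, SyncOptions, and StalePlan are hypothetical names, and the sketch assumes that a complete sync without stale: 'delete' only flags missing sources as stale.

```typescript
// Hypothetical sketch of stale handling. Not the Crux implementation.
type SyncOptions = {
  sourceSet: 'partial' | 'complete'
  stale?: 'delete'
}

type StalePlan = { stale: string[]; deleted: string[] }

function planStale(
  ledgerIds: string[],
  incomingIds: string[],
  options: SyncOptions,
): StalePlan {
  // A partial input says nothing about the sources it omits, so deleting is unsafe.
  if (options.sourceSet === 'partial' && options.stale === 'delete') {
    throw new Error("stale: 'delete' requires sourceSet: 'complete'")
  }
  if (options.sourceSet === 'partial') {
    return { stale: [], deleted: [] }
  }
  // With a complete input, anything in the ledger but missing from the input is stale.
  const incoming = new Set(incomingIds)
  const stale = ledgerIds.filter((id) => !incoming.has(id))
  // Assumption: stale sources are only deleted when explicitly requested.
  return { stale, deleted: options.stale === 'delete' ? stale : [] }
}
```

Validating the option pair before touching the ledger is what lets the call fail before any write happens.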

Dry Run

Preview a sync without writing chunks or source records.

const plan = await docsCorpus.sync(source.load(), {
  sourceSet: 'complete',
  stale: 'delete',
  dryRun: true,
})

console.table(
  plan.sources.map((source) => ({
    sourceId: source.sourceId,
    action: source.action,
    reason: source.reason,
    chunks: source.chunkCount,
  })),
)

Use this for admin previews, deploy checks, and tests.
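
A deploy check built on a dry-run plan might look like the sketch below. The field names follow the dry-run example; countByAction and assertDeleteBudget are hypothetical helpers, and 'deleted' as an action value is an assumption based on the event names Crux emits.

```typescript
// Hypothetical helpers for inspecting a dry-run plan. Not part of Crux.
type PlannedSource = {
  sourceId: string
  action: string
  reason?: string
  chunkCount?: number
}

// Tally planned sources by action, e.g. { added: 3, deleted: 2 }.
function countByAction(sources: PlannedSource[]): Record<string, number> {
  const counts: Record<string, number> = {}
  for (const source of sources) {
    counts[source.action] = (counts[source.action] ?? 0) + 1
  }
  return counts
}

// Deploy check: refuse to proceed if the plan would delete too many sources.
function assertDeleteBudget(sources: PlannedSource[], maxDeletes: number): void {
  // Assumption: deletions appear in the plan with action 'deleted'.
  const deletes = countByAction(sources)['deleted'] ?? 0
  if (deletes > maxDeletes) {
    throw new Error(`plan deletes ${deletes} sources, budget is ${maxDeletes}`)
  }
}
```

Calling assertDeleteBudget(plan.sources, 10) before the real sync would catch a misconfigured source directory that makes most of the corpus look stale.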

Append-Only

Use append-only mode when existing sources should not be rewritten by routine syncs.

await docsCorpus.sync(source.load(), {
  mode: 'appendOnly',
  sourceSet: 'partial',
})

Changed existing sources are reported as skipped:

const sourceResult = {
  sourceId: 'audit-log-2026-05-10',
  action: 'skipped',
  reason: 'appendOnly',
}
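
The per-source decision can be pictured as follows. This is a sketch under assumptions, not Crux's code: decideAction and Decision are hypothetical, and only the 'appendOnly' mode value and the skipped result shape come from this guide.

```typescript
// Hypothetical sketch of the per-source decision in append-only mode.
type Decision = {
  action: 'added' | 'changed' | 'unchanged' | 'skipped'
  reason?: string
}

function decideAction(
  storedHash: string | undefined,
  incomingHash: string,
  mode?: 'appendOnly',
): Decision {
  // No ledger entry yet: the source is new and is always indexed.
  if (storedHash === undefined) return { action: 'added' }
  // Same hash: nothing to do in any mode.
  if (storedHash === incomingHash) return { action: 'unchanged' }
  // Changed content: append-only mode refuses to rewrite existing sources.
  if (mode === 'appendOnly') return { action: 'skipped', reason: 'appendOnly' }
  return { action: 'changed' }
}
```

New sources are still added in append-only mode; only rewrites of existing sources are held back.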

Pipeline Cache

If the indexer has cache: true, sync can control the pipeline cache per run.

await docsCorpus.sync(source.load(), {
  cache: 'readwrite',
})

Force recomputation:

await docsCorpus.sync(source.load(), {
  cache: 'refresh',
})

Bypass cache while debugging:

await docsCorpus.sync(source.load(), {
  cache: 'bypass',
})
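
One plausible reading of the three modes is sketched below. This guide does not spell out whether 'refresh' writes the recomputed value back or whether 'bypass' skips writes, so both behaviors here are assumptions, and runStage is a hypothetical helper.

```typescript
// Hypothetical sketch of cache mode semantics. Assumed, not confirmed by Crux.
type CacheMode = 'readwrite' | 'refresh' | 'bypass'

function runStage<T>(
  cache: Map<string, T>,
  mode: CacheMode,
  key: string,
  compute: () => T,
): T {
  // 'readwrite' is the only mode that serves a cached value.
  if (mode === 'readwrite' && cache.has(key)) {
    return cache.get(key) as T
  }
  const value = compute()
  // Assumption: 'refresh' overwrites the entry, 'bypass' leaves the cache untouched.
  if (mode !== 'bypass') cache.set(key, value)
  return value
}
```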

Inspect A Source

const sourceRecord = await docsCorpus.getSource('guides/retrieval.md')

console.log(sourceRecord?.status)
console.log(sourceRecord?.chunkCount)
console.log(sourceRecord?.lastError)

The stage ledger shows what happened during the most recent successful indexing run:

for (const stage of sourceRecord?.stages ?? []) {
  console.log({
    name: stage.name,
    kind: stage.kind,
    cache: stage.cache,
    chunks: stage.chunkCount,
    parents: stage.parentCount,
    durationMs: stage.durationMs,
  })
}

Example output:

[
  {
    name: 'normalize-docs',
    kind: 'document-transform',
    cache: 'hit',
    durationMs: 1,
  },
  {
    name: 'structured',
    kind: 'chunker',
    cache: 'miss',
    chunks: 12,
    parents: 0,
    durationMs: 9,
  },
]
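
Stage records are easy to aggregate. The sketch below uses the field names from the loop above; summarizeStages and StageRecord are hypothetical, and the sample data mirrors the example output.

```typescript
// Hypothetical helper for summarizing a source's stage ledger. Not part of Crux.
type StageRecord = {
  name: string
  kind: string
  cache?: string
  chunkCount?: number
  parentCount?: number
  durationMs: number
}

function summarizeStages(stages: StageRecord[]) {
  // Total time spent across all recorded stages.
  const totalMs = stages.reduce((sum, stage) => sum + stage.durationMs, 0)
  // Number of stages served from the pipeline cache.
  const cacheHits = stages.filter((stage) => stage.cache === 'hit').length
  // The stage with the largest duration, if any stages were recorded.
  const slowest = stages.reduce<StageRecord | undefined>(
    (worst, stage) =>
      worst === undefined || stage.durationMs > worst.durationMs ? stage : worst,
    undefined,
  )
  return { totalMs, cacheHits, slowest: slowest?.name }
}

const summary = summarizeStages([
  { name: 'normalize-docs', kind: 'document-transform', cache: 'hit', durationMs: 1 },
  { name: 'structured', kind: 'chunker', cache: 'miss', chunkCount: 12, parentCount: 0, durationMs: 9 },
])
// summary is { totalMs: 10, cacheHits: 1, slowest: 'structured' }
```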

List Sources

const failed = await docsCorpus.listSources({ status: 'failed' })
const active = await docsCorpus.listSources({ includeDeleted: false })

Retry failed sources by loading the source IDs again:

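// retryLoader is an application-defined loader (not shown here) that reloads the given source IDs.
const failedIds = failed.map((source) => source.sourceId)

await docsCorpus.sync(retryLoader(failedIds).load(), {
  sourceSet: 'partial',
  cache: 'refresh',
})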

Delete Sources

Delete one source:

await docsCorpus.deleteSource('old-guide.md')

Clear all active sources for a corpus:

await docsCorpus.clearSources()

What Counts As Changed

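Crux compares each source's hash and index fingerprint against the values stored in the ledger. Metadata keys listed in hash.excludeMetadata are left out of the source hash: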

const docsCorpus = corpus({
  id: 'docs',
  namespace: 'product-docs',
  data,
  indexer: docsIndexer,
  hash: {
    excludeMetadata: ['mtimeMs', 'lastFetchedAt'],
  },
})

Changing source content or stable metadata (any key not listed in excludeMetadata) reindexes the source. Changing the indexer fingerprint also reindexes the source, so versioned pipeline changes can intentionally rebuild unchanged content.
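
The comparison can be sketched as below. This is a minimal illustration, not Crux's hashing scheme: fnv1a is a stand-in for whatever hash Crux uses, and sourceHash, needsReindex, and LedgerEntry are hypothetical names.

```typescript
// Hypothetical sketch of change detection. The real hash function is not documented here.
type Source = { content: string; metadata: Record<string, unknown> }
type LedgerEntry = { sourceHash: string; indexFingerprint: string }

// Stand-in hash (32-bit FNV-1a) so the example has no dependencies.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16)
}

// Hash the content plus stable metadata, skipping excluded (volatile) keys.
function sourceHash(source: Source, excludeMetadata: string[]): string {
  const stable: Record<string, unknown> = {}
  for (const key of Object.keys(source.metadata).sort()) {
    if (!excludeMetadata.includes(key)) stable[key] = source.metadata[key]
  }
  return fnv1a(JSON.stringify([source.content, stable]))
}

// Reindex when the source is new, its hash changed, or the pipeline changed.
function needsReindex(
  entry: LedgerEntry | undefined,
  hash: string,
  fingerprint: string,
): boolean {
  return (
    entry === undefined ||
    entry.sourceHash !== hash ||
    entry.indexFingerprint !== fingerprint
  )
}
```

Excluding keys such as mtimeMs keeps a file that was only touched, not edited, from being reindexed.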

Observability

Corpus sync emits:

corpus:sync:start
corpus:source:added | changed | unchanged | skipped | failed | stale | deleted
corpus:sync:end

Ingest parsing emits:

ingest:parse:start
ingest:parse:end

Indexing emits:

index:start
index:end

The same stage records flow through source records, devtools, CLI/TUI, and @crux/otel.
