Syncing a Corpus
Keep an indexed retrieval corpus up to date without rewriting every source.
Use corpus() when indexing runs more than once.
indexer.indexDocuments() -> direct write
corpus.sync() -> repeated ingestion job

A corpus wraps an indexer with a source ledger. The ledger stores source hashes, index fingerprints, status, chunk counts, pipeline stages, errors, and timestamps.
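To make the ledger concrete, here is a rough sketch of what one ledger entry might look like. The stage fields and `status`, `chunkCount`, and `lastError` mirror names used later on this page; `sourceHash`, `indexFingerprint`, `updatedAt`, the status values other than `'failed'`, and the `totalStageMs` helper are assumptions made for illustration, not the actual Crux types.

```typescript
// Hypothetical shape of a pipeline stage entry (field names taken from the
// stage ledger example on this page).
interface StageRecord {
  name: string
  kind: string
  cache?: 'hit' | 'miss'
  chunkCount?: number
  parentCount?: number
  durationMs: number
}

// Hypothetical shape of one source ledger entry. Names marked "assumed"
// do not come from the Crux docs.
interface SourceRecord {
  sourceId: string
  status: 'indexed' | 'failed' | 'deleted' // 'indexed' and 'deleted' are assumed
  sourceHash: string // assumed name: hash of content plus stable metadata
  indexFingerprint: string // assumed name: identifies the indexer pipeline
  chunkCount: number
  stages: StageRecord[]
  lastError?: string
  updatedAt: string // assumed name: ISO timestamp of the last sync
}

// Illustrative helper: total time spent across all recorded stages.
function totalStageMs(record: SourceRecord): number {
  return record.stages.reduce((sum, stage) => sum + stage.durationMs, 0)
}

const exampleRecord: SourceRecord = {
  sourceId: 'guides/retrieval.md',
  status: 'indexed',
  sourceHash: 'abc123',
  indexFingerprint: 'docs-v1',
  chunkCount: 12,
  stages: [
    { name: 'normalize-docs', kind: 'document-transform', cache: 'hit', durationMs: 1 },
    { name: 'structured', kind: 'chunker', cache: 'miss', chunkCount: 12, parentCount: 0, durationMs: 9 },
  ],
  updatedAt: '2026-05-10T00:00:00.000Z',
}

const exampleTotalMs = totalStageMs(exampleRecord)
```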
Basic Sync
import { corpus, indexer } from '@crux/core/indexing'
import { inMemoryDataStore, inMemoryVectorStore } from '@crux/core/storage'
import { filesSource } from '@crux/ingest/files'

const data = inMemoryDataStore()
const vectors = inMemoryVectorStore()

// `dense` and `sparse` are not defined in this snippet; supply them from your own setup.
const docsIndexer = indexer({
  id: 'docs',
  namespace: 'product-docs',
  data,
  vectors,
  dense,
  sparse,
})

const docsCorpus = corpus({
  id: 'docs',
  namespace: 'product-docs',
  data,
  indexer: docsIndexer,
})

const source = filesSource(
  { directory: './docs', recursive: true },
  { namespace: 'product-docs' },
)

const result = await docsCorpus.sync(source.load(), {
  sourceSet: 'complete',
  stale: 'delete',
})

console.log(result)

Typical result:
const result = {
  added: 3,
  changed: 1,
  unchanged: 42,
  stale: 2,
  deleted: 2,
  failed: 0,
  chunkCount: 84,
}

Partial Updates
Use partial syncs for webhooks, CMS callbacks, file watchers, or any job that only sees some sources.
await docsCorpus.sync(changedFiles.load(), {
  sourceSet: 'partial',
})

Partial syncs can add and update sources. They do not delete missing sources because Crux cannot know whether those sources are actually gone.
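The reasoning behind this rule can be sketched in a few lines. The `findStale` function below is a hypothetical illustration, not part of the Crux API: it shows why only a complete source set can say anything about which sources are missing.

```typescript
type SourceSet = 'partial' | 'complete'

// Hypothetical helper: returns the ledger IDs a sync may treat as stale.
function findStale(
  ledgerIds: string[],
  incomingIds: string[],
  sourceSet: SourceSet,
): string[] {
  // A partial input only lists what changed, so absence proves nothing.
  if (sourceSet === 'partial') return []
  // A complete input is authoritative: anything not in it is stale.
  const seen = new Set(incomingIds)
  return ledgerIds.filter((id) => !seen.has(id))
}

const ledgerIds = ['a.md', 'b.md', 'c.md']
const staleForPartial = findStale(ledgerIds, ['a.md'], 'partial')
const staleForComplete = findStale(ledgerIds, ['a.md'], 'complete')
```

With the same input, the partial run reports no stale sources, while the complete run flags `b.md` and `c.md`.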
Complete Syncs
Use complete syncs when the input is the full authoritative source set.
await docsCorpus.sync(allDocs.load(), {
  sourceSet: 'complete',
  stale: 'delete',
})

stale: 'delete' requires sourceSet: 'complete'. This fails fast:

await docsCorpus.sync(changedFiles.load(), {
  sourceSet: 'partial',
  stale: 'delete',
})

Crux throws instead of guessing.
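A fail-fast check of this kind might look roughly like the sketch below. The `assertSyncOptions` function, the `SyncOptionsSketch` type, and the error message are all hypothetical; the real Crux validation and error text may differ.

```typescript
// Hypothetical subset of the sync options discussed on this page.
interface SyncOptionsSketch {
  sourceSet?: 'partial' | 'complete'
  stale?: 'delete'
}

// Hypothetical validation: reject stale deletion unless the input is complete.
function assertSyncOptions(options: SyncOptionsSketch): void {
  if (options.stale === 'delete' && options.sourceSet !== 'complete') {
    throw new Error("stale: 'delete' requires sourceSet: 'complete'")
  }
}

// Usage: the invalid combination is rejected before any work starts.
let rejected = false
try {
  assertSyncOptions({ sourceSet: 'partial', stale: 'delete' })
} catch {
  rejected = true
}

// The valid combination passes without throwing.
assertSyncOptions({ sourceSet: 'complete', stale: 'delete' })
```

Validating before touching the ledger means a misconfigured job cannot delete anything by accident.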
Dry Run
Preview a sync without writing chunks or source records.
const plan = await docsCorpus.sync(source.load(), {
  sourceSet: 'complete',
  stale: 'delete',
  dryRun: true,
})

console.table(
  plan.sources.map((source) => ({
    sourceId: source.sourceId,
    action: source.action,
    reason: source.reason,
    chunks: source.chunkCount,
  })),
)

Use this for admin previews, deploy checks, and tests.
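As one possible deploy check, a dry-run plan can be reduced to counts per action and compared against a threshold. The `PlannedSource` type, the `countByAction` helper, the sample plan, and the deletion limit below are assumptions for illustration; only the `sourceId`, `action`, `reason`, and `chunkCount` field names come from the example above.

```typescript
// Hypothetical shape of one entry in plan.sources.
interface PlannedSource {
  sourceId: string
  action: string
  reason?: string
  chunkCount?: number
}

// Count how many planned sources fall under each action.
function countByAction(sources: PlannedSource[]): Record<string, number> {
  const counts: Record<string, number> = {}
  for (const planned of sources) {
    counts[planned.action] = (counts[planned.action] ?? 0) + 1
  }
  return counts
}

// Sample data standing in for a real dry-run plan.
const samplePlan: PlannedSource[] = [
  { sourceId: 'intro.md', action: 'added', chunkCount: 4 },
  { sourceId: 'setup.md', action: 'added', chunkCount: 6 },
  { sourceId: 'faq.md', action: 'unchanged' },
  { sourceId: 'old-guide.md', action: 'deleted' },
]

const planCounts = countByAction(samplePlan)

// Hypothetical guard: block the deploy if the plan deletes too much.
const maxDeletes = 10
const deployAllowed = (planCounts['deleted'] ?? 0) <= maxDeletes
```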
Append-Only
Use append-only mode when existing sources should not be rewritten by routine syncs.
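The per-source decision in append-only mode can be pictured with a small sketch. The `decide` function and the `'default'` mode name are hypothetical; the `'skipped'` action and `'appendOnly'` reason match the result shown in this section.

```typescript
// 'default' is a hypothetical name for the normal, rewriting mode.
type SyncMode = 'default' | 'appendOnly'

interface Decision {
  action: 'added' | 'changed' | 'unchanged' | 'skipped'
  reason?: string
}

// Hypothetical decision rule for a single source.
function decide(
  existingHash: string | undefined,
  incomingHash: string,
  mode: SyncMode,
): Decision {
  if (existingHash === undefined) return { action: 'added' } // new source
  if (existingHash === incomingHash) return { action: 'unchanged' }
  // The source exists and differs: append-only refuses to rewrite it.
  if (mode === 'appendOnly') return { action: 'skipped', reason: 'appendOnly' }
  return { action: 'changed' }
}

const newInAppendOnly = decide(undefined, 'h1', 'appendOnly')
const editedInAppendOnly = decide('h1', 'h2', 'appendOnly')
const editedInDefault = decide('h1', 'h2', 'default')
```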
await docsCorpus.sync(source.load(), {
  mode: 'appendOnly',
  sourceSet: 'partial',
})

Changed existing sources are reported as skipped:

const sourceResult = {
  sourceId: 'audit-log-2026-05-10',
  action: 'skipped',
  reason: 'appendOnly',
}

Pipeline Cache
If the indexer has cache: true, sync can control the pipeline cache per run.
await docsCorpus.sync(source.load(), {
  cache: 'readwrite',
})

Force recomputation:

await docsCorpus.sync(source.load(), {
  cache: 'refresh',
})

Bypass cache while debugging:

await docsCorpus.sync(source.load(), {
  cache: 'bypass',
})

Inspect A Source
const sourceRecord = await docsCorpus.getSource('guides/retrieval.md')

console.log(sourceRecord?.status)
console.log(sourceRecord?.chunkCount)
console.log(sourceRecord?.lastError)

The stage ledger shows what happened during the latest successful index:

for (const stage of sourceRecord?.stages ?? []) {
  console.log({
    name: stage.name,
    kind: stage.kind,
    cache: stage.cache,
    chunks: stage.chunkCount,
    parents: stage.parentCount,
    durationMs: stage.durationMs,
  })
}

Example output:
[
  {
    name: 'normalize-docs',
    kind: 'document-transform',
    cache: 'hit',
    durationMs: 1,
  },
  {
    name: 'structured',
    kind: 'chunker',
    cache: 'miss',
    chunks: 12,
    parents: 0,
    durationMs: 9,
  },
]

List Sources
const failed = await docsCorpus.listSources({ status: 'failed' })
const active = await docsCorpus.listSources({ includeDeleted: false })

Retry failed sources by loading the source IDs again:

// retryLoader stands in for an application-defined loader that re-fetches the given source IDs.
const failedIds = failed.map((source) => source.sourceId)

await docsCorpus.sync(retryLoader(failedIds).load(), {
  sourceSet: 'partial',
  cache: 'refresh',
})

Delete Sources
Delete one source:

await docsCorpus.deleteSource('old-guide.md')

Clear all active sources for a corpus:

await docsCorpus.clearSources()

What Counts As Changed
Crux compares the source hash and the index hash.
const docsCorpus = corpus({
  id: 'docs',
  namespace: 'product-docs',
  data,
  indexer: docsIndexer,
  hash: {
    excludeMetadata: ['mtimeMs', 'lastFetchedAt'],
  },
})

Changing source content reindexes the source. Changing stable metadata reindexes the source; keys listed in hash.excludeMetadata are left out of the comparison, so volatile fields such as mtimeMs do not trigger a reindex. Changing the indexer fingerprint also reindexes the source, so versioned pipeline changes can intentionally rebuild unchanged content.
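The comparison can be approximated with the sketch below. It is not the Crux implementation: the FNV-1a hash is a stand-in for whatever hash Crux uses, and `sourceHash`, `needsReindex`, and the `LedgerEntry` shape are hypothetical names chosen for illustration.

```typescript
// Stand-in hash: 32-bit FNV-1a over UTF-16 code units.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16).padStart(8, '0')
}

interface SourceInput {
  content: string
  metadata: Record<string, unknown>
}

// Hash the content plus every metadata key that is not excluded.
function sourceHash(source: SourceInput, excludeMetadata: string[]): string {
  const excluded = new Set(excludeMetadata)
  const stable = Object.keys(source.metadata)
    .filter((key) => !excluded.has(key))
    .sort() // key order must not affect the hash
    .map((key) => [key, source.metadata[key]])
  return fnv1a(JSON.stringify([source.content, stable]))
}

interface LedgerEntry {
  sourceHash: string
  indexFingerprint: string
}

// Reindex when the source is new, its hash changed, or the pipeline changed.
function needsReindex(
  previous: LedgerEntry | undefined,
  nextHash: string,
  fingerprint: string,
): boolean {
  return (
    previous === undefined ||
    previous.sourceHash !== nextHash ||
    previous.indexFingerprint !== fingerprint
  )
}

const exclude = ['mtimeMs', 'lastFetchedAt']
const original = sourceHash({ content: 'Hello', metadata: { title: 'Guide A', mtimeMs: 1 } }, exclude)
const touched = sourceHash({ content: 'Hello', metadata: { title: 'Guide A', mtimeMs: 2 } }, exclude)
const retitled = sourceHash({ content: 'Hello', metadata: { title: 'Guide B', mtimeMs: 1 } }, exclude)

const entry: LedgerEntry = { sourceHash: original, indexFingerprint: 'docs-v1' }
const reindexAfterTouch = needsReindex(entry, touched, 'docs-v1') // only mtimeMs changed
const reindexAfterRetitle = needsReindex(entry, retitled, 'docs-v1') // stable metadata changed
const reindexAfterUpgrade = needsReindex(entry, original, 'docs-v2') // pipeline changed
```

Touching the file changes only `mtimeMs`, which is excluded, so the hash stays the same. Editing the title or bumping the fingerprint both trigger a reindex.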
Observability
Corpus sync emits:
corpus:sync:start
corpus:source:added | changed | unchanged | skipped | failed | stale | deleted
corpus:sync:end

Ingest parsing emits:
ingest:parse:start
ingest:parse:end

Indexing emits:
index:start
index:end

The same stage records flow through source records, devtools, CLI/TUI, and @crux/otel.