Retrieval Pipelines
Improve query-time retrieval with planning, multi-query expansion, parent expansion, compression, diversity, recency, and custom stages.
Use retrievalPipeline() when one raw search pass is not enough. It decorates a retriever, so the result is still a retriever: you can call retrieve(), place it in prompt({ use }), expose tools, or wrap it with grounding().
import {
  compress,
  decay,
  diversify,
  multiQuery,
  parentExpand,
  queryPlanner,
  retrievalPipeline,
} from '@crux/core/retrieval'
const advancedDocs = retrievalPipeline(docs, [
  queryPlanner({
    generate: generateObjectFn,
    model: planningModel,
    maxQueries: 3,
    filterSchema: z.object({
      product: z.string().optional(),
      visibility: z.enum(['public', 'internal']).optional(),
    }).optional(),
  }),
  multiQuery({
    generate: generateTextFn,
    model: queryModel,
    count: 4,
  }),
  parentExpand({ store: data, maxParentChars: 4000 }),
  compress({
    generate: generateObjectFn,
    model: compressionModel,
    mode: 'extractive',
    maxCharsPerHit: 1200,
  }),
  diversify({ strategy: 'mmr', limit: 8, sourcePenalty: 0.15 }),
  decay({
    field: 'metadata.updatedAt',
    halfLifeMs: 30 * 24 * 60 * 60 * 1000,
  }),
])
const hits = await advancedDocs.retrieve('enterprise SSO rollout')
Bundled Stages
The pipeline has two phases. Query stages run before Crux calls the base retriever. Hit stages run after fanout and merge.
| Stage | Phase | What it does | Use it when |
|---|---|---|---|
| queryPlanner() | query | Turns one question into typed planned queries with optional filters, weights, and reasons. | A broad or ambiguous question should search multiple focused parts of the corpus. |
| multiQuery() | query | Generates alternate phrasings, dedupes them, and keeps the original by default. | Recall is weak because users and docs use different wording. |
| Fanout + RRF merge | internal | Calls the base retriever once per planned query and merges duplicate hits by namespace/sourceId/chunkId. | This always happens after query stages. |
| parentExpand() | hits | Follows hit.parent.key and attaches larger parent context without changing the child citation. | You indexed small child chunks but want surrounding context. |
| compress() | hits | Keeps extractive excerpts from each hit and drops empty hits by default. | Retrieved chunks are too long or noisy. |
| diversify() | hits | Applies MMR-style diversity with optional same-source penalty. | Top results repeat the same source or near-duplicate content. |
| decay() | hits | Applies exponential score decay from a timestamp field. | Freshness should influence rank. |
| retrievalStage() | query or hits | Adds a custom stage with the same tracing and instrumentation as built-ins. | Product-specific filtering, routing, enrichment, or ranking. |
Query stages must come before hit stages. If a query stage appears after a hit stage, Crux throws when the pipeline is created.
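
To make the ordering rule concrete, here is a minimal standalone sketch of a creation-time check. The `StageLike` shape and `assertStageOrder` helper are hypothetical names for illustration; this is not Crux's internal validation, only one way such a rule could be enforced.

```typescript
// Hypothetical minimal stage shape; Crux's real stage type is not shown on this page.
type Phase = 'query' | 'hits'

interface StageLike {
  name: string
  phase: Phase
}

// Throws if any query stage is listed after a hit stage.
function assertStageOrder(stages: StageLike[]): void {
  let firstHitStage: string | undefined
  for (const stage of stages) {
    if (stage.phase === 'hits') {
      if (firstHitStage === undefined) firstHitStage = stage.name
    } else if (firstHitStage !== undefined) {
      throw new Error(
        `Query stage "${stage.name}" appears after hit stage "${firstHitStage}"`,
      )
    }
  }
}

const validOrder: StageLike[] = [
  { name: 'multiQuery', phase: 'query' },
  { name: 'parentExpand', phase: 'hits' },
  { name: 'diversify', phase: 'hits' },
]
assertStageOrder(validOrder) // passes silently

const invalidOrder: StageLike[] = [
  { name: 'parentExpand', phase: 'hits' },
  { name: 'multiQuery', phase: 'query' },
]
// assertStageOrder(invalidOrder) would throw, because a query stage follows a hit stage.
```

Failing at creation time rather than at the first retrieve() call means a misordered pipeline surfaces during startup instead of in production traffic.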
Query Planning
queryPlanner() uses structured generation to produce typed subqueries.
const planner = queryPlanner({
  name: 'support-query-planner',
  generate: generateObjectFn,
  model,
  maxQueries: 4,
  filterSchema: z.object({
    product: z.string().optional(),
    visibility: z.enum(['public', 'internal']).optional(),
  }).optional(),
})
The planner must return at least one non-empty query. Planner filters are merged with per-call and retriever filters before retrieval.
multiQuery() creates alternate phrasings:
const expand = multiQuery({
  generate: generateTextFn,
  model,
  count: 4,
  includeOriginal: true,
})
After all query stages run, Crux retrieves each planned query and merges duplicate hits with reciprocal-rank fusion. Merged hits keep the max raw score and store match metadata in metadata._cruxRetrieval.
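
As a rough mental model of the merge step, the sketch below implements plain reciprocal-rank fusion over several result lists. The `Hit` shape, the `rrfMerge` name, and the constant `k = 60` are assumptions for illustration (60 is a commonly used default for RRF); the page does not state which constant or data shapes Crux uses.

```typescript
// Hypothetical hit shape for illustration; not Crux's actual type.
interface Hit {
  namespace: string
  sourceId: string
  chunkId: string
  score: number // raw retriever score
}

interface MergedHit extends Hit {
  rrfScore: number
}

// Reciprocal-rank fusion: every list contributes 1 / (k + rank) for each hit,
// with rank starting at 1. Duplicates are identified by namespace/sourceId/chunkId.
function rrfMerge(resultLists: Hit[][], k = 60): MergedHit[] {
  const merged = new Map<string, MergedHit>()
  for (const list of resultLists) {
    list.forEach((hit, index) => {
      const key = `${hit.namespace}/${hit.sourceId}/${hit.chunkId}`
      const contribution = 1 / (k + index + 1)
      const existing = merged.get(key)
      if (existing) {
        existing.rrfScore += contribution
        existing.score = Math.max(existing.score, hit.score) // keep the max raw score
      } else {
        merged.set(key, { ...hit, rrfScore: contribution })
      }
    })
  }
  return Array.from(merged.values()).sort((a, b) => b.rrfScore - a.rrfScore)
}

// One result list per planned query.
const listA: Hit[] = [
  { namespace: 'docs', sourceId: 'sso', chunkId: '1', score: 0.9 },
  { namespace: 'docs', sourceId: 'billing', chunkId: '4', score: 0.7 },
]
const listB: Hit[] = [
  { namespace: 'docs', sourceId: 'scim', chunkId: '2', score: 0.8 },
  { namespace: 'docs', sourceId: 'sso', chunkId: '1', score: 0.6 },
]
const fused = rrfMerge([listA, listB])
console.log(fused.map((hit) => `${hit.sourceId}:${hit.rrfScore.toFixed(4)}`))
```

In this sample the `sso` chunk appears in both lists, so it accumulates two rank contributions and sorts first, while its raw score stays at the higher of the two values.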
Parent Expansion
Use parentExpand() with chunker.parentChild() when you want small searchable chunks but larger context during prompting or compression.
const docsIndexer = indexer({
  id: 'docs',
  namespace: 'product-docs',
  data,
  vectors,
  dense,
  pipeline: indexingPipeline({
    chunker: chunker.parentChild({
      parentMaxChars: 6000,
      childMaxChars: 900,
      childOverlapChars: 120,
    }),
  }),
})
const advancedDocs = retrievalPipeline(docs, [
  parentExpand({ store: data, maxParentChars: 4000, missing: 'warn' }),
])
Child hits keep their sourceId, chunkId, score, and provenance. Parent content is added to hit.parent.content.
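
The following standalone sketch shows the general idea of parent expansion: look up each hit's parent by key, truncate it, and attach it without touching the child's identity fields. Only `parent.key`, `parent.content`, and the `maxParentChars` idea come from the text above; the `ChildHit` shape, the `ParentLookup` function type, and the warning format are hypothetical stand-ins, not Crux's store API.

```typescript
// Hypothetical shapes; only parent.key and parent.content are taken from this page.
interface ChildHit {
  sourceId: string
  chunkId: string
  score: number
  content: string
  parent?: { key: string; content?: string }
}

// Stand-in for a data store lookup: parent key -> full parent text.
type ParentLookup = (key: string) => Promise<string | undefined>

async function expandParents(
  hits: ChildHit[],
  lookup: ParentLookup,
  maxParentChars: number,
): Promise<{ hits: ChildHit[]; warnings: string[] }> {
  const warnings: string[] = []
  const cache = new Map<string, string | undefined>() // fetch each parent once
  const expanded: ChildHit[] = []
  for (const hit of hits) {
    if (!hit.parent) {
      expanded.push(hit)
      continue
    }
    const key = hit.parent.key
    if (!cache.has(key)) cache.set(key, await lookup(key))
    const text = cache.get(key)
    if (text === undefined) {
      warnings.push(`parent not found: ${key}`) // warn and keep the child as-is
      expanded.push(hit)
      continue
    }
    // Child identity and score are copied untouched; only parent.content is added.
    expanded.push({ ...hit, parent: { key, content: text.slice(0, maxParentChars) } })
  }
  return { hits: expanded, warnings }
}

const parents = new Map([['guide#p1', 'Full parent section about SSO rollout']])
const demo = expandParents(
  [
    { sourceId: 'guide', chunkId: 'c7', score: 0.82, content: 'child text', parent: { key: 'guide#p1' } },
    { sourceId: 'faq', chunkId: 'c2', score: 0.61, content: 'orphan', parent: { key: 'faq#p9' } },
  ],
  async (key) => parents.get(key),
  20,
)
demo.then(({ hits, warnings }) => console.log(hits, warnings))
```

Caching by parent key matters when several child chunks share one parent, which is the common case with parent-child chunking.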
Compression, Diversity, And Decay
Use compress() to keep quote-safe extractive excerpts:
const shrink = compress({
  generate: generateObjectFn,
  model,
  mode: 'extractive',
  maxCharsPerHit: 1200,
})
Use diversify() to reduce repeated chunks:
const diverse = diversify({
  strategy: 'mmr',
  lambda: 0.5,
  limit: 8,
  sourcePenalty: 0.15,
})
Use decay() when freshness should affect rank:
const recent = decay({
  field: 'metadata.updatedAt',
  halfLifeMs: 30 * 24 * 60 * 60 * 1000,
  missing: 'ignore',
})
Custom Stages
Use retrievalStage() for product-specific logic.
import { retrievalStage } from '@crux/core/retrieval'
const onlyPublicDocs = retrievalStage({
  name: 'only-public-docs',
  phase: 'hits',
  run({ hits }) {
    return hits.filter((hit) => hit.metadata.visibility === 'public')
  },
})
const docsForCustomers = retrievalPipeline(docs, [
  multiQuery({ generate: generateTextFn, model: queryModel, count: 3 }),
  onlyPublicDocs,
  diversify({ strategy: 'mmr', limit: 6 }),
])
Custom query stages return planned queries and must run before hit stages. Custom hit stages return hits and can filter, reorder, enrich, or annotate candidates.
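
Hit-stage logic is essentially a function from hits to hits, so it can be prototyped and unit-tested as plain TypeScript before being wrapped in a stage. The sketch below shows two reorderings in that shape: half-life score decay and a greedy MMR pass. Both mirror the ideas behind decay() and diversify() but are not their implementations. The `Hit` shape, the function names, and the use of Jaccard word overlap as the similarity measure are assumptions for illustration; a real pipeline would more likely compare embeddings.

```typescript
// Hypothetical hit shape for illustration only.
interface Hit {
  sourceId: string
  content: string
  score: number
  updatedAt: number // epoch milliseconds
}

// Half-life decay: the score halves for every halfLifeMs of age.
function decayByAge(hits: Hit[], halfLifeMs: number, now: number): Hit[] {
  return hits
    .map((hit) => {
      const age = Math.max(0, now - hit.updatedAt)
      return { ...hit, score: hit.score * Math.pow(0.5, age / halfLifeMs) }
    })
    .sort((a, b) => b.score - a.score)
}

// Jaccard overlap of lowercase word sets: a cheap similarity stand-in.
function jaccard(a: string, b: string): number {
  const wordsA = new Set(a.toLowerCase().split(/\s+/).filter(Boolean))
  const wordsB = new Set(b.toLowerCase().split(/\s+/).filter(Boolean))
  let shared = 0
  wordsA.forEach((word) => {
    if (wordsB.has(word)) shared += 1
  })
  const union = wordsA.size + wordsB.size - shared
  return union === 0 ? 0 : shared / union
}

// Greedy MMR: repeatedly pick the candidate that maximizes
// lambda * relevance - (1 - lambda) * (max similarity to already-picked hits).
function mmr(hits: Hit[], lambda: number, limit: number): Hit[] {
  const remaining = [...hits]
  const picked: Hit[] = []
  while (picked.length < limit && remaining.length > 0) {
    let bestIndex = 0
    let bestValue = -Infinity
    remaining.forEach((candidate, index) => {
      const redundancy = picked.reduce(
        (max, chosen) => Math.max(max, jaccard(candidate.content, chosen.content)),
        0,
      )
      const value = lambda * candidate.score - (1 - lambda) * redundancy
      if (value > bestValue) {
        bestValue = value
        bestIndex = index
      }
    })
    picked.push(...remaining.splice(bestIndex, 1))
  }
  return picked
}

const DAY = 24 * 60 * 60 * 1000
const now = Date.UTC(2024, 0, 31)
const sample: Hit[] = [
  { sourceId: 'a', content: 'enable sso for enterprise tenants', score: 0.9, updatedAt: now },
  { sourceId: 'b', content: 'enable sso for enterprise tenants today', score: 0.85, updatedAt: now },
  { sourceId: 'c', content: 'rotate scim tokens safely', score: 0.6, updatedAt: now - 30 * DAY },
]
const freshFirst = decayByAge(sample, 30 * DAY, now)
const varied = mmr(sample, 0.5, 2)
console.log(freshFirst.map((hit) => hit.sourceId), varied.map((hit) => hit.sourceId))
```

With this sample, MMR skips the near-duplicate `b` in favor of the lower-scored but distinct `c`, and decay halves the score of `c` because it is exactly one half-life old.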
Prompt Use And Tracing
Because a pipeline is still a retriever, use it directly in prompts:
const promptDocs = retrievalPipeline(
  docs,
  [
    multiQuery({ generate: generateTextFn, model: queryModel, count: 4 }),
    parentExpand({ store: data, maxParentChars: 4000 }),
  ],
  {
    inject: 'both',
    context: { query: ({ question }) => question, limit: 6 },
    tools: { prefix: true, include: ['search', 'getSource'] },
  },
)
const answerDocs = prompt({
  id: 'answer-docs',
  use: [promptDocs],
  input: z.object({ question: z.string() }),
  system: 'Answer from the retrieved docs.',
})
When the pipeline behaves unexpectedly, use retrieveWithTrace():
const { hits, trace } = await advancedDocs.retrieveWithTrace('pricing changes')
console.table(
  trace.stages.map((stage) => ({
    stage: stage.name,
    status: stage.status,
    inputQueries: stage.inputQueryCount,
    outputHits: stage.outputHitCount,
    warnings: stage.warnings.length,
  })),
)
Devtools, CLI, and TUI show bounded previews. OTel receives only privacy-safe stage names, kinds, phases, status, counts, and warning counts.
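
When a trace has many stages, a small filter can point at the stage worth inspecting first. The sketch below works on a local `StageTrace` type that mirrors only the fields used in the console.table call above. The `'ok'` status value, the string type of each warning, and the sample data are assumptions, since this page does not list the possible statuses or the full trace type.

```typescript
// Local type mirroring only the fields used above; the real trace type may differ.
interface StageTrace {
  name: string
  status: string // assumed: 'ok' means the stage ran cleanly
  inputQueryCount?: number
  outputHitCount?: number
  warnings: string[]
}

// Returns the names of stages that deserve a closer look: a non-'ok' status,
// any warnings, or an output hit count that fell to zero.
function suspiciousStages(stages: StageTrace[]): string[] {
  return stages
    .filter(
      (stage) =>
        stage.status !== 'ok' ||
        stage.warnings.length > 0 ||
        stage.outputHitCount === 0,
    )
    .map((stage) => stage.name)
}

// Hypothetical trace data for illustration.
const sampleStages: StageTrace[] = [
  { name: 'multiQuery', status: 'ok', inputQueryCount: 1, warnings: [] },
  { name: 'parentExpand', status: 'ok', outputHitCount: 8, warnings: ['parent not found'] },
  { name: 'only-public-docs', status: 'ok', outputHitCount: 0, warnings: [] },
]
const flagged = suspiciousStages(sampleStages)
console.log(flagged) // ['parentExpand', 'only-public-docs']
```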
After changing a pipeline, run RAG evals against a baseline and candidate config. That gives you per-case metric deltas, evidence previews, pipeline traces, citation checks, and failed-case export.
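
To illustrate what a per-case metric delta means, here is a minimal sketch that compares one metric between a baseline run and a candidate run. The `CaseResult` shape, the `recallAtK` metric, and the sample values are hypothetical; the actual eval output format is not described on this page.

```typescript
// Hypothetical per-case eval result; not the real eval output format.
interface CaseResult {
  caseId: string
  recallAtK: number
}

// Computes candidate minus baseline for every case present in both runs,
// sorted so the largest regressions come first.
function metricDeltas(
  baseline: CaseResult[],
  candidate: CaseResult[],
): { caseId: string; delta: number }[] {
  const baseByCase = new Map<string, number>()
  for (const result of baseline) baseByCase.set(result.caseId, result.recallAtK)
  return candidate
    .filter((result) => baseByCase.has(result.caseId))
    .map((result) => ({
      caseId: result.caseId,
      delta: result.recallAtK - (baseByCase.get(result.caseId) ?? 0),
    }))
    .sort((a, b) => a.delta - b.delta)
}

// Hypothetical sample values for illustration.
const baselineRun: CaseResult[] = [
  { caseId: 'sso-rollout', recallAtK: 0.5 },
  { caseId: 'pricing-changes', recallAtK: 0.75 },
]
const candidateRun: CaseResult[] = [
  { caseId: 'sso-rollout', recallAtK: 1 },
  { caseId: 'pricing-changes', recallAtK: 0.5 },
]
const deltas = metricDeltas(baselineRun, candidateRun)
console.log(deltas)
```

Sorting regressions first keeps attention on the cases a pipeline change made worse, which an average across all cases can hide.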