
Retrieval Pipelines

Improve query-time retrieval with planning, multi-query expansion, parent expansion, compression, diversity, recency, and custom stages.

Use retrievalPipeline() when one raw search pass is not enough. It decorates a retriever, so the result is still a retriever: you can call retrieve(), place it in prompt({ use }), expose tools, or wrap it with grounding().

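import { z } from 'zod'
import {
  compress,
  decay,
  diversify,
  multiQuery,
  parentExpand,
  queryPlanner,
  retrievalPipeline,
} from '@crux/core/retrieval'

// Assumed to be defined elsewhere: `docs` (the base retriever), `data` (the store passed to your indexer),
// generateObjectFn / generateTextFn, and planningModel, queryModel, and compressionModel.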

const advancedDocs = retrievalPipeline(docs, [
  queryPlanner({
    generate: generateObjectFn,
    model: planningModel,
    maxQueries: 3,
    filterSchema: z.object({
      product: z.string().optional(),
      visibility: z.enum(['public', 'internal']).optional(),
    }).optional(),
  }),
  multiQuery({
    generate: generateTextFn,
    model: queryModel,
    count: 4,
  }),
  parentExpand({ store: data, maxParentChars: 4000 }),
  compress({
    generate: generateObjectFn,
    model: compressionModel,
    mode: 'extractive',
    maxCharsPerHit: 1200,
  }),
  diversify({ strategy: 'mmr', limit: 8, sourcePenalty: 0.15 }),
  decay({
    field: 'metadata.updatedAt',
    halfLifeMs: 30 * 24 * 60 * 60 * 1000,
  }),
])

const hits = await advancedDocs.retrieve('enterprise SSO rollout')
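A decorated retriever exposes the same retrieve() shape as the one it wraps. That is why a pipeline can go anywhere a retriever can. The sketch below illustrates the decorator idea in plain TypeScript. The Hit and Retriever types, the decorate() helper, and the keyword-matching base retriever are hypothetical stand-ins. They are not Crux's types or implementation.

```typescript
// Hypothetical minimal types; Crux's real retriever and hit types are richer.
interface Hit {
  sourceId: string
  chunkId: string
  score: number
  content: string
}

interface Retriever {
  retrieve(query: string): Promise<Hit[]>
}

// A hits-phase transform: receives candidate hits, returns hits.
type HitStage = (hits: Hit[]) => Hit[] | Promise<Hit[]>

// Hypothetical decorator: wraps a base retriever and returns another Retriever,
// so callers cannot tell the decorated object apart from the original.
function decorate(base: Retriever, stages: HitStage[]): Retriever {
  return {
    async retrieve(query: string): Promise<Hit[]> {
      let hits = await base.retrieve(query)
      for (const stage of stages) {
        hits = await stage(hits)
      }
      return hits
    },
  }
}

// Toy base retriever: scores each chunk by how many query words it contains.
const corpus: Hit[] = [
  { sourceId: 'sso', chunkId: '1', score: 0, content: 'enterprise SSO rollout plan' },
  { sourceId: 'billing', chunkId: '1', score: 0, content: 'invoice and billing FAQ' },
]

const base: Retriever = {
  async retrieve(query) {
    const words = query.toLowerCase().split(/\s+/)
    return corpus
      .map((hit) => ({
        ...hit,
        score: words.filter((word) => hit.content.toLowerCase().includes(word)).length,
      }))
      .filter((hit) => hit.score > 0)
  },
}

// Because decorate() returns a Retriever, its result can be decorated again.
const topOne = decorate(base, [(hits) => [...hits].sort((a, b) => b.score - a.score).slice(0, 1)])
```

Because the wrapper keeps the retriever contract, it composes with anything that accepts a retriever.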

Bundled Stages

The pipeline has two phases. Query stages run before Crux calls the base retriever. Hit stages run after fanout and merge.

| Stage | Phase | What it does | Use it when |
| --- | --- | --- | --- |
| queryPlanner() | query | Turns one question into typed planned queries with optional filters, weights, and reasons. | A broad or ambiguous question should search multiple focused parts of the corpus. |
| multiQuery() | query | Generates alternate phrasings, dedupes them, and keeps the original by default. | Recall is weak because users and docs use different wording. |
| Fanout + RRF merge | internal | Calls the base retriever once per planned query and merges duplicate hits by namespace/sourceId/chunkId. | Always; this happens after the query stages. |
| parentExpand() | hits | Follows hit.parent.key and attaches larger parent context without changing the child citation. | You indexed small child chunks but want surrounding context. |
| compress() | hits | Keeps extractive excerpts from each hit and drops empty hits by default. | Retrieved chunks are too long or noisy. |
| diversify() | hits | Applies MMR-style diversity with an optional same-source penalty. | Top results repeat the same source or near-duplicate content. |
| decay() | hits | Applies exponential score decay based on a timestamp field. | Freshness should influence rank. |
| retrievalStage() | query or hits | Adds a custom stage with the same tracing and instrumentation as built-ins. | You need product-specific filtering, routing, enrichment, or ranking. |

Query stages must come before hit stages. If a query stage appears after a hit stage, Crux throws when the pipeline is created.
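The ordering rule can be checked eagerly, before any query runs. Below is a minimal sketch. It assumes each stage carries a phase of 'query' or 'hits', as in the retrievalStage() example on this page. StageLike and validateStageOrder() are hypothetical names, not Crux's source.

```typescript
// Hypothetical stage shape: only the fields needed for the ordering check.
type Phase = 'query' | 'hits'

interface StageLike {
  name: string
  phase: Phase
}

// Throws if any query-phase stage appears after a hits-phase stage.
// Checking at construction time surfaces the mistake before the first retrieve().
function validateStageOrder(stages: StageLike[]): void {
  let firstHitStage: StageLike | undefined
  for (const stage of stages) {
    if (stage.phase === 'hits') {
      if (!firstHitStage) firstHitStage = stage
    } else if (firstHitStage) {
      throw new Error(
        `Query stage "${stage.name}" must come before hit stage "${firstHitStage.name}"`,
      )
    }
  }
}
```

Failing at creation time means a misordered pipeline never silently skips its query stages at request time.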

Query Planning

queryPlanner() uses structured generation to produce typed subqueries.

const planner = queryPlanner({
  name: 'support-query-planner',
  generate: generateObjectFn,
  model,
  maxQueries: 4,
  filterSchema: z.object({
    product: z.string().optional(),
    visibility: z.enum(['public', 'internal']).optional(),
  }).optional(),
})

The planner must return at least one non-empty query. Planner filters are merged with per-call and retriever filters before retrieval.
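The bundled-stages table says planned queries carry optional filters, weights, and reasons. The sketch below models that shape and the "at least one non-empty query" check. The PlannedQuery field names and the validatePlan() helper are hypothetical. The sketch also assumes blank entries are dropped rather than rejected. It does not model how Crux merges planner filters with per-call and retriever filters, because this page does not specify their precedence.

```typescript
// Hypothetical shape of one planned query; Crux's real type may differ.
interface PlannedQuery {
  query: string
  filter?: Record<string, unknown>
  weight?: number
  reason?: string
}

// Trims queries, drops blank ones (an assumption), caps the plan at maxQueries,
// and throws if no usable query remains.
function validatePlan(plan: PlannedQuery[], maxQueries: number): PlannedQuery[] {
  const usable = plan
    .map((planned) => ({ ...planned, query: planned.query.trim() }))
    .filter((planned) => planned.query.length > 0)
    .slice(0, maxQueries)
  if (usable.length === 0) {
    throw new Error('Query planner returned no non-empty queries')
  }
  return usable
}
```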

multiQuery() creates alternate phrasings:

const expand = multiQuery({
  generate: generateTextFn,
  model,
  count: 4,
  includeOriginal: true,
})
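Deduplication matters because models often return rewrites that differ only in case or spacing. Below is a minimal sketch of the merge step. It assumes phrasings are compared case- and whitespace-insensitively. It also assumes count caps the number of alternates, with the original added on top when includeOriginal is true. normalize() and mergePhrasings() are hypothetical helpers, not Crux's implementation.

```typescript
// Normalizes a phrasing for comparison: trim, collapse whitespace, lowercase.
function normalize(text: string): string {
  return text.trim().replace(/\s+/g, ' ').toLowerCase()
}

// Hypothetical merge of the user's query with model-generated rewrites.
// Assumption: `count` caps the alternates; the original, when included, comes first.
function mergePhrasings(
  original: string,
  generated: string[],
  count: number,
  includeOriginal = true,
): string[] {
  // Seed with the original only when it will be kept, so it is never lost entirely.
  const seen = new Set<string>(includeOriginal ? [normalize(original)] : [])
  const alternates: string[] = []
  for (const phrasing of generated) {
    const key = normalize(phrasing)
    if (key.length === 0 || seen.has(key)) continue
    seen.add(key)
    alternates.push(phrasing.trim())
    if (alternates.length === count) break
  }
  return includeOriginal ? [original, ...alternates] : alternates
}
```

For example, mergePhrasings('enterprise SSO rollout', ['Enterprise  SSO rollout', 'SAML rollout plan'], 4) keeps the original and 'SAML rollout plan', and drops the case-and-spacing duplicate.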

After all query stages run, Crux retrieves each planned query and merges duplicate hits with reciprocal-rank fusion. Each merged hit keeps the highest raw score any query produced for it, and its match metadata is stored in metadata._cruxRetrieval.
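Reciprocal-rank fusion scores each hit by summing 1 / (k + rank) over every result list it appears in. Hits that rank well for several planned queries therefore rise to the top. The sketch below implements that merge, keyed on namespace/sourceId/chunkId and keeping the maximum raw score as described above. Several details are assumptions, not Crux's behavior:

- k = 60 is a commonly used value, not a documented Crux setting.
- The fusedScore and matchedQueries fields are hypothetical.
- Planned-query weights are ignored.
- The real metadata._cruxRetrieval layout is not shown on this page.

```typescript
// Hypothetical hit shape: identity fields plus the base retriever's raw score.
interface Hit {
  namespace: string
  sourceId: string
  chunkId: string
  score: number
}

interface MergedHit extends Hit {
  fusedScore: number // sum of 1 / (k + rank) across result lists
  matchedQueries: number[] // indexes of the planned queries that returned this hit
}

// Fuses one ranked hit list per planned query into a single ranking.
function rrfMerge(resultLists: Hit[][], k = 60): MergedHit[] {
  const byKey = new Map<string, MergedHit>()
  resultLists.forEach((hits, queryIndex) => {
    hits.forEach((hit, position) => {
      const rank = position + 1 // ranks are 1-based
      const key = `${hit.namespace}/${hit.sourceId}/${hit.chunkId}`
      const contribution = 1 / (k + rank)
      const existing = byKey.get(key)
      if (existing) {
        existing.fusedScore += contribution
        existing.score = Math.max(existing.score, hit.score) // keep the max raw score
        existing.matchedQueries.push(queryIndex)
      } else {
        byKey.set(key, { ...hit, fusedScore: contribution, matchedQueries: [queryIndex] })
      }
    })
  })
  return Array.from(byKey.values()).sort((a, b) => b.fusedScore - a.fusedScore)
}
```

In this sketch, ordering comes only from rank positions. Raw scores from different queries therefore do not need to share a scale.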

Parent Expansion

Use parentExpand() with chunker.parentChild() when you want small searchable chunks but larger context during prompting or compression.

const docsIndexer = indexer({
  id: 'docs',
  namespace: 'product-docs',
  data,
  vectors,
  dense,
  pipeline: indexingPipeline({
    chunker: chunker.parentChild({
      parentMaxChars: 6000,
      childMaxChars: 900,
      childOverlapChars: 120,
    }),
  }),
})

const expandedDocs = retrievalPipeline(docs, [
  parentExpand({ store: data, maxParentChars: 4000, missing: 'warn' }),
])

Child hits keep their sourceId, chunkId, score, and provenance. Parent content is added to hit.parent.content.
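Parent expansion attaches context without touching the fields used for citations. Below is a minimal sketch. It assumes a Map-backed parent store and treats missing: 'warn' as "keep the hit and record a warning". The store shape, the truncation to maxParentChars, the warning format, and expandParents() itself are hypothetical, not Crux's implementation.

```typescript
// Hypothetical child-hit shape with an optional parent reference.
interface ChildHit {
  sourceId: string
  chunkId: string
  score: number
  content: string
  parent?: { key: string; content?: string }
}

type MissingPolicy = 'warn' | 'ignore'

// Attaches parent text (truncated to maxParentChars) to each child hit.
// sourceId, chunkId, score, and content are left unchanged, so citations
// still point at the small chunk that actually matched.
function expandParents(
  hits: ChildHit[],
  parents: Map<string, string>,
  maxParentChars: number,
  missing: MissingPolicy,
  warnings: string[],
): ChildHit[] {
  return hits.map((hit) => {
    if (!hit.parent) return hit
    const text = parents.get(hit.parent.key)
    if (text === undefined) {
      // Assumption: a missing parent keeps the hit as-is; 'warn' also records it.
      if (missing === 'warn') warnings.push(`missing parent ${hit.parent.key} for ${hit.chunkId}`)
      return hit
    }
    return { ...hit, parent: { ...hit.parent, content: text.slice(0, maxParentChars) } }
  })
}
```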

Compression, Diversity, And Decay

Use compress() to keep quote-safe extractive excerpts:

const shrink = compress({
  generate: generateObjectFn,
  model,
  mode: 'extractive',
  maxCharsPerHit: 1200,
})
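"Extractive" and "quote-safe" are read here to mean that kept text must appear verbatim in the original chunk, so a citation can quote it. The sketch below shows one way to enforce that after a model proposes excerpts:

1. Keep only excerpts found verbatim in the chunk.
2. Clamp the result to maxCharsPerHit.
3. Drop hits left empty.

The model call is stubbed out as a map of proposed excerpts. extractiveExcerpt(), compressHits(), and the '...' separator are hypothetical, and Crux's own validation may differ.

```typescript
// Hypothetical hit shape: only the fields this sketch needs.
interface Hit {
  chunkId: string
  content: string
}

// Marks elided text between excerpts; each excerpt itself is verbatim.
const SEPARATOR = '\n...\n'

// Keeps only excerpts that occur verbatim in `source`, joined by SEPARATOR and
// clamped to maxCharsPerHit. Returns '' when no excerpt survives.
function extractiveExcerpt(source: string, proposed: string[], maxCharsPerHit: number): string {
  const verbatim = proposed
    .map((excerpt) => excerpt.trim())
    .filter((excerpt) => excerpt.length > 0 && source.includes(excerpt))
  let kept = ''
  for (const excerpt of verbatim) {
    const separator = kept ? SEPARATOR : ''
    const room = maxCharsPerHit - kept.length - separator.length
    if (room <= 0) break
    // A prefix of a verbatim excerpt is still verbatim, so truncating stays quote-safe.
    kept += separator + excerpt.slice(0, room)
    if (excerpt.length > room) break
  }
  return kept
}

// Replaces each hit's content with its validated excerpt and drops hits left empty.
// `proposals` stands in for the model call: chunkId -> excerpts the model suggested.
function compressHits(hits: Hit[], proposals: Map<string, string[]>, maxCharsPerHit: number): Hit[] {
  return hits
    .map((hit) => ({
      ...hit,
      content: extractiveExcerpt(hit.content, proposals.get(hit.chunkId) ?? [], maxCharsPerHit),
    }))
    .filter((hit) => hit.content.length > 0)
}
```

The verbatim check rejects any sentence the model paraphrased or invented.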

Use diversify() to reduce repeated chunks:

const diverse = diversify({
  strategy: 'mmr',
  lambda: 0.5,
  limit: 8,
  sourcePenalty: 0.15,
})
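Maximal marginal relevance (MMR) picks results greedily. Each round trades relevance against similarity to what has already been picked: lambda = 1 ranks by relevance alone, and lambda = 0 favors novelty alone. Below is a minimal sketch with a same-source penalty. It uses word-overlap (Jaccard) similarity as a stand-in for embedding similarity and subtracts sourcePenalty as a flat amount. Both choices are assumptions, not Crux's documented behavior.

```typescript
// Hypothetical hit shape: score is relevance, assumed to be on a 0..1 scale.
interface Hit {
  sourceId: string
  content: string
  score: number
}

// Word-set Jaccard similarity, a stand-in for embedding similarity: |A ∩ B| / |A ∪ B|.
function jaccard(a: string, b: string): number {
  const wordsA = new Set(a.toLowerCase().split(/\W+/).filter(Boolean))
  const wordsB = new Set(b.toLowerCase().split(/\W+/).filter(Boolean))
  const shared = Array.from(wordsA).filter((word) => wordsB.has(word)).length
  const union = wordsA.size + wordsB.size - shared
  return union === 0 ? 0 : shared / union
}

// Greedy MMR: each round picks the candidate with the best trade-off between
// relevance and redundancy, minus a flat penalty if its source was already picked.
function mmr(hits: Hit[], limit: number, lambda = 0.5, sourcePenalty = 0): Hit[] {
  const remaining = [...hits]
  const selected: Hit[] = []
  while (selected.length < limit && remaining.length > 0) {
    let bestIndex = 0
    let bestValue = -Infinity
    remaining.forEach((candidate, index) => {
      // Redundancy is the similarity to the closest already-selected hit.
      const redundancy = selected.reduce(
        (max, picked) => Math.max(max, jaccard(candidate.content, picked.content)),
        0,
      )
      const penalty = selected.some((picked) => picked.sourceId === candidate.sourceId) ? sourcePenalty : 0
      const value = lambda * candidate.score - (1 - lambda) * redundancy - penalty
      if (value > bestValue) {
        bestValue = value
        bestIndex = index
      }
    })
    selected.push(remaining.splice(bestIndex, 1)[0])
  }
  return selected
}
```

With lambda = 0.5, a near-duplicate chunk from an already-picked source loses to a less relevant but distinct chunk. With lambda = 1 and no penalty, the ranking is plain relevance order.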

Use decay() when freshness should affect rank:

const recent = decay({
  field: 'metadata.updatedAt',
  halfLifeMs: 30 * 24 * 60 * 60 * 1000,
  missing: 'ignore',
})
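With exponential decay, a hit's score is multiplied by 0.5^(age / halfLifeMs). A document exactly one half-life old keeps half its score, and one two half-lives old keeps a quarter. The sketch below makes several assumptions:

- The field holds an ISO date string or epoch milliseconds.
- missing: 'ignore' leaves the score unchanged.
- Future timestamps count as age 0.
- Hits are re-sorted after decay.

The readPath() and applyDecay() helpers are hypothetical.

```typescript
// Hypothetical hit shape: a score plus free-form metadata.
interface Hit {
  score: number
  metadata: Record<string, unknown>
}

// Reads a dotted path such as 'metadata.updatedAt' from a hit.
function readPath(value: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>((current, key) => {
    if (current === null || typeof current !== 'object') return undefined
    return (current as Record<string, unknown>)[key]
  }, value)
}

// Multiplies each score by 0.5^(age / halfLifeMs), then re-sorts by score.
function applyDecay(hits: Hit[], field: string, halfLifeMs: number, now = Date.now()): Hit[] {
  return hits
    .map((hit) => {
      const raw = readPath(hit, field)
      const timestamp =
        typeof raw === 'number' ? raw : typeof raw === 'string' ? Date.parse(raw) : NaN
      // Assumption: missing or unparsable timestamps leave the score unchanged ('ignore').
      if (Number.isNaN(timestamp)) return hit
      const ageMs = Math.max(0, now - timestamp) // future dates are treated as age 0
      return { ...hit, score: hit.score * Math.pow(0.5, ageMs / halfLifeMs) }
    })
    .sort((a, b) => b.score - a.score)
}
```

With a 30-day half-life, a hit scored 1.0 and updated 60 days ago drops to 0.25. That places it below a fresh hit scored 0.5.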

Custom Stages

Use retrievalStage() for product-specific logic.

import { retrievalStage } from '@crux/core/retrieval'

const onlyPublicDocs = retrievalStage({
  name: 'only-public-docs',
  phase: 'hits',
  run({ hits }) {
    return hits.filter((hit) => hit.metadata.visibility === 'public')
  },
})

const docsForCustomers = retrievalPipeline(docs, [
  multiQuery({ generate: generateTextFn, model: queryModel, count: 3 }),
  onlyPublicDocs,
  diversify({ strategy: 'mmr', limit: 6 }),
])

Custom query stages return planned queries and must run before hit stages. Custom hit stages return hits and can filter, reorder, enrich, or annotate candidates.
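A hit stage's run() only needs to return hits. Reorder and annotate logic can therefore live in a plain function that is easy to unit test. The sketch below boosts hits whose metadata.product matches a preferred product and records the boost. The boost factor, the metadata.boostedBy key, and boostProduct() are hypothetical. Inside a stage shaped like the onlyPublicDocs example, it would be called as run({ hits }) { return boostProduct(hits, 'sso', 1.2) }, assuming Crux's hit type has a compatible shape.

```typescript
// Hypothetical hit shape: only the fields this sketch needs.
interface Hit {
  chunkId: string
  score: number
  metadata: Record<string, unknown>
}

// Multiplies the score of hits for `product`, annotates them, and re-sorts.
// Returns new hit objects instead of mutating the input.
function boostProduct(hits: Hit[], product: string, factor: number): Hit[] {
  return hits
    .map((hit) =>
      hit.metadata.product === product
        ? { ...hit, score: hit.score * factor, metadata: { ...hit.metadata, boostedBy: factor } }
        : hit,
    )
    .sort((a, b) => b.score - a.score)
}
```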

Prompt Use And Tracing

Because a pipeline is still a retriever, use it directly in prompts:

const promptDocs = retrievalPipeline(
  docs,
  [
    multiQuery({ generate: generateTextFn, model: queryModel, count: 4 }),
    parentExpand({ store: data, maxParentChars: 4000 }),
  ],
  {
    inject: 'both',
    context: { query: ({ question }) => question, limit: 6 },
    tools: { prefix: true, include: ['search', 'getSource'] },
  },
)

const answerDocs = prompt({
  id: 'answer-docs',
  use: [promptDocs],
  input: z.object({ question: z.string() }),
  system: 'Answer from the retrieved docs.',
})

When the pipeline behaves unexpectedly, use retrieveWithTrace():

const { hits, trace } = await advancedDocs.retrieveWithTrace('pricing changes')

console.table(
  trace.stages.map((stage) => ({
    stage: stage.name,
    status: stage.status,
    inputQueries: stage.inputQueryCount,
    outputHits: stage.outputHitCount,
    warnings: stage.warnings.length,
  })),
)
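A quick way to localize a problem is to find the first stage that left no hits, plus any stages that emitted warnings. The sketch below works on the trace fields used in the console.table() call above. The StageTrace interface mirrors only those fields. It treats outputHitCount as optional, on the assumption that query stages may not report it. firstEmptyStage() and stagesWithWarnings() are hypothetical helpers.

```typescript
// Mirrors only the trace fields used above; the real trace type may have more.
interface StageTrace {
  name: string
  status: string
  inputQueryCount?: number
  outputHitCount?: number
  warnings: unknown[]
}

// Name of the first stage that reported zero output hits, or undefined.
function firstEmptyStage(stages: StageTrace[]): string | undefined {
  return stages.find((stage) => stage.outputHitCount === 0)?.name
}

// Names of stages that emitted warnings, in pipeline order.
function stagesWithWarnings(stages: StageTrace[]): string[] {
  return stages.filter((stage) => stage.warnings.length > 0).map((stage) => stage.name)
}
```

For example, firstEmptyStage(trace.stages) points at the stage to inspect first when hits comes back empty.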

Devtools, the CLI, and the TUI show bounded previews. OpenTelemetry (OTel) receives only privacy-safe fields: stage names, kinds, phases, status, counts, and warning counts.

After changing a pipeline, run RAG evals against both a baseline config and the candidate config. The comparison gives you per-case metric deltas, evidence previews, pipeline traces, citation checks, and an export of failed cases.
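Comparing configs case by case, rather than by averages alone, shows which questions a change helped or hurt. Below is a minimal sketch of that comparison. It assumes each run yields a map from case id to a numeric metric, such as a recall score. The CaseScores type, the regressions() helper, and the tolerance parameter are hypothetical and are not Crux's eval API.

```typescript
// Hypothetical eval output: case id -> metric value for one config.
type CaseScores = Record<string, number>

interface CaseDelta {
  caseId: string
  baseline: number
  candidate: number
  delta: number
}

// Computes candidate - baseline for every case present in both runs and
// returns the cases that regressed by more than `tolerance`, worst first.
function regressions(baseline: CaseScores, candidate: CaseScores, tolerance = 0): CaseDelta[] {
  return Object.keys(baseline)
    .filter((caseId) => caseId in candidate)
    .map((caseId) => ({
      caseId,
      baseline: baseline[caseId],
      candidate: candidate[caseId],
      delta: candidate[caseId] - baseline[caseId],
    }))
    .filter((row) => row.delta < -tolerance)
    .sort((a, b) => a.delta - b.delta)
}
```

A small tolerance keeps floating-point noise and trivial fluctuations out of the regression list.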
