Semantic RAG with Upstash
Index a doc corpus into Upstash Vector, rerank the best results, and answer questions with citations.
This recipe sets up a complete production-style RAG pipeline: ingest a directory of docs, chunk + embed + write into Upstash Vector, rerank the best matches, then answer questions with retriever() injecting the top hits as context. Hybrid mode gives you both semantic and keyword recall, while reranking improves the final ordering before the prompt sees the results.
Primitives used
- @crux/ingest for document loading
- @crux/core/indexing: indexer() for chunk + embed + write, corpus() for incremental sync
- @crux/core/retrieval: retriever() for query + asContext
- @crux/ai: reranker() for model-based reranking
- @crux/upstash: upstashVectorStore() for managed vector search
- embedding(): one dense + one sparse embedding
When to reach for this pattern
- You need a production-ready RAG without managing vector infrastructure yourself
- Your corpus is document-shaped — Markdown, HTML, files
- Both semantic and keyword matching matter (most real corpora benefit from hybrid)
Full code
lib/rag/embedding.ts
import { embedding } from '@crux/core/embedding'
import { openai } from '@ai-sdk/openai'

export const dense = embedding({
  kind: 'dense',
  name: 'text-embedding-3-small',
  dimensions: 1536,
  maxInputTokens: 8191,
  embed: async (texts) => {
    const { embedMany } = await import('ai')
    const { embeddings } = await embedMany({
      model: openai.embedding('text-embedding-3-small'),
      values: texts,
    })
    return { embeddings }
  },
})

export const sparse = embedding({
  kind: 'sparse',
  name: 'bm25',
  maxInputTokens: 8191,
  // toSparseVector is not defined in this recipe; supply your own BM25-style encoder.
  embed: async (texts) => ({ embeddings: texts.map(toSparseVector) }),
})

lib/rag/storage.ts
import { cruxConvexStore } from '@crux/convex'
import { upstashVectorStore } from '@crux/upstash'
import { Index } from '@upstash/vector'

// `components` and `ctx` are not defined in this snippet; they are assumed to
// come from your Convex app (its generated API and the current function context).
export const data = cruxConvexStore({
  component: components.crux,
  ctx,
})

export const vectors = upstashVectorStore({
  index: new Index({
    url: process.env.UPSTASH_VECTOR_URL!,
    token: process.env.UPSTASH_VECTOR_TOKEN!,
  }),
  namespace: 'product-docs',
})

lib/rag/index-docs.ts
import { chunker, corpus, indexer, indexingPipeline, transform } from '@crux/core/indexing'
import { filesSource } from '@crux/ingest/files'
import { dense, sparse } from './embedding'
import { data, vectors } from './storage'

export const docsIndexer = indexer({
  id: 'product-docs',
  namespace: 'product-docs',
  data,
  vectors,
  dense,
  sparse,
  cache: true,
  pipeline: indexingPipeline({
    documents: [
      transform.document({
        name: 'tag-source',
        version: '1',
        run(document) {
          return {
            ...document,
            metadata: { ...document.metadata, corpus: 'product-docs' },
          }
        },
      }),
    ],
    chunker: chunker.structured({ tableRowsPerChunk: 25 }),
  }),
})

export const docsCorpus = corpus({
  id: 'product-docs',
  namespace: 'product-docs',
  store: data,
  indexer: docsIndexer,
})

export async function syncDocs() {
  const source = filesSource(
    { directory: './docs', recursive: true },
    { namespace: 'product-docs' },
  )
  return docsCorpus.sync(source.load(), {
    sourceSet: 'complete',
    stale: 'delete',
  })
}

Invoke syncDocs() whenever content changes, for example from a small script run with pnpm tsx. The module above only exports the function, so running lib/rag/index-docs.ts directly does nothing unless something calls it. Because this uses corpus.sync(), unchanged files are skipped and changed files are reindexed. Because the scan is declared complete (sourceSet: 'complete') with stale: 'delete', files that no longer appear in the folder are removed from the index.
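To make the skip, reindex, and delete behavior of an incremental sync concrete, here is a minimal sketch that plans the work by comparing content hashes. It is not Crux's implementation: planSync, SourceFile, SyncPlan, and the idea of a hash ledger keyed by source id are hypothetical names and assumptions used only for illustration.

```typescript
import { createHash } from 'node:crypto'

// Hypothetical shapes for illustration; Crux's real ledger may look different.
type SourceFile = { id: string; content: string }
type SyncPlan = { skip: string[]; reindex: string[]; remove: string[] }

const hashOf = (content: string): string =>
  createHash('sha256').update(content).digest('hex')

// previous: source id -> content hash recorded by the last sync.
// complete: true when `current` is a full scan, so a missing id means "deleted".
export function planSync(
  previous: Map<string, string>,
  current: SourceFile[],
  complete: boolean,
): SyncPlan {
  const plan: SyncPlan = { skip: [], reindex: [], remove: [] }
  const seen = new Set<string>()
  for (const file of current) {
    seen.add(file.id)
    if (previous.get(file.id) === hashOf(file.content)) plan.skip.push(file.id)
    else plan.reindex.push(file.id) // new or changed content
  }
  if (complete) {
    for (const id of previous.keys()) {
      if (!seen.has(id)) plan.remove.push(id) // stale: no longer in the scan
    }
  }
  return plan
}
```

With complete set to false, the sketch never schedules removals. That mirrors why a partial source set must not treat missing sources as deleted.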
lib/rag/retriever.ts
import { retriever } from '@crux/core/retrieval'
import { reranker } from '@crux/ai'
import { openai } from '@ai-sdk/openai'
import { dense, sparse } from './embedding'
import { data, vectors } from './storage'

const docsReranker = reranker({
  name: 'product-docs-reranker',
  model: openai.reranking('gpt-4.1-mini'),
  topN: 5,
})

export const docs = retriever({
  id: 'product-docs',
  namespace: 'product-docs',
  data,
  vectors,
  dense,
  sparse,
  search: { mode: 'hybrid', limit: 5, fusion: 'rrf' },
  rerank: docsReranker,
})

Optional: advanced query-time pipeline
If your corpus has long docs, repeated chunks, or time-sensitive content, wrap the retriever with a pipeline. The exported object is still a retriever, so the answer prompt below does not change.
import {
  compress,
  decay,
  diversify,
  multiQuery,
  parentExpand,
  retrievalPipeline,
} from '@crux/core/retrieval'
import { generateObjectFn, generateTextFn } from '@crux/ai'
import { openai } from '@ai-sdk/openai'

export const advancedDocs = retrievalPipeline(docs, [
  multiQuery({
    generate: generateTextFn,
    model: openai('gpt-4.1-mini'),
    count: 4,
  }),
  parentExpand({
    store: data,
    maxParentChars: 4000,
  }),
  compress({
    generate: generateObjectFn,
    model: openai('gpt-4.1-mini'),
    mode: 'extractive',
    maxCharsPerHit: 1200,
  }),
  diversify({ strategy: 'mmr', limit: 8, sourcePenalty: 0.15 }),
  decay({
    field: 'metadata.updatedAt',
    halfLifeMs: 30 * 24 * 60 * 60 * 1000,
  }),
])

const { hits, trace } = await advancedDocs.retrieveWithTrace(
  'enterprise SSO setup',
)

Use the plain docs retriever when hybrid retrieval plus reranking is enough. Use advancedDocs when you need query expansion, parent context, compression, diversity, freshness, or stage-level debugging. If you use the pipeline, point the answer code below at advancedDocs instead of docs, for example with import { advancedDocs as docs } from './retriever' if the pipeline is exported from the same file.
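The halfLifeMs option on the decay stage suggests exponential freshness weighting. As a rough sketch of that idea, a hit's score can be halved for every half-life that has passed since it was last updated. This is an assumption about the general technique, not Crux's actual formula, which this recipe does not show. The Hit shape and the applyDecay name are hypothetical.

```typescript
// Hypothetical hit shape for illustration only.
type Hit = { chunkId: string; score: number; updatedAt: number }

// Multiply each score by 0.5^(age / halfLife), then re-sort by the decayed score.
export function applyDecay(hits: Hit[], halfLifeMs: number, now: number): Hit[] {
  return hits
    .map((hit) => {
      const ageMs = Math.max(0, now - hit.updatedAt) // future timestamps count as fresh
      return { ...hit, score: hit.score * Math.pow(0.5, ageMs / halfLifeMs) }
    })
    .sort((a, b) => b.score - a.score)
}
```

Under this model, with the 30-day half-life used above, a chunk last updated 60 days ago would keep a quarter of its original score.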
lib/rag/answer.ts
import { prompt } from '@crux/core'
import { generate } from '@crux/ai'
import { grounding, citationSchema } from '@crux/core/citations'
import { openai } from '@ai-sdk/openai'
import { z } from 'zod'
import { docs } from './retriever'

const groundedDocs = grounding({
  id: 'product-docs',
  retriever: docs,
  query: ({ input }) => input.question,
  limit: 5,
  citations: {
    required: true,
    quotes: 'required',
  },
})

const answerPrompt = prompt({
  id: 'docs-answer',
  use: [groundedDocs],
  input: z.object({ question: z.string() }),
  output: z.object({
    answer: z.string(),
    citations: z.array(citationSchema),
  }),
  system:
    `Answer using only the provided retrieved docs. Cite the chunks you used.
If the docs don't contain the answer, say so.`,
  prompt: ({ input }) => input.question,
})

export async function answer(question: string) {
  const result = await generate(answerPrompt, {
    model: openai('gpt-4o'),
    input: { question },
  })
  return result.object
}

How it works
- Sync deliberately, query many times. Syncing is expensive (pipeline work + embedding + writes); querying is cheap. corpus.sync() makes repeated syncs safe by skipping unchanged sources and tracking failures.
- Hybrid retrieval combines two strengths. Dense embeddings catch semantic similarity, while sparse embeddings catch exact terms and rare keywords. RRF (Reciprocal Rank Fusion) merges those rankings without needing manual score normalization.
- The reranker sharpens the final shortlist. Retrieval gets you candidate chunks. The reranker reorders the top results against the exact user query before they reach the prompt.
- grounding() runs retrieval at prompt-resolve time. Every call to answer() re-runs retrieval with the user's question, injects the allowed evidence, and contributes a citation constraint.
- Structured citations are validated. Each citation captures sourceId, chunkId, and optional quotes. Crux validates that cited chunks came from the retrieved hit set, and your UI can resolve them back to source URLs or paths via metadata.
- The pipeline ledger explains sync behavior. Each source record stores stage records for transforms, chunking, cache hits, durations, and chunk counts. In devtools, CLI/TUI, and OTel this is the difference between "RAG feels stale" and knowing exactly which source and stage changed.
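Reciprocal Rank Fusion is simple enough to show in a few lines. The sketch below illustrates the general technique, not the code Crux or Upstash runs: each document scores the sum of 1 / (k + rank) across the rankings it appears in. The rrf function name is hypothetical, and k = 60 is a commonly used default that is assumed here.

```typescript
// Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank(d)),
// where rank starts at 1. Only positions matter, so raw scores from the dense
// and sparse searches never need to be put on a common scale.
export function rrf(rankings: string[][], k = 60): string[] {
  const scores = new Map<string, number>()
  for (const ranking of rankings) {
    ranking.forEach((id, index) => {
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + index + 1))
    })
  }
  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1]) // highest fused score first
    .map(([id]) => id)
}

const denseRanking = ['a', 'b', 'c']
const sparseRanking = ['c', 'a', 'd']
// 'a' and 'c' appear in both rankings, so they end up ahead of 'b' and 'd'.
console.log(rrf([denseRanking, sparseRanking]))
```

Chunks that both searches agree on rise to the top, even when neither search ranked them first.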
Variations
Sync one changed source
When a single doc changes and your job only sees that one file, use a partial source set. Crux will update that source without assuming missing sources were deleted.
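// fileSource (singular) is not imported anywhere in this recipe; it is assumed
// to be a single-file counterpart to filesSource.
await docsCorpus.sync(
  fileSource('./docs/changed.md', { namespace: 'product-docs' }).load(),
  { sourceSet: 'partial' },
)

Citations as URLs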
If your docs come from URLs (urlsSource), metadata.sourceUrl is set automatically. Override the grounding renderer to format the evidence with clickable labels:
const urlGrounding = grounding({
  id: 'url-docs',
  retriever: docs,
  query: ({ input }) => input.question,
  render: ({ hits }) =>
    hits
      .map(
        (hit, index) =>
          `[${index + 1}] ${hit.parent?.title ?? hit.sourceId} - ${hit.metadata.sourceUrl ?? hit.chunkId}\n${hit.content}`,
      )
      .join('\n\n'),
  citations: {
    required: true,
    quotes: 'optional',
  },
})
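On the UI side, the returned citations still need to be mapped back to links. The following is a minimal sketch of one way to do that, assuming citation and hit objects that carry the fields named in this recipe (sourceId, chunkId, optional quotes, metadata.sourceUrl). The Citation and Hit types and the resolveCitations helper are hypothetical and not part of Crux.

```typescript
// Hypothetical shapes that mirror the fields named in this recipe.
type Citation = { sourceId: string; chunkId: string; quotes?: string[] }
type Hit = { sourceId: string; chunkId: string; metadata: { sourceUrl?: string } }

// Keep only citations that point at a retrieved chunk, and attach a link target.
export function resolveCitations(citations: Citation[], hits: Hit[]) {
  const byChunk = new Map<string, Hit>()
  for (const hit of hits) byChunk.set(hit.chunkId, hit)

  return citations.flatMap((citation) => {
    const hit = byChunk.get(citation.chunkId)
    // Drop anything that was not part of the retrieved hit set.
    if (!hit || hit.sourceId !== citation.sourceId) return []
    // Fall back to the source id (for example a file path) when there is no URL.
    return [{ ...citation, href: hit.metadata.sourceUrl ?? hit.sourceId }]
  })
}
```

The extra membership check is defensive. The recipe states that Crux already validates cited chunks against the hit set, so in practice the filter should rarely drop anything.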