Postgres Storage
Implement durable Crux DataStore, pgvector VectorStore, and bytea BlobStore adapters on Postgres.
Use a DataStore when Crux needs durable JSON records. Postgres is a good fit when your app already uses it for product data and you want memory, flow state, corpus ledgers, and workspace metadata in the same operational database.
This recipe starts with a DataStore, then shows optional Postgres-backed vector and blob storage. You can use only the pieces you need:
const data = postgresDataStore(pool)
const vectors = postgresVectorStore(pool) // optional, dense pgvector search
const blobs = postgresBlobStore(pool) // optional, modest bytea-backed files
For large production files, object storage such as S3, R2, GCS, or Convex file storage is usually a better BlobStore than Postgres. Use the Postgres blob example when you deliberately want one operational database and the files are modest in size.
DataStore
Implement the DataStore interface with a single table:
create table crux_records (
key text primary key,
value jsonb not null,
expires_at timestamptz,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
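-- list(prefix) uses LIKE 'prefix%'; text_pattern_ops lets that use an index under non-C collations.
create index crux_records_key_prefix_idx on crux_records (key text_pattern_ops);
create index crux_records_updated_at_idx on crux_records (updated_at desc);
-- Used by the TTL cleanup job.
create index crux_records_expires_at_idx on crux_records (expires_at);
Implementation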
This example uses pg, but the same shape works with Prisma, Drizzle, Kysely, Neon, Supabase, or any SQL client.
import type {
DataStore,
JsonObject,
ListOptions,
ListResult,
SetOptions,
} from '@crux/core/storage'
type Row = {
key: string
value: JsonObject
}
type Queryable = {
query<T extends Record<string, unknown> = Record<string, unknown>>(
sql: string,
values?: readonly unknown[],
): Promise<{ rows: T[] }>
}
export function postgresDataStore(pool: Queryable): DataStore {
return {
async get(key) {
// Expired rows are excluded by the query below and purged by a periodic
// cleanup job, not by a table-wide delete on every read.
const row = await pool.query<Row>(
`
select key, value
from crux_records
where key = $1
and (expires_at is null or expires_at > now())
limit 1
`,
[key],
)
return row.rows[0]?.value ?? null
},
async set(key, value, options?: SetOptions) {
const expiresAt =
options?.ttl !== undefined && options.ttl > 0
? new Date(Date.now() + options.ttl)
: null
await pool.query(
`
insert into crux_records (key, value, expires_at, updated_at)
values ($1, $2::jsonb, $3, now())
on conflict (key) do update
set value = excluded.value,
expires_at = excluded.expires_at,
updated_at = now()
`,
[key, JSON.stringify(value), expiresAt],
)
},
async delete(key) {
await pool.query('delete from crux_records where key = $1', [key])
},
async list(prefix, options?: ListOptions): Promise<ListResult> {
const limit = options?.limit ?? 100
const rows = await pool.query<Row>(
`
select key, value
from crux_records
where key like $1
escape '\\'
and ($2::text is null or key > $2)
and (expires_at is null or expires_at > now())
order by key asc
limit $3
`,
[`${escapeLike(prefix)}%`, options?.cursor ?? null, limit + 1],
)
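// Apply the filter only to the first `limit` scanned rows and take the cursor
// from the last scanned key, so filtered-out rows never end pagination early.
// A page may therefore hold fewer than `limit` entries while a cursor remains.
const scannedRows = rows.rows.slice(0, limit)
const entries = scannedRows
.filter((row) => !options?.filter || matchesTopLevelFilter(row.value, options.filter))
.map((row) => ({
key: row.key,
value: row.value,
}))
return {
entries,
cursor: rows.rows.length > limit ? scannedRows.at(-1)?.key : undefined,
}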
},
supportsTtl() {
return true
},
}
}
function escapeLike(value: string): string {
return value.replace(/[\\%_]/g, (match) => `\\${match}`)
}
function matchesTopLevelFilter(
value: JsonObject,
filter: Record<string, unknown>,
): boolean {
return Object.entries(filter).every(([key, expected]) => {
const actual = value[key]
return expected === null ? actual === null || actual === undefined : actual === expected
})
}
VectorStore With pgvector
Postgres can also be a dense VectorStore through the pgvector extension. This is useful when you want a simple deployment and dense semantic search is enough.
It is not the cleanest default for sparse or hybrid retrieval. Crux sparse vectors are { indices, values }, while Postgres full-text search uses tsvector/tsquery. If you want sparse or hybrid search in Postgres, build a custom retriever that combines pgvector and full-text search, or implement a product-specific VectorStore with your own fusion. For first-class sparse/hybrid vector APIs, Upstash Vector is currently the simpler bundled path.
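If you build that custom retriever, one common fusion technique is reciprocal rank fusion (RRF), which combines ranked lists using only positions, so cosine scores never have to be normalized against full-text ranks. The sketch below assumes you already have two ranked key lists, for example one from `order by dense <=> $1::vector` and one from a tsvector/tsquery query ordered by `ts_rank`. `reciprocalRankFusion` and `RankedHit` are hypothetical names, not Crux APIs, and `k = 60` is only the commonly used default.

```typescript
// Hypothetical input shape: one ranked list of keys, best match first.
type RankedHit = { key: string }

// Reciprocal rank fusion (RRF): every list adds 1 / (k + rank) to each key it
// contains, with rank starting at 1. Keys found by several retrievers rise to
// the top without comparing cosine scores against ts_rank scores directly.
function reciprocalRankFusion(
  lists: readonly (readonly RankedHit[])[],
  k = 60,
): { key: string; score: number }[] {
  const scores = new Map<string, number>()
  for (const list of lists) {
    list.forEach((hit, index) => {
      const contribution = 1 / (k + index + 1)
      scores.set(hit.key, (scores.get(hit.key) ?? 0) + contribution)
    })
  }
  // Highest fused score first; ties broken by key for deterministic output.
  return Array.from(scores, ([key, score]) => ({ key, score })).sort(
    (a, b) => b.score - a.score || a.key.localeCompare(b.key),
  )
}
```

You would then take the top fused keys, for example `reciprocalRankFusion([denseHits, textHits]).slice(0, 8)`, and hydrate their records from the DataStore.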
Schema for dense pgvector:
create extension if not exists vector;
create table crux_vectors (
key text primary key,
dense vector(1536),
metadata jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index crux_vectors_dense_idx
on crux_vectors using hnsw (dense vector_cosine_ops);
create index crux_vectors_metadata_idx
on crux_vectors using gin (metadata);
Change vector(1536) to match your embedding dimensions.
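A dimension mismatch only surfaces as a Postgres error at insert time, so it can help to check lengths in application code. The sketch below pairs that check with a plain cosine similarity function, which computes the same quantity the adapter returns as `score` (`1 - (dense <=> $1::vector)`, where `<=>` is pgvector's cosine distance) and is handy for sanity-checking scores in tests. `EMBEDDING_DIMENSIONS`, `assertDimensions`, and `cosineScore` are hypothetical names, and 1536 is assumed to match the schema.

```typescript
// Assumed dimension; must equal the N in the vector(N) column.
const EMBEDDING_DIMENSIONS = 1536

// Reject embeddings of the wrong length before they reach Postgres.
function assertDimensions(
  vector: readonly number[],
  expected: number = EMBEDDING_DIMENSIONS,
): void {
  if (vector.length !== expected) {
    throw new Error(`Expected ${expected} dimensions, got ${vector.length}.`)
  }
}

// Cosine similarity = 1 - cosine distance, the value the adapter reports as
// `score`. It ranges from -1 (opposite) to 1 (same direction).
function cosineScore(a: readonly number[], b: readonly number[]): number {
  if (a.length !== b.length) throw new Error('Dimension mismatch.')
  let dot = 0
  let normA = 0
  let normB = 0
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]
    normA += a[i] * a[i]
    normB += b[i] * b[i]
  }
  // Cosine similarity is undefined for a zero vector; surface that as NaN.
  if (normA === 0 || normB === 0) return Number.NaN
  return dot / (Math.sqrt(normA) * Math.sqrt(normB))
}
```

Because scores can be negative, the adapter's default `threshold` of 0 silently drops negatively correlated vectors; pass an explicit threshold if you need them.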
import type {
VectorHit,
VectorRecord,
VectorSearchQuery,
VectorStore,
} from '@crux/core/storage'
export function postgresVectorStore(pool: Queryable): VectorStore {
return {
async upsert(records: readonly VectorRecord[]) {
for (const record of records) {
if (!record.dense) {
throw new Error('postgresVectorStore() supports dense vectors only.')
}
await pool.query(
`
insert into crux_vectors (key, dense, metadata, updated_at)
values ($1, $2::vector, $3::jsonb, now())
on conflict (key) do update
set dense = excluded.dense,
metadata = excluded.metadata,
updated_at = now()
`,
[
record.key,
toPgVector(record.dense),
JSON.stringify(record.metadata ?? {}),
],
)
}
},
async delete(keys: readonly string[]) {
if (keys.length === 0) return
await pool.query(
`
delete from crux_vectors
where key = any($1::text[])
`,
[keys],
)
},
async search(query: VectorSearchQuery): Promise<readonly VectorHit[]> {
if (!query.dense) {
throw new Error('postgresVectorStore() supports dense search only.')
}
if (query.sparse) {
throw new Error('postgresVectorStore() does not support sparse or hybrid search.')
}
const limit = query.limit ?? 10
const threshold = query.threshold ?? 0
const filter = query.filter ? JSON.stringify(query.filter) : null
const rows = await pool.query<{
key: string
score: number
metadata: Record<string, unknown>
}>(
`
select key,
1 - (dense <=> $1::vector) as score,
metadata
from crux_vectors
where dense is not null
and ($2::jsonb is null or metadata @> $2::jsonb)
and 1 - (dense <=> $1::vector) >= $3
order by dense <=> $1::vector
limit $4
`,
[toPgVector(query.dense), filter, threshold, limit],
)
return rows.rows.map((row) => ({
key: row.key,
score: row.score,
metadata: row.metadata,
}))
},
capabilities() {
return { dense: true, sparse: false, hybrid: false }
},
}
}
function toPgVector(vector: readonly number[]): string {
if (vector.length === 0) {
throw new Error('Vector must not be empty.')
}
for (const value of vector) {
if (!Number.isFinite(value)) {
throw new Error('Vector contains a non-finite value.')
}
}
return `[${vector.join(',')}]`
}
Use it with indexing and retrieval:
const data = postgresDataStore(pool)
const vectors = postgresVectorStore(pool)
const docsIndexer = indexer({
id: 'docs',
namespace: 'docs',
data,
vectors,
dense,
})
const docs = retriever({
id: 'docs',
namespace: 'docs',
data,
vectors,
dense,
search: { mode: 'dense', limit: 8 },
})
BlobStore With bytea
Postgres can store binary content with bytea. This is convenient for tests, internal tools, and small generated files. It is usually not the best choice for large PDFs, images, or user uploads; object storage is cheaper and better at serving bytes.
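A size limit works best when it is enforced while reading, not after the whole upload has been buffered in memory. The sketch below reads a `ReadableStream<Uint8Array>` and cancels it as soon as the running total passes the cap; `readBounded` and `MAX_BLOB_BYTES` are hypothetical names, and the 10 MiB value simply mirrors the `maxBytes` the adapter below reports.

```typescript
// Hypothetical cap; keep it in sync with the maxBytes your BlobStore reports.
const MAX_BLOB_BYTES = 10 * 1024 * 1024

// Read a byte stream into one Uint8Array, cancelling the stream once the
// running total exceeds maxBytes instead of buffering the whole upload.
async function readBounded(
  stream: ReadableStream<Uint8Array>,
  maxBytes: number = MAX_BLOB_BYTES,
): Promise<Uint8Array> {
  const reader = stream.getReader()
  const chunks: Uint8Array[] = []
  let total = 0
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    total += value.byteLength
    if (total > maxBytes) {
      await reader.cancel('blob too large')
      throw new Error(`Blob exceeds ${maxBytes} bytes.`)
    }
    chunks.push(value)
  }
  // Concatenate the accepted chunks into a single buffer.
  const result = new Uint8Array(total)
  let offset = 0
  for (const chunk of chunks) {
    result.set(chunk, offset)
    offset += chunk.byteLength
  }
  return result
}
```

You could call this from the stream branch of a helper like `toUint8Array` so oversized uploads fail before they reach the `insert`.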
Schema:
create table crux_blobs (
uri text primary key,
content bytea not null,
mime_type text not null,
size integer not null,
metadata jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
Implementation:
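import { randomUUID } from 'node:crypto'
import type {
BlobPutInput,
BlobReadResult,
BlobRef,
BlobStore,
} from '@crux/core/storage'
// Hard cap on bytea content; keep in sync with maxBytes in capabilities().
const MAX_BLOB_BYTES = 10 * 1024 * 1024
export function postgresBlobStore(pool: Queryable): BlobStore {
return {
async put(input: BlobPutInput): Promise<BlobRef> {
const key = input.key ?? randomUUID()
const uri = `postgres://crux_blobs/${encodeURIComponent(key)}`
const bytes = await toUint8Array(input.content)
if (bytes.byteLength > MAX_BLOB_BYTES) {
throw new Error(`Blob exceeds ${MAX_BLOB_BYTES} bytes: ${uri}`)
}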
await pool.query(
`
insert into crux_blobs (uri, content, mime_type, size, metadata, updated_at)
values ($1, $2, $3, $4, $5::jsonb, now())
on conflict (uri) do update
set content = excluded.content,
mime_type = excluded.mime_type,
size = excluded.size,
metadata = excluded.metadata,
updated_at = now()
`,
[
uri,
Buffer.from(bytes),
input.mimeType,
bytes.byteLength,
JSON.stringify(input.metadata ?? {}),
],
)
return { uri, size: bytes.byteLength }
},
async get(uri: string): Promise<BlobReadResult> {
const row = await pool.query<{
content: Buffer
mime_type: string
size: number
}>(
`
select content, mime_type, size
from crux_blobs
where uri = $1
limit 1
`,
[uri],
)
const blob = row.rows[0]
if (!blob) {
throw new Error(`Blob not found: ${uri}`)
}
return {
content: new Uint8Array(blob.content),
mimeType: blob.mime_type,
size: blob.size,
}
},
async delete(uri: string) {
await pool.query('delete from crux_blobs where uri = $1', [uri])
},
capabilities() {
return { maxBytes: 10 * 1024 * 1024 }
},
}
}
async function toUint8Array(content: BlobPutInput['content']): Promise<Uint8Array> {
if (content instanceof Uint8Array) return content
if (typeof content === 'string') return new TextEncoder().encode(content)
if (content instanceof Blob) return new Uint8Array(await content.arrayBuffer())
const reader = content.getReader()
const chunks: Uint8Array[] = []
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(value)
}
const size = chunks.reduce((total, chunk) => total + chunk.byteLength, 0)
const result = new Uint8Array(size)
let offset = 0
for (const chunk of chunks) {
result.set(chunk, offset)
offset += chunk.byteLength
}
return result
}
Wire it into a workspace:
import { storage } from '@crux/core/storage'
import { workspace } from '@crux/core/workspace'
const files = workspace({
id: 'thread-files',
namespace: threadId,
storage: storage({
data: postgresDataStore(pool),
blobs: postgresBlobStore(pool),
}),
})
await files.write('/outputs/report.pdf', pdfBytes, {
mimeType: 'application/pdf',
})
Use It
import { memory, workingState } from '@crux/core/memory'
import { Pool } from 'pg'
const data = postgresDataStore(
new Pool({ connectionString: process.env.DATABASE_URL }),
)
const mem = memory({
id: 'assistant',
namespace: `user:${userId}`,
store: data,
blocks: [workingState({ id: 'profile', schema })],
})
For retrieval, pair Postgres data with either the pgvector store above or another vector store:
const docsIndexer = indexer({
id: 'docs',
namespace: 'docs',
data,
vectors,
dense,
})
const docs = retriever({
id: 'docs',
namespace: 'docs',
data,
vectors,
dense,
})
The DataStore owns JSON records, the VectorStore owns similarity search, and the BlobStore owns the bytes behind workspace files. Keeping these capabilities separate keeps each adapter simple, and Postgres only takes on vector search or file storage when you explicitly add the pgvector or bytea adapter.
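Because each VectorStore reports its own `capabilities()` (the pgvector store above returns `{ dense: true, sparse: false, hybrid: false }`), a startup check can catch a mismatched configuration before the first query throws. This is a minimal sketch: `assertSupportsMode`, `SearchMode`, and `VectorCapabilities` are hypothetical local types and names, and this recipe does not say whether Crux's `retriever()` already performs such a check.

```typescript
// Local copies of the shapes used in this recipe; Crux's exported types may differ.
type SearchMode = 'dense' | 'sparse' | 'hybrid'
type VectorCapabilities = { dense: boolean; sparse: boolean; hybrid: boolean }

// Fail fast at startup when a retriever's search mode is not one the
// configured VectorStore reports it can serve.
function assertSupportsMode(capabilities: VectorCapabilities, mode: SearchMode): void {
  if (!capabilities[mode]) {
    throw new Error(`Vector store does not support ${mode} search.`)
  }
}
```

For example, `assertSupportsMode(postgresVectorStore(pool).capabilities(), 'dense')` passes, while asking for `'hybrid'` throws at boot rather than on the first query.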
Production Notes
Use a tenant-aware key prefix or separate databases when tenants need hard isolation. Crux keys are already namespaced by feature, but tenant isolation is an application security boundary.
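A tenant prefix is only safe if one tenant's prefix can never be a prefix of another's, because `list()` matches keys with `LIKE 'prefix%'`. The sketch below encodes the tenant id so it cannot contain the separator. `tenantKey` is a hypothetical helper; you might apply it to the namespace you pass to Crux, assuming your record keys are built from that namespace. It does not replace separate databases when you need hard isolation.

```typescript
// Hypothetical helper: prefix a Crux namespace or key with an encoded tenant id.
function tenantKey(tenantId: string, key: string): string {
  if (tenantId.length === 0) throw new Error('tenantId must not be empty.')
  // encodeURIComponent escapes ':' (as %3A), so a tenant id cannot contain the
  // separator and produce a prefix that overlaps another tenant's keys.
  return `tenant:${encodeURIComponent(tenantId)}:${key}`
}
```

Without the encoding, a tenant named `acme:evil` would produce keys starting with `tenant:acme:`, which a `list()` for tenant `acme` would match.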
Run a periodic cleanup job if you use TTL heavily:
delete from crux_records
where expires_at is not null and expires_at <= now();
For high-write workloads, add indexes that match your app’s actual key prefixes and keep list() limits bounded.
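For the periodic cleanup job, deleting in bounded batches keeps each statement short instead of removing every expired row in one long transaction. The sketch below loops until a batch comes back short; `purgeExpired` and `CleanupClient` are hypothetical names (a pg `Pool` or the `Queryable` type above fits the client shape), and the batch size of 1000 is an arbitrary starting point. `for update skip locked` lets several workers run the job without blocking on the same rows.

```typescript
// Minimal client shape; a pg Pool satisfies it, as does the Queryable type above.
type CleanupClient = {
  query<T extends Record<string, unknown> = Record<string, unknown>>(
    sql: string,
    values?: readonly unknown[],
  ): Promise<{ rows: T[] }>
}

// Delete expired rows in bounded batches. Returns the total number deleted.
async function purgeExpired(client: CleanupClient, batchSize = 1000): Promise<number> {
  let total = 0
  while (true) {
    const result = await client.query<{ key: string }>(
      `
      delete from crux_records
      where key in (
        select key
        from crux_records
        where expires_at is not null and expires_at <= now()
        order by expires_at
        limit $1
        for update skip locked
      )
      returning key
      `,
      [batchSize],
    )
    total += result.rows.length
    // A short batch means no more unlocked expired rows were available;
    // any skipped rows are picked up by the next run.
    if (result.rows.length < batchSize) return total
  }
}
```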
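For pgvector, benchmark hnsw and ivfflat indexes against your corpus size and update pattern. For small corpora, an exact scan can be acceptable; for larger corpora, use an approximate nearest neighbor (ANN) index and monitor recall. pgvector applies metadata filters after the ANN index scan, so filtered searches can return fewer than limit results; raising hnsw.ef_search (or ivfflat.probes) considers more candidates at some cost in speed.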
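To monitor recall, run a sample of real queries twice: once normally (ANN) and once as an exact scan, for example inside a transaction with `set local enable_indexscan = off`, then compare the top-k keys. The sketch below computes recall@k for one query; `recallAtK` is a hypothetical helper, and you would average it over the sample.

```typescript
// Recall@k: the fraction of the exact top-k keys that the ANN query also
// returned in its top-k. 1 means the index missed nothing for this query.
function recallAtK(
  exactKeys: readonly string[],
  approxKeys: readonly string[],
  k: number,
): number {
  const truth = new Set(exactKeys.slice(0, k))
  if (truth.size === 0) return 1
  const approx = new Set(approxKeys.slice(0, k))
  let found = 0
  truth.forEach((key) => {
    if (approx.has(key)) found++
  })
  return found / truth.size
}
```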
For blob storage, keep a hard size limit if you use Postgres bytea. Store large user uploads and generated artifacts in object storage and keep only metadata/URIs in Postgres.