Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Backfill search_vector for native-backend observations.

Observations created or updated by the consolidator landed with a NULL
``search_vector`` under the ``native`` text-search backend: the
single-row INSERT/UPDATE paths in ``consolidator.py`` never populated the
tsvector (only the batch raw-fact path in ``ops_postgresql.insert_facts_batch``
did). Those observations were therefore invisible to the BM25 retrieval arm
until they were re-written by a later consolidation pass. The writer is fixed
in the same change set (all four consolidator sites now call
``to_tsvector($lang, COALESCE(text, ''))``); this migration repairs the
historical residue so existing observations become BM25-searchable without a
re-ingest.

Scope mirrors the writer fix exactly:
* Only the ``native`` backend is touched. The gate is the column *type*:
under ``native`` ``search_vector`` is a regular (non-generated) tsvector
column; under ``vchord`` it is a ``bm25vector`` and under
``pg_textsearch`` / ``pgroonga`` / ``pg_search`` it is a dummy ``text``
column. ``_is_regular_tsvector`` is true only for ``native``, so every
other backend is a no-op.
* The tsvector is built from the observation's own ``text`` only — matching
the consolidator INSERT/UPDATE paths (entity / source / temporal signals
are intentionally excluded; the other retrieval arms cover those).
* Only ``fact_type = 'observation'`` rows with a NULL ``search_vector`` are
rewritten. Raw facts already carry a populated tsvector, and the
``IS NULL`` predicate makes the migration idempotent and re-runnable.

The configured ``HINDSIGHT_API_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE`` is used
so backfilled rows are lexically identical to newly-created observations. The
value is validated as a PG identifier (mirroring
``HindsightConfig.validate``) before being embedded as a SQL literal; a value
that fails validation is replaced by the default language instead of aborting
the migration.

This is a single UPDATE per schema: it locks the targeted observation rows for
its duration. It is one-time and only touches unpopulated rows, so subsequent
online writes (which now carry the tsvector via the writer fix) are unaffected.

Oracle slot is intentionally absent: the consolidator INSERT/UPDATE paths that
this repairs are PostgreSQL-specific (``ops_postgresql``), and the native
tsvector ``search_vector`` column only exists on PostgreSQL. There is no Oracle
residue to repair.

Revision ID: c3f7a1b9d2e4
Revises: f4d1c2b3a5e6
Create Date: 2026-06-29
"""

import os
import re
from collections.abc import Sequence

from alembic import context, op
from sqlalchemy import Connection, text

from hindsight_api.alembic._dialect import run_for_dialect
from hindsight_api.config import (
DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE,
ENV_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE,
)

# Alembic revision identifiers; these are the same values listed under
# "Revision ID" / "Revises" in the module docstring.
revision: str = "c3f7a1b9d2e4"
down_revision: str | Sequence[str] | None = "f4d1c2b3a5e6"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Matches HindsightConfig.validate(): a tsvector regconfig name embedded as a
# SQL literal must be a bare PG identifier (ASCII letters, digits and
# underscores, not starting with a digit). ``_native_language`` applies this
# with ``fullmatch``, so the entire value has to match, not just a prefix.
_PG_IDENTIFIER = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")


def _schema_prefix() -> str:
    """Return the double-quoted ``"schema".`` prefix for table names.

    Reads the ``target_schema`` Alembic main option. When it is unset or
    empty, an empty string is returned so table names stay unqualified.

    Surrounding double quotes are stripped before re-quoting, exactly as
    ``_schema_name`` does. Without this, an already-quoted option value would
    be emitted as ``""schema"".`` and the UPDATE would target a different
    (invalid) name from the schema the column-type gate inspected.
    """
    schema = context.config.get_main_option("target_schema")
    if not schema:
        return ""
    bare_schema = schema.strip('"')
    return f'"{bare_schema}".'


def _schema_name() -> str:
    """Return the bare schema name, without surrounding double quotes.

    Uses the ``target_schema`` Alembic main option and falls back to
    ``public`` when the option is unset or empty.
    """
    configured = context.config.get_main_option("target_schema")
    if not configured:
        return "public"
    return configured.strip('"')


def _native_language() -> str:
    """Return the native tsvector language to embed in the backfill SQL.

    The value is read from the environment variable named by
    ``ENV_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE``; when the variable is not
    set, ``DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE`` is used. A value
    that is not a bare PG identifier (per ``_PG_IDENTIFIER``) is discarded in
    favour of the default, because the result is interpolated into SQL.
    """
    candidate = os.environ.get(
        ENV_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE,
        DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE,
    )
    if _PG_IDENTIFIER.fullmatch(candidate):
        return candidate
    return DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE


def _is_regular_tsvector(conn: Connection, schema: str, table: str) -> bool:
    """Report whether ``schema.table.search_vector`` is a plain tsvector column.

    "Plain" means the column's ``udt_name`` is ``tsvector`` and it is not
    ``GENERATED ALWAYS``. Per the migration notes this is the signature of
    the ``native`` backend; ``vchord`` (bm25vector) and ``pg_textsearch`` /
    ``pgroonga`` / ``pg_search`` (dummy text column) do not match, so the
    backfill skips them.

    Returns ``False`` when the column does not exist at all.
    """
    column_query = text(
        """
        SELECT is_generated, udt_name
        FROM information_schema.columns
        WHERE table_schema = :schema
          AND table_name = :table
          AND column_name = 'search_vector'
        """
    )
    column = conn.execute(column_query, {"schema": schema, "table": table}).fetchone()
    if not column:
        # No search_vector column in this schema/table.
        return False
    generated_flag, type_name = column[0], column[1]
    if type_name != "tsvector":
        return False
    return generated_flag != "ALWAYS"


def _pg_upgrade() -> None:
    """Backfill NULL ``search_vector`` values on observation rows (PostgreSQL).

    Does nothing unless ``memory_units.search_vector`` is a regular tsvector
    column (see ``_is_regular_tsvector``). Otherwise a single UPDATE fills the
    column from each row's ``text`` using the configured native language,
    restricted to ``fact_type = 'observation'`` rows whose vector is NULL.
    """
    bind = op.get_bind()
    target_schema = _schema_name()
    if _is_regular_tsvector(bind, target_schema, "memory_units"):
        table_prefix = _schema_prefix()
        # Safe to interpolate: _native_language() only returns a validated
        # bare identifier or the built-in default.
        regconfig = _native_language()
        op.execute(
            f"""
            UPDATE {table_prefix}memory_units
            SET search_vector = to_tsvector('{regconfig}'::regconfig, COALESCE(text, ''))
            WHERE fact_type = 'observation' AND search_vector IS NULL
            """
        )
    # Otherwise: non-native backend (or column absent), so nothing to backfill.


def _pg_downgrade() -> None:
# No-op: backfilled rows are indistinguishable from observations that were
# populated by the post-fix writer, and reverting either to NULL would
# re-break BM25 retrieval. The column simply stays populated.
pass


def upgrade() -> None:
    """Alembic upgrade entry point.

    Passes the PostgreSQL handler to ``run_for_dialect``; no handler is
    supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_upgrade)


def downgrade() -> None:
    """Alembic downgrade entry point.

    Passes the PostgreSQL handler (a no-op) to ``run_for_dialect``; no
    handler is supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_downgrade)
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,18 @@ async def _dedup_reconcile_create(
# Fold the new source facts into the twin and persist the merged text. We keep the twin's
# existing embedding: the merged text is >= threshold similar, so the stored vector stays
# representative and we avoid a re-embed + a dialect-specific vector UPDATE.
search_vector_clause = (
f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))"
if config.text_search_extension == "native"
else ""
)
await conn.execute(
f"""
UPDATE {fq_table("memory_units")}
SET text = $1,
source_memory_ids = (SELECT array_agg(DISTINCT e) FROM unnest(source_memory_ids || $2::uuid[]) e),
proof_count = (SELECT count(DISTINCT e) FROM unnest(source_memory_ids || $2::uuid[]) e),
updated_at = now()
updated_at = now(){search_vector_clause}
WHERE id = $3::uuid
""",
outcome.merged_text,
Expand Down Expand Up @@ -279,6 +284,11 @@ async def _dedup_reconcile_update(
# the create path) then delete the now-redundant updated row. The all_strict/any tag match
# guarantees twin and updated share scope, so dropping the updated row's tags loses no
# visibility. Temporal fields follow the surviving twin (minimal scope; matches create).
search_vector_clause = (
f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))"
if config.text_search_extension == "native"
else ""
)
await conn.execute(
f"""
UPDATE {fq_table("memory_units")} t
Expand All @@ -289,7 +299,7 @@ async def _dedup_reconcile_update(
proof_count = (
SELECT count(DISTINCT e) FROM unnest(t.source_memory_ids || u.source_memory_ids) e
),
updated_at = now()
updated_at = now(){search_vector_clause}
FROM {fq_table("memory_units")} u
WHERE t.id = $2::uuid AND u.id = $3::uuid
""",
Expand Down Expand Up @@ -1845,6 +1855,12 @@ async def _execute_update_action(

config = get_config()

search_vector_clause = (
f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))"
if config.text_search_extension == "native"
else ""
)

t0 = time.time()
await conn.execute(
f"""
Expand All @@ -1857,7 +1873,7 @@ async def _execute_update_action(
updated_at = now(),
occurred_start = LEAST(occurred_start, COALESCE($6, occurred_start)),
occurred_end = GREATEST(occurred_end, COALESCE($7, occurred_end)),
mentioned_at = GREATEST(mentioned_at, COALESCE($8, mentioned_at))
mentioned_at = GREATEST(mentioned_at, COALESCE($8, mentioned_at)){search_vector_clause}
WHERE id = $5
""",
new_text,
Expand Down Expand Up @@ -2333,16 +2349,20 @@ async def _create_observation_directly(
tokenize($3, 'llmlingua2')::bm25_catalog.bm25vector)
RETURNING id
"""
else: # native, pg_textsearch, pgroonga, or pg_search
# pg_textsearch / pgroonga / pg_search: indexes operate on base text
# columns directly, so the dummy search_vector column is left NULL.
# Native: the migration p4q5r6s7t8u9 dropped the GENERATED expression on
# search_vector to allow per-deployment language configuration; the
# batch insert path in ops_postgresql.insert_facts_batch now populates
# it via to_tsvector($lang, ...). This single-observation INSERT does
# not, so observations under the native backend currently land with
# NULL search_vector and are not BM25-searchable until reflected/
# re-ingested. Tracking a separate fix for that gap.
elif config.text_search_extension == "native":
# Native: search_vector is populated with to_tsvector() using the
# configured native language dictionary, matching the batch insert
# path in ops_postgresql.insert_facts_batch.
query = f"""
INSERT INTO {fq_table("memory_units")} (
id, bank_id, text, fact_type, embedding, proof_count, source_memory_ids,
tags, event_date, occurred_start, occurred_end, mentioned_at, search_vector
)
VALUES ($1, $2, $3, 'observation', $4::vector, 1, $5, $6, $7, $8, $9, $10,
to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($3, '')))
RETURNING id
"""
else: # pg_textsearch, pgroonga, pg_search: indexes operate on base text columns directly
query = f"""
INSERT INTO {fq_table("memory_units")} (
id, bank_id, text, fact_type, embedding, proof_count, source_memory_ids,
Expand Down
1 change: 0 additions & 1 deletion hindsight-api-slim/tests/test_chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,3 @@ def test_merged_json_array_routes_to_conversation_chunking():
assert isinstance(parsed, list), f"Chunk must be a JSON array: {chunk[:60]}"
assert all(isinstance(e, dict) for e in parsed), f"Every element must be a dict: {chunk[:60]}"
assert all("role" in e for e in parsed), f"Every element must have a role key: {chunk[:60]}"

36 changes: 36 additions & 0 deletions hindsight-api-slim/tests/test_consolidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3619,3 +3619,39 @@ def test_consolidation_prompt_split_is_cacheable_and_complete():
)
assert "OBSERVATION LIMIT REACHED" in capped
assert "OBSERVATION LIMIT REACHED" not in sys_prompt


@pytest.mark.asyncio
async def test_create_observation_populates_search_vector_native(memory, request_context):
    """Observations created via consolidation must have search_vector populated
    when text_search_extension == 'native', so BM25 retrieval finds them.

    The test is skipped for every other text-search backend. The temporary
    bank is deleted in a ``finally`` block so a failed retain, query or
    assertion does not leave test data behind.
    """
    from hindsight_api.config import get_config

    config = get_config()
    if config.text_search_extension != "native":
        pytest.skip("Only applies to native text search backend")

    bank_id = f"test-search-vector-{uuid.uuid4().hex[:8]}"
    await memory.get_bank_profile(bank_id=bank_id, request_context=request_context)

    try:
        await memory.retain_async(
            bank_id=bank_id,
            content="Django uses middleware for request processing.",
            request_context=request_context,
        )

        async with memory._pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT search_vector
                FROM memory_units
                WHERE bank_id = $1 AND fact_type = 'observation'
                LIMIT 1
                """,
                bank_id,
            )

        assert row is not None, "Consolidation should have created an observation"
        assert row["search_vector"] is not None, "search_vector must be populated for BM25 retrieval under native backend"
    finally:
        # Always clean up the bank, even when the checks above fail.
        await memory.delete_bank(bank_id, request_context=request_context)
Loading