diff --git a/hindsight-api-slim/hindsight_api/alembic/versions/c3f7a1b9d2e4_backfill_observation_search_vector.py b/hindsight-api-slim/hindsight_api/alembic/versions/c3f7a1b9d2e4_backfill_observation_search_vector.py new file mode 100644 index 0000000000..1a9aad5391 --- /dev/null +++ b/hindsight-api-slim/hindsight_api/alembic/versions/c3f7a1b9d2e4_backfill_observation_search_vector.py @@ -0,0 +1,144 @@ +"""Backfill search_vector for native-backend observations. + +Observations created or updated by the consolidator landed with a NULL +``search_vector`` under the ``native`` text-search backend: the +single-row INSERT/UPDATE paths in ``consolidator.py`` never populated the +tsvector (only the batch raw-fact path in ``ops_postgresql.insert_facts_batch`` +did). Those observations were therefore invisible to the BM25 retrieval arm +until they were re-written by a later consolidation pass. The writer is fixed +in the same change set (all four consolidator sites now call +``to_tsvector($lang, COALESCE(text, ''))``); this migration repairs the +historical residue so existing observations become BM25-searchable without a +re-ingest. + +Scope mirrors the writer fix exactly: + * Only the ``native`` backend is touched. The gate is the column *type*: + under ``native`` ``search_vector`` is a regular (non-generated) tsvector + column; under ``vchord`` it is a ``bm25vector`` and under + ``pg_textsearch`` / ``pgroonga`` / ``pg_search`` it is a dummy ``text`` + column. ``_is_regular_tsvector`` is true only for ``native``, so every + other backend is a no-op. + * The tsvector is built from the observation's own ``text`` only — matching + the consolidator INSERT/UPDATE paths (entity / source / temporal signals + are intentionally excluded; the other retrieval arms cover those). + * Only ``fact_type = 'observation'`` rows with a NULL ``search_vector`` are + rewritten. 
Raw facts already carry a populated tsvector, and the + ``IS NULL`` predicate makes the migration idempotent and re-runnable. + +The configured ``HINDSIGHT_API_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE`` is used +so backfilled rows are lexically identical to newly-created observations. The +value is validated as a PG identifier (mirroring ``HindsightConfig.validate``) +before being embedded as a SQL literal; invalid values fall back to the default. + +This is a single UPDATE per schema: it locks the targeted observation rows for +its duration. It is one-time and only touches unpopulated rows, so subsequent +online writes (which now carry the tsvector via the writer fix) are unaffected. + +Oracle slot is intentionally absent: the consolidator INSERT/UPDATE paths that +this repairs are PostgreSQL-specific (``ops_postgresql``), and the native +tsvector ``search_vector`` column only exists on PostgreSQL. There is no Oracle +residue to repair. + +Revision ID: c3f7a1b9d2e4 +Revises: f4d1c2b3a5e6 +Create Date: 2026-06-29 +""" + +import os +import re +from collections.abc import Sequence + +from alembic import context, op +from sqlalchemy import Connection, text + +from hindsight_api.alembic._dialect import run_for_dialect +from hindsight_api.config import ( + DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE, + ENV_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE, +) + +revision: str = "c3f7a1b9d2e4" +down_revision: str | Sequence[str] | None = "f4d1c2b3a5e6" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +# Matches HindsightConfig.validate(): a tsvector regconfig name embedded as a +# SQL literal must be a bare PG identifier. +_PG_IDENTIFIER = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*") + + +def _schema_prefix() -> str: + schema = context.config.get_main_option("target_schema") + return f'"{schema}".'
if schema else "" + + +def _schema_name() -> str: + return (context.config.get_main_option("target_schema") or "public").strip('"') + + +def _native_language() -> str: + """Return the configured native tsvector language, or the default if it is not a bare PG identifier.""" + lang = os.getenv( + ENV_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE, + DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE, + ) + if not _PG_IDENTIFIER.fullmatch(lang): + return DEFAULT_TEXT_SEARCH_EXTENSION_NATIVE_LANGUAGE + return lang + + +def _is_regular_tsvector(conn: Connection, schema: str, table: str) -> bool: + """True iff ``schema.table.search_vector`` is a non-generated tsvector column. + + This is the ``native`` backend signature. ``vchord`` (bm25vector) and + ``pg_textsearch`` / ``pgroonga`` / ``pg_search`` (dummy text column) all + fail this check, so the backfill is a no-op for them. + """ + row = conn.execute( + text( + """ + SELECT is_generated, udt_name + FROM information_schema.columns + WHERE table_schema = :schema + AND table_name = :table + AND column_name = 'search_vector' + """ + ), + {"schema": schema, "table": table}, + ).fetchone() + if not row: + return False + is_generated, udt_name = row[0], row[1] + return udt_name == "tsvector" and is_generated != "ALWAYS" + + +def _pg_upgrade() -> None: + conn = op.get_bind() + schema_name = _schema_name() + if not _is_regular_tsvector(conn, schema_name, "memory_units"): + # Non-native backend (or column absent) — nothing to backfill. + return + schema_prefix = _schema_prefix() + lang = _native_language() + op.execute( + f""" + UPDATE {schema_prefix}memory_units + SET search_vector = to_tsvector('{lang}'::regconfig, COALESCE(text, '')) + WHERE fact_type = 'observation' AND search_vector IS NULL + """ + ) + + +def _pg_downgrade() -> None: + # No-op: backfilled rows are indistinguishable from observations that were + # populated by the post-fix writer, and reverting either to NULL would + # re-break BM25 retrieval. The column simply stays populated.
+ pass + + +def upgrade() -> None: + run_for_dialect(pg=_pg_upgrade) + + +def downgrade() -> None: + run_for_dialect(pg=_pg_downgrade) diff --git a/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py b/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py index 14e0c3f70e..acc26c4cc3 100644 --- a/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py +++ b/hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py @@ -224,13 +224,18 @@ async def _dedup_reconcile_create( # Fold the new source facts into the twin and persist the merged text. We keep the twin's # existing embedding: the merged text is >= threshold similar, so the stored vector stays # representative and we avoid a re-embed + a dialect-specific vector UPDATE. + search_vector_clause = ( + f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))" + if config.text_search_extension == "native" + else "" + ) await conn.execute( f""" UPDATE {fq_table("memory_units")} SET text = $1, source_memory_ids = (SELECT array_agg(DISTINCT e) FROM unnest(source_memory_ids || $2::uuid[]) e), proof_count = (SELECT count(DISTINCT e) FROM unnest(source_memory_ids || $2::uuid[]) e), - updated_at = now() + updated_at = now(){search_vector_clause} WHERE id = $3::uuid """, outcome.merged_text, @@ -279,6 +284,11 @@ async def _dedup_reconcile_update( # the create path) then delete the now-redundant updated row. The all_strict/any tag match # guarantees twin and updated share scope, so dropping the updated row's tags loses no # visibility. Temporal fields follow the surviving twin (minimal scope; matches create). 
+ search_vector_clause = ( + f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))" + if config.text_search_extension == "native" + else "" + ) await conn.execute( f""" UPDATE {fq_table("memory_units")} t @@ -289,7 +299,7 @@ async def _dedup_reconcile_update( proof_count = ( SELECT count(DISTINCT e) FROM unnest(t.source_memory_ids || u.source_memory_ids) e ), - updated_at = now() + updated_at = now(){search_vector_clause} FROM {fq_table("memory_units")} u WHERE t.id = $2::uuid AND u.id = $3::uuid """, @@ -1845,6 +1855,12 @@ async def _execute_update_action( config = get_config() + search_vector_clause = ( + f",\n search_vector = to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($1, ''))" + if config.text_search_extension == "native" + else "" + ) + t0 = time.time() await conn.execute( f""" @@ -1857,7 +1873,7 @@ async def _execute_update_action( updated_at = now(), occurred_start = LEAST(occurred_start, COALESCE($6, occurred_start)), occurred_end = GREATEST(occurred_end, COALESCE($7, occurred_end)), - mentioned_at = GREATEST(mentioned_at, COALESCE($8, mentioned_at)) + mentioned_at = GREATEST(mentioned_at, COALESCE($8, mentioned_at)){search_vector_clause} WHERE id = $5 """, new_text, @@ -2333,16 +2349,20 @@ async def _create_observation_directly( tokenize($3, 'llmlingua2')::bm25_catalog.bm25vector) RETURNING id """ - else: # native, pg_textsearch, pgroonga, or pg_search - # pg_textsearch / pgroonga / pg_search: indexes operate on base text - # columns directly, so the dummy search_vector column is left NULL. - # Native: the migration p4q5r6s7t8u9 dropped the GENERATED expression on - # search_vector to allow per-deployment language configuration; the - # batch insert path in ops_postgresql.insert_facts_batch now populates - # it via to_tsvector($lang, ...). 
This single-observation INSERT does - # not, so observations under the native backend currently land with - # NULL search_vector and are not BM25-searchable until reflected/ - # re-ingested. Tracking a separate fix for that gap. + elif config.text_search_extension == "native": + # Native: search_vector is populated with to_tsvector() using the + # configured native language dictionary, matching the batch insert + # path in ops_postgresql.insert_facts_batch. + query = f""" + INSERT INTO {fq_table("memory_units")} ( + id, bank_id, text, fact_type, embedding, proof_count, source_memory_ids, + tags, event_date, occurred_start, occurred_end, mentioned_at, search_vector + ) + VALUES ($1, $2, $3, 'observation', $4::vector, 1, $5, $6, $7, $8, $9, $10, + to_tsvector('{config.text_search_extension_native_language}'::regconfig, COALESCE($3, ''))) + RETURNING id + """ + else: # pg_textsearch, pgroonga, pg_search: indexes operate on base text columns directly query = f""" INSERT INTO {fq_table("memory_units")} ( id, bank_id, text, fact_type, embedding, proof_count, source_memory_ids, diff --git a/hindsight-api-slim/tests/test_chunking.py b/hindsight-api-slim/tests/test_chunking.py index ea69a46567..e6e40f610a 100644 --- a/hindsight-api-slim/tests/test_chunking.py +++ b/hindsight-api-slim/tests/test_chunking.py @@ -455,4 +455,3 @@ def test_merged_json_array_routes_to_conversation_chunking(): assert isinstance(parsed, list), f"Chunk must be a JSON array: {chunk[:60]}" assert all(isinstance(e, dict) for e in parsed), f"Every element must be a dict: {chunk[:60]}" assert all("role" in e for e in parsed), f"Every element must have a role key: {chunk[:60]}" - diff --git a/hindsight-api-slim/tests/test_consolidation.py b/hindsight-api-slim/tests/test_consolidation.py index cd8e6fe2d2..932d41e531 100644 --- a/hindsight-api-slim/tests/test_consolidation.py +++ b/hindsight-api-slim/tests/test_consolidation.py @@ -3619,3 +3619,39 @@ def 
test_consolidation_prompt_split_is_cacheable_and_complete(): ) assert "OBSERVATION LIMIT REACHED" in capped assert "OBSERVATION LIMIT REACHED" not in sys_prompt + + +@pytest.mark.asyncio +async def test_create_observation_populates_search_vector_native(memory, request_context): + """Observations created via consolidation must have search_vector populated + when text_search_extension == 'native', so BM25 retrieval finds them.""" + from hindsight_api.config import get_config + + config = get_config() + if config.text_search_extension != "native": + pytest.skip("Only applies to native text search backend") + + bank_id = f"test-search-vector-{uuid.uuid4().hex[:8]}" + await memory.get_bank_profile(bank_id=bank_id, request_context=request_context) + + await memory.retain_async( + bank_id=bank_id, + content="Django uses middleware for request processing.", + request_context=request_context, + ) + + async with memory._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT search_vector + FROM memory_units + WHERE bank_id = $1 AND fact_type = 'observation' + LIMIT 1 + """, + bank_id, + ) + + assert row is not None, "Consolidation should have created an observation" + assert row["search_vector"] is not None, "search_vector must be populated for BM25 retrieval under native backend" + + await memory.delete_bank(bank_id, request_context=request_context) diff --git a/hindsight-api-slim/tests/test_migration_observation_search_vector_backfill.py b/hindsight-api-slim/tests/test_migration_observation_search_vector_backfill.py new file mode 100644 index 0000000000..77c9a7c10f --- /dev/null +++ b/hindsight-api-slim/tests/test_migration_observation_search_vector_backfill.py @@ -0,0 +1,155 @@ +"""Regression for the consolidator search_vector gap (PR #2425). + +Observations written by the consolidator under the ``native`` text-search +backend landed with a NULL ``search_vector`` and were invisible to BM25. 
The +writer is fixed to populate the tsvector; migration +``c3f7a1b9d2e4`` backfills the historical NULL observations. + +This test seeds an observation with a NULL ``search_vector`` at the revision +just before the backfill, runs the migration to head, and asserts the row is +populated with a valid tsvector — and that already-populated rows are left +untouched. Uses a dedicated pg0 instance (mirrors test_migration_backsweep) so +it controls exactly which migrations have run and never stamps the shared test +instance. +""" + +import asyncio +import uuid +from pathlib import Path + +import pytest +from alembic import command +from alembic.config import Config +from sqlalchemy import create_engine, text + +_SCRIPT_LOCATION = str(Path(__file__).parent.parent / "hindsight_api" / "alembic") + +# Revision immediately before the backfill migration. +_PRE_BACKFILL_REVISION = "f4d1c2b3a5e6" +_BACKFILL_REVISION = "c3f7a1b9d2e4" + + +def _alembic_cfg(db_url: str) -> Config: + cfg = Config() + cfg.set_main_option("script_location", _SCRIPT_LOCATION) + cfg.set_main_option("sqlalchemy.url", db_url) + cfg.set_main_option("prepend_sys_path", ".") + cfg.set_main_option("path_separator", "os") + return cfg + + +@pytest.fixture(scope="module") +def pre_backfill_db_url(): + """pg0 instance brought to the revision just before the backfill so the + migration's UPDATE runs against seeded NULL-search_vector observations.""" + from hindsight_api.pg0 import EmbeddedPostgres + + pg0 = EmbeddedPostgres(name="hindsight-obs-sv-backfill-test", port=5568) + loop = asyncio.new_event_loop() + try: + url = loop.run_until_complete(pg0.ensure_running()) + finally: + loop.close() + + # pg0 data dirs persist across runs, so normalise: go to head, then down to + # just before the backfill. 
+ command.upgrade(_alembic_cfg(url), "heads") + command.downgrade(_alembic_cfg(url), _PRE_BACKFILL_REVISION) + return url + + +def test_backfill_populates_null_observation_search_vector(pre_backfill_db_url): + db_url = pre_backfill_db_url + bank_id = f"obs-sv-{uuid.uuid4().hex[:12]}" + + null_obs_id = uuid.uuid4() + populated_obs_id = uuid.uuid4() + world_id = uuid.uuid4() + + engine = create_engine(db_url) + with engine.connect() as conn: + # Sanity: under the default native backend search_vector is a regular + # tsvector column; otherwise this test wouldn't exercise the gate. + udt = conn.execute( + text( + """ + SELECT udt_name FROM information_schema.columns + WHERE table_name = 'memory_units' AND column_name = 'search_vector' + """ + ) + ).scalar() + assert udt == "tsvector", f"expected native tsvector backend, got {udt!r}" + + conn.execute(text("INSERT INTO banks (bank_id) VALUES (:b)"), {"b": bank_id}) + + # The bug shape: an observation with NULL search_vector. + conn.execute( + text( + """ + INSERT INTO memory_units (id, bank_id, text, fact_type, search_vector) + VALUES (:id, :b, 'Django uses middleware for request processing', 'observation', NULL) + """ + ), + {"id": null_obs_id, "b": bank_id}, + ) + # An observation already populated — must be left byte-for-byte intact. + conn.execute( + text( + """ + INSERT INTO memory_units (id, bank_id, text, fact_type, search_vector) + VALUES (:id, :b, 'Already indexed observation', 'observation', + to_tsvector('english', 'Already indexed observation')) + """ + ), + {"id": populated_obs_id, "b": bank_id}, + ) + # A non-observation row with NULL search_vector — must NOT be touched + # (the migration is scoped to fact_type = 'observation'). + conn.execute( + text( + """ + INSERT INTO memory_units (id, bank_id, text, fact_type, search_vector) + VALUES (:id, :b, 'A world fact', 'world', NULL) + """ + ), + {"id": world_id, "b": bank_id}, + ) + conn.commit() + + # Run the backfill. 
+ command.upgrade(_alembic_cfg(db_url), _BACKFILL_REVISION) + + with engine.connect() as conn: + null_obs_sv, null_obs_match = conn.execute( + text( + """ + SELECT search_vector IS NOT NULL, + search_vector @@ plainto_tsquery('english', 'middleware') + FROM memory_units WHERE id = :id + """ + ), + {"id": null_obs_id}, + ).fetchone() + assert null_obs_sv, "backfill must populate the NULL observation's search_vector" + assert null_obs_match, "backfilled tsvector must be BM25-searchable on its own text" + + populated_match = conn.execute( + text("SELECT search_vector @@ plainto_tsquery('english', 'indexed') FROM memory_units WHERE id = :id"), + {"id": populated_obs_id}, + ).scalar() + assert populated_match, "pre-populated observation must remain searchable" + + world_null = conn.execute( + text("SELECT search_vector IS NULL FROM memory_units WHERE id = :id"), + {"id": world_id}, + ).scalar() + assert world_null, "non-observation rows must be left untouched by the backfill" + + # Upgrade to heads: the applied backfill is not re-executed, so this only checks the row stays populated.
+ command.upgrade(_alembic_cfg(db_url), "heads") + with engine.connect() as conn: + still_populated = conn.execute( + text("SELECT search_vector IS NOT NULL FROM memory_units WHERE id = :id"), + {"id": null_obs_id}, + ).scalar() + assert still_populated diff --git a/hindsight-api-slim/tests/test_retain_append_mode.py b/hindsight-api-slim/tests/test_retain_append_mode.py index d02b65b353..44060a2eed 100644 --- a/hindsight-api-slim/tests/test_retain_append_mode.py +++ b/hindsight-api-slim/tests/test_retain_append_mode.py @@ -254,10 +254,12 @@ async def test_append_mode_conversation_arrays_produce_valid_json(memory, reques try: # First retain - JSON conversation array - turn1 = json.dumps([ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there"}, - ]) + turn1 = json.dumps( + [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"}, + ] + ) await memory.retain_batch_async( bank_id=bank_id, contents=[ @@ -271,10 +273,12 @@ async def test_append_mode_conversation_arrays_produce_valid_json(memory, reques ) # Second retain - append more turns - turn2 = json.dumps([ - {"role": "user", "content": "How are you"}, - {"role": "assistant", "content": "Doing well"}, - ]) + turn2 = json.dumps( + [ + {"role": "user", "content": "How are you"}, + {"role": "assistant", "content": "Doing well"}, + ] + ) await memory.retain_batch_async( bank_id=bank_id, contents=[ @@ -300,10 +304,12 @@ async def test_append_mode_conversation_arrays_produce_valid_json(memory, reques assert len(parsed) == 4, "Should contain all 4 messages from both retains" # Third retain - append again, verify no degradation - turn3 = json.dumps([ - {"role": "user", "content": "What is new"}, - {"role": "assistant", "content": "Not much"}, - ]) + turn3 = json.dumps( + [ + {"role": "user", "content": "What is new"}, + {"role": "assistant", "content": "Not much"}, + ] + ) await memory.retain_batch_async( bank_id=bank_id, contents=[ @@ -329,4 +335,3 @@ async 
def test_append_mode_conversation_arrays_produce_valid_json(memory, reques finally: await memory.delete_bank(bank_id, request_context=request_context) -