name: CI

on:
  push:
    branches: ['**']
  pull_request:

# Cancel superseded runs of the same branch so a push + its open PR don't both
# burn a full matrix.
#
# The group is keyed on the *branch name*, not `github.ref`: a push run sees
# `refs/heads/<branch>` while the PR run for the same commit sees
# `refs/pull/<n>/merge`, so a `github.ref`-keyed group never matched the two.
# `github.head_ref` is only set on pull_request events; pushes fall back to
# `github.ref_name`. The head repository is part of the key so a fork PR whose
# branch happens to be called e.g. `main` cannot cancel this repo's own runs.
concurrency:
  group: ci-${{ github.event.pull_request.head.repo.full_name || github.repository }}-${{ github.head_ref || github.ref_name }}
  cancel-in-progress: true

jobs:
  tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.11', '3.12']
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Byte-compile (syntax gate — all modules, not a whitelist)
        run: python -m compileall -q icarus scripts setup _test_collapse.py _test_sanitize.py

      - name: Collapse tests (pure + Hebbian amplify + attestation + adapter)
        run: python _test_collapse.py

      - name: Sanitize / prompt-injection tests
        run: python _test_sanitize.py

      - name: Collapse eval smoke (must run clean)
        run: python scripts/collapse_eval.py
"""Test non-bijunctive recall collapse (Elyan Edition).

Plain-script test runner: every ``check`` records a failure instead of raising,
so one run reports all broken expectations; the process exit code is 0 only
when every check passed.
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))

from icarus.collapse import (
    tokenize, salience, score_all, collapse, DEFAULTS,
    physical_entropy, attest, verify_attestation,
)

all_ok = True


def check(name, cond):
    """Record (and print) a failure for ``name`` when ``cond`` is falsy."""
    global all_ok
    if not cond:
        print(f"FAIL: {name}")
        all_ok = False


# ── tokenize ──
check("tokenize strips stopwords", tokenize("the quick brown fox") == {"quick", "brown", "fox"})
check("tokenize empty -> empty set", tokenize("") == set())
check("tokenize lowercases", tokenize("RustChain POWER8") == {"rustchain", "power8"})

# ── salience monotonic with overlap ──
q = tokenize("rustchain ed25519 attestation signature")
hi = salience({"text": "rustchain ed25519 attestation signature node", "source": "facts"}, q)
lo = salience({"text": "unrelated gardening tomatoes weather", "source": "facts"}, q)
check("salience rewards overlap", hi > lo)

# qdrant score lifts a candidate with no overlap above a zero-score one
sc_hi = salience({"text": "zzz none", "source": "qdrant", "score": 0.9}, q)
sc_lo = salience({"text": "zzz none", "source": "qdrant", "score": 0.1}, q)
check("salience rewards score", sc_hi > sc_lo)

# rank decay: later rank => lower salience, all else equal
r0 = salience({"text": "rustchain ed25519", "source": "fabric", "rank": 0}, q)
r3 = salience({"text": "rustchain ed25519", "source": "fabric", "rank": 3}, q)
check("rank decay lowers later ranks", r0 > r3)

# ── collapse: prune weak relative to strong ──
cands = [
    {"key": "strong", "source": "facts", "text": "rustchain ed25519 attestation signature verified node", "rank": 0},
    {"key": "mid", "source": "sessions", "text": "rustchain notes about something", "rank": 0},
    {"key": "weak", "source": "qdrant", "text": "completely unrelated gardening tomatoes", "score": 0.0, "rank": 0},
]
out = collapse(cands, q, budget=6, prune_ratio=0.35)
keys = [c["key"] for c in out]
check("strong survives", "strong" in keys)
check("weak pruned relative to strong", "weak" not in keys)
check("survivors carry _salience", all("_salience" in c for c in out))
check("survivors sorted strongest-first", out == sorted(out, key=lambda c: c["_salience"], reverse=True))

# ── collapse: budget cap ──
many = [
    {"key": f"k{i}", "source": "facts", "text": f"rustchain ed25519 attestation node {i}", "rank": 0}
    for i in range(20)
]
out2 = collapse(many, q, budget=4)
check("budget caps survivors", len(out2) <= 4)

# ── collapse: near-duplicate suppression ──
dups = [
    {"key": "a", "source": "facts", "text": "rustchain ed25519 attestation signature verified", "rank": 0},
    {"key": "b", "source": "qdrant", "text": "rustchain ed25519 attestation signature verified", "score": 0.9, "rank": 0},
    {"key": "c", "source": "sessions", "text": "totally different power8 numa coffer topic entirely", "rank": 0},
]
out3 = collapse(dups, tokenize("rustchain ed25519 attestation signature power8 numa"), budget=6, dup_overlap=0.82)
ids = [c["key"] for c in out3]
check("near-duplicate suppressed (a or b, not both)", not ("a" in ids and "b" in ids))

# ── edge cases ──
check("empty input -> []", collapse([], q) == [])
check("zero budget -> []", collapse(cands, q, budget=0) == [])
mixed = collapse([None, "x", 42, {"key": "ok", "source": "facts", "text": "rustchain ed25519 attestation"}], q)
check("non-dict items ignored (only the dict survives)", [c["key"] for c in mixed] == ["ok"])

# no query tokens: must NOT collapse to empty when there was real signal
out4 = collapse(cands, set(), budget=2)
check("empty query still returns survivors (no firehose, no blackout)", 0 < len(out4) <= 2)

# DEFAULTS sanity
check("DEFAULTS present", {"budget", "prune_ratio", "dup_overlap"} <= set(DEFAULTS))
check("DEFAULTS has amplify knobs", {"corroboration_overlap", "amplify_gain", "amplify_cap"} <= set(DEFAULTS))

# ── Hebbian cross-source amplify ──
qh = tokenize("rustchain ed25519 attestation signature")
# Same fact from TWO different sources (fabric + qdrant) should amplify; a lone
# unrelated item should not. Corroboration counts cross-source only.
corro_set = [
    {"key": "fab", "source": "fabric", "text": "rustchain ed25519 attestation signature verified", "rank": 0},
    {"key": "qdr", "source": "qdrant", "text": "rustchain ed25519 attestation signature verified", "score": 0.5, "rank": 0},
    {"key": "lone", "source": "sessions", "text": "rustchain ed25519 attestation signature note", "rank": 0},
]
scored = {r["candidate"]["key"]: r for r in score_all(corro_set, qh)}
check("cross-source corroboration counted", scored["fab"]["corroboration"] >= 1)
check("corroboration amplifies salience above base", scored["fab"]["salience"] > scored["fab"]["base"])
# same-source duplicates do NOT corroborate (must be cross-source)
same_src = score_all([
    {"key": "f1", "source": "facts", "text": "rustchain ed25519 attestation", "rank": 0},
    {"key": "f2", "source": "facts", "text": "rustchain ed25519 attestation", "rank": 1},
], qh)
check("same-source agreement does NOT amplify", all(r["corroboration"] == 0 for r in same_src))
# survivors carry _corroboration
amp_out = collapse(corro_set, qh, budget=6)
check("survivors annotated with _corroboration", all("_corroboration" in c for c in amp_out))

# ── physical-entropy attestation ──
ent = bytes(range(16))  # injected => deterministic for the test
a1 = attest(amp_out, entropy=ent)
check("attestation has hash+nonce+algo", {"hash", "nonce", "count", "algo"} <= set(a1))
check("attestation algo is blake2b-256", a1["algo"] == "blake2b-256")
check("attestation verifies for unchanged survivors", verify_attestation(amp_out, a1) is True)
# tamper-evidence: drop a survivor => verification fails
check("attestation FAILS when survivor set tampered", verify_attestation(amp_out[:-1], a1) is False if len(amp_out) > 1 else True)
# order-independent commitment: shuffled survivors verify the same
check("attestation order-independent", verify_attestation(list(reversed(amp_out)), a1) is True)
# determinism: same survivors + same nonce => same hash
check("attestation deterministic under fixed nonce", attest(amp_out, entropy=ent)["hash"] == a1["hash"])
# physical entropy: live nonce is non-empty and (essentially always) varies
e_a, e_b = physical_entropy(16), physical_entropy(16)
check("physical_entropy returns requested length", len(e_a) == 16)
check("physical_entropy is live (two draws differ)", e_a != e_b)
# different selection => different commitment under same nonce
other = collapse([{"key": "z", "source": "facts", "text": "unrelated power8 numa coffer", "rank": 0}], tokenize("power8 numa"))
check("different selection => different hash", attest(other, entropy=ent)["hash"] != a1["hash"])

# default (LIVE physical-entropy) attest path round-trips — exercises the impure
# branch, not just the injected-entropy one.
live = attest(amp_out)
check("default attest path verifies round-trip", verify_attestation(amp_out, live) is True)
check("default attest carries a live nonce", len(live["nonce"]) > 0 and live["nonce"] != a1["nonce"])

# identity (not text/salience) is committed: two DISTINCT survivors with the
# SAME source+text+salience but different keys must NOT cross-verify.
twinA = [{"key": "A", "source": "facts", "text": "same text", "_salience": 0.5}]
twinB = [{"key": "B", "source": "facts", "text": "same text", "_salience": 0.5}]
attA = attest(twinA, entropy=ent)
check("same source/text/salience but different key => different commitment",
      verify_attestation(twinB, attA) is False)

# physical_entropy clamps oversized requests instead of raising (blake2b max 64)
check("physical_entropy clamps >64 without raising", 1 <= len(physical_entropy(200)) <= 64)

# ── adapter tests: hooks._apply_collapse (the hot-path wiring) ──
# Silence the fail-open WARNING+traceback that the intentional malformed-input
# test below triggers by design — keeps test output clean.
import logging as _logging
_logging.disable(_logging.CRITICAL)
from icarus import hooks as _hooks

# strong fabric + relevant session survive; irrelevant zero-score qdrant pruned
af, aq, asn, afc = _hooks._apply_collapse(
    "rustchain ed25519 attestation signature",
    [{"id": "f1", "summary": "rustchain ed25519 attestation signature verified"}],
    [{"id": "q1", "title": "gardening", "content_preview": "tomatoes weather unrelated", "score": 0.0}],
    [{"session_id": "s1", "title": "rustchain", "snippet": "ed25519 attestation work"}],
    ["power8 numa coffer unrelated topic"],
)
check("adapter: strong fabric survives", [e["id"] for e in af] == ["f1"])
check("adapter: weak zero-score qdrant pruned", aq == [])
check("adapter: returns four lists", all(isinstance(x, list) for x in (af, aq, asn, afc)))

# qdrant text now reads `content`/`body`, not just title+preview (Codex fix)
qtxt = _hooks._qdrant_text({"content": "rustchain ed25519 attestation node verified"})
check("adapter: _qdrant_text reads content field", "ed25519" in qtxt)

# fail-open: malformed inputs must return unchanged tuple, never raise
bad = _hooks._apply_collapse("q", [{"no": "text"}], [None], [], [])
check("adapter: fail-open returns 4-tuple", len(bad) == 4)

# safe env parser: garbage value falls back to default, never raises.
# The variable must actually be SET to garbage — an unset variable only
# exercises the "missing" early return, not the parse-failure branch.
os.environ["X_NOPE_BAD"] = "not-a-number"
try:
    check("adapter: _env_num bad value -> default", _hooks._env_num("X_NOPE_BAD", 6, int) == 6)
finally:
    os.environ.pop("X_NOPE_BAD", None)
check("adapter: _env_num unset -> default", _hooks._env_num("X_NOPE_UNSET", 6, int) == 6)

# Undo the global logging mute so anything importing this module afterwards
# still gets its log output.
_logging.disable(_logging.NOTSET)

if all_ok:
    print("=== ALL COLLAPSE TESTS PASS ===")
    sys.exit(0)
else:
    print("=== COLLAPSE TESTS FAILED ===")
    sys.exit(1)
"""Non-bijunctive recall collapse.

Stock recall pulls a fixed quota from each memory source (fabric, qdrant,
sessions, facts) and injects all of it. A *strong* session memory and a *weak*
vector hit both survive because they live in separate per-source buckets.

This module unifies every candidate into one salience-ranked pool and applies a
Hebbian-style collapse borrowed (in structure only) from the PSE doctrine:

  - PRUNE   weak paths *relative to the strongest* (not an absolute floor) —
            noise doesn't vote.
  - AMPLIFY strong paths. Two senses: (a) the highest-salience candidates fill
            the budget, and (b) HEBBIAN CROSS-SOURCE CORROBORATION — when the
            same fact surfaces from 2+ *different* sources, that co-activation
            ("fire together, wire together") boosts its salience. Agreement
            across layers is evidence, so it amplifies.
  - BUDGET  spend ONE cross-source budget — the best N things get injected,
            regardless of which layer produced them.

ATTESTATION (RustChain doctrine tie-in): every collapse can emit a
physical-entropy hash attestation over its survivor set — a blake2b commitment
(same family as the RustChain Ergo anchor) bound to a hardware-seeded entropy
nonce. This makes a recall decision *tamper-evident* (you can verify which
memories were chosen) and *proof-of-live* (the entropy nonce proves a fresh
selection, not a replayed/emulated one). It is the recall analogue of RustChain's
anti-emulation fingerprinting, and it turns the collapse from an unobservable
black box into an auditable one.

``collapse``/``score_all``/``salience``/``tokenize``/``attest`` are pure (no I/O,
no globals) when given their inputs; only ``physical_entropy`` touches the
machine. Callers treat any collapse exception as "inject everything, unchanged."
Tunables are passed explicitly so behavior is fully deterministic for tests.
"""

from __future__ import annotations

import hashlib
import os
import re
import time
from typing import Iterable

__all__ = [
    "tokenize", "salience", "score_all", "collapse", "DEFAULTS",
    "physical_entropy", "attest", "verify_attestation",
]

# Mild per-source priors. Curated/durable sources get a small nudge; this only
# breaks ties between candidates of otherwise-equal salience. Kept close to 1.0
# on purpose — query relevance should dominate, not source identity.
_SOURCE_PRIOR = {
    "facts": 1.10,     # durable, hand-curated facts about the world
    "fabric": 1.05,    # cross-session decisions/resolutions
    "sessions": 1.00,  # prior conversation snippets
    "qdrant": 1.00,    # vector knowledge base
}

DEFAULTS = {
    "budget": 6,             # max candidates injected across ALL sources
    "prune_ratio": 0.35,     # keep candidates with salience >= ratio * max_salience
    "dup_overlap": 0.82,     # token-overlap above this vs a kept survivor => drop
    "overlap_weight": 0.55,  # weight of query-overlap vs base score in salience
    "rank_decay": 0.85,      # geometric decay applied per within-source rank
    # Hebbian cross-source amplify:
    "corroboration_overlap": 0.50,  # cross-source token-overlap that counts as agreement
    "amplify_gain": 0.15,    # boost per corroborating other-source candidate (scaled by base salience)
    "amplify_cap": 0.50,     # max total boost fraction (caps runaway amplification)
}

_STOPWORDS = frozenset(
    "the a an is was are to of in for on with it and or not i you can do this "
    "that what how please help me my your we our they them then than over such "
    "be been being have has had will would could should about into only also "
    "just like very from at as by if".split()
)

# Compiled once: tokenize() runs for every candidate on every recall turn.
_TOKEN_RE = re.compile(r"[a-z0-9]+")

# Base score used when a candidate carries no usable ``score``.
_NEUTRAL_BASE = 0.6


def tokenize(text: str) -> set:
    """Lowercase alphanumeric tokens, minus stopwords. Pure and deterministic.

    Falsy input (``""``, ``None``) yields an empty set; any other value is
    passed through ``str()`` first. Only ASCII ``[a-z0-9]`` runs are tokens.
    """
    if not text:
        return set()
    return set(_TOKEN_RE.findall(str(text).lower())) - _STOPWORDS


def _clamp01(x: float) -> float:
    """Clamp ``x`` into [0, 1]; NaN maps to 0.0.

    NaN compares false against everything, so without the explicit check it
    would fall through both range tests and escape the clamp unchanged.
    """
    if x != x:  # NaN is the only value unequal to itself
        return 0.0
    if x < 0.0:
        return 0.0
    if x > 1.0:
        return 1.0
    return x


def _overlap(a: set, b: set) -> float:
    """Containment overlap: |a∩b| / min(|a|,|b|). 0 if either is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / min(len(a), len(b))


def _base_score(score) -> float:
    """Normalise a candidate's raw ``score`` into [0, 1].

    Missing (``None``), unparseable, or NaN scores get the neutral prior: one
    malformed upstream score must not raise, and must not inject a NaN that
    poisons ``max()`` and the relative prune floor for the whole pool.
    """
    if score is None:
        return _NEUTRAL_BASE
    try:
        value = float(score)
    except (TypeError, ValueError):
        return _NEUTRAL_BASE
    if value != value:  # NaN
        return _NEUTRAL_BASE
    return _clamp01(value)


def salience(candidate: dict, query_tokens: set, *,
             overlap_weight: float = DEFAULTS["overlap_weight"],
             rank_decay: float = DEFAULTS["rank_decay"]) -> float:
    """Unified base salience for one candidate (pre-amplify).

    Combines query-token overlap, base score (qdrant cosine when present;
    neutral prior otherwise), within-source rank decay, and a mild per-source
    prior. With the default priors the result lies in [0, 1.1]. A candidate
    dict may carry: ``text``, ``score`` (float|None), ``rank`` (int, 0-based
    within its source), ``source``.
    """
    text_tokens = tokenize(candidate.get("text", ""))
    # Fraction of the QUERY covered by the candidate; 0 when the query is empty.
    overlap = (len(query_tokens & text_tokens) / len(query_tokens)) if query_tokens else 0.0

    base = _base_score(candidate.get("score"))

    sw = _clamp01(overlap_weight)
    blended = sw * overlap + (1.0 - sw) * base

    # Missing/None rank counts as 0; negative ranks are treated as 0.
    rank = int(candidate.get("rank", 0) or 0)
    decay = rank_decay ** max(rank, 0)

    prior = _SOURCE_PRIOR.get(candidate.get("source", ""), 1.0)
    return blended * decay * prior


def score_all(candidates: Iterable[dict], query_tokens: set, *,
              overlap_weight: float = DEFAULTS["overlap_weight"],
              rank_decay: float = DEFAULTS["rank_decay"],
              corroboration_overlap: float = DEFAULTS["corroboration_overlap"],
              amplify_gain: float = DEFAULTS["amplify_gain"],
              amplify_cap: float = DEFAULTS["amplify_cap"]) -> list:
    """Score every candidate with base salience + Hebbian cross-source amplify.

    Returns a list of dicts (NOT sorted) — one per input dict, non-dict items
    are skipped — each with: ``base`` (pre-amplify salience), ``corroboration``
    (count of OTHER-source candidates whose text agrees above
    ``corroboration_overlap``), ``salience``
    (``base * (1 + min(corroboration * amplify_gain * base, amplify_cap))``),
    and ``candidate`` (the original dict). Pure; used by collapse() and the
    debug/eval path so scores aren't recomputed.

    Cost: O(n²) in pool size from the cross-source corroboration scan. The pool
    is the per-turn recall candidate set (low dozens at most), so this is
    negligible on the hot path; it would matter only if budgets grew large.
    """
    pool = [c for c in candidates if isinstance(c, dict)]
    toks = [tokenize(c.get("text", "")) for c in pool]
    bases = [salience(c, query_tokens, overlap_weight=overlap_weight,
                      rank_decay=rank_decay) for c in pool]

    out = []
    for i, c in enumerate(pool):
        src = c.get("source")
        corro = 0
        if toks[i]:
            for j, c2 in enumerate(pool):
                if i == j or c2.get("source") == src:
                    continue  # Hebbian agreement is CROSS-source only
                if _overlap(toks[i], toks[j]) >= corroboration_overlap:
                    corro += 1
        # Attenuate corroboration boost by query-local relevance.
        # A globally-important fact that surfaces in many sources should not
        # receive full Hebbian amplification when the current query is only
        # tangentially related. The base salience already encodes query overlap;
        # using it as an attenuation factor ensures corroboration helps most
        # when the candidate is already query-relevant: fire together, wire
        # together — but only light the wire when the query is the spark.
        boost = min(corro * amplify_gain * bases[i], amplify_cap)
        out.append({
            "base": bases[i],
            "corroboration": corro,
            "salience": bases[i] * (1.0 + boost),
            "candidate": c,
        })
    return out


def collapse(candidates: Iterable[dict], query_tokens: set, *,
             budget: int = DEFAULTS["budget"],
             prune_ratio: float = DEFAULTS["prune_ratio"],
             dup_overlap: float = DEFAULTS["dup_overlap"],
             overlap_weight: float = DEFAULTS["overlap_weight"],
             rank_decay: float = DEFAULTS["rank_decay"],
             corroboration_overlap: float = DEFAULTS["corroboration_overlap"],
             amplify_gain: float = DEFAULTS["amplify_gain"],
             amplify_cap: float = DEFAULTS["amplify_cap"]) -> list:
    """Collapse a unified candidate pool to a salience-ranked survivor list.

    Returns the surviving candidate dicts (shallow copies), strongest first,
    each annotated with ``_salience`` (post-amplify, rounded to 4 places) and
    ``_corroboration`` (cross-source agreement count). Length <= ``budget``.

    Non-bijunctive: weak paths are pruned relative to the strongest survivor,
    not against an absolute threshold. Hebbian: cross-source agreement amplifies
    salience so a fact two layers both surfaced outranks a lone strong hit.

    Empty input or non-positive budget returns ``[]``. Pure function.
    """
    if budget <= 0:
        return []
    scored = score_all(candidates, query_tokens,
                       overlap_weight=overlap_weight, rank_decay=rank_decay,
                       corroboration_overlap=corroboration_overlap,
                       amplify_gain=amplify_gain, amplify_cap=amplify_cap)
    if not scored:
        return []

    max_s = max((r["salience"] for r in scored), default=0.0)

    # PRUNE: relative floor. When max_s is 0 (no overlap, no scores) the floor is
    # 0 and nothing is pruned here — budget + rank ordering still bound output so
    # we never inject a firehose, and never collapse to empty given real signal.
    floor = max_s * prune_ratio
    kept = [r for r in scored if r["salience"] >= floor]

    # AMPLIFY (ranking sense): strongest first. Stable for equal salience.
    kept.sort(key=lambda r: r["salience"], reverse=True)

    # Near-duplicate suppression: drop a redundant copy of an already-kept
    # survivor. The kept representative already carries the corroboration boost,
    # so cross-source agreement strengthens the survivor rather than wasting a
    # budget slot on the twin.
    survivors: list = []
    survivor_tokens: list = []
    for r in kept:
        if len(survivors) >= budget:
            break
        ctoks = tokenize(r["candidate"].get("text", ""))
        if any(_overlap(ctoks, st) >= dup_overlap for st in survivor_tokens):
            continue
        annotated = dict(r["candidate"])
        annotated["_salience"] = round(r["salience"], 4)
        annotated["_corroboration"] = r["corroboration"]
        survivors.append(annotated)
        survivor_tokens.append(ctoks)

    return survivors


# ── Physical-entropy hash attestation (RustChain doctrine tie-in) ────────────
# A recall decision should be auditable the way a RustChain block is: bound to a
# hash, and proven live by hardware entropy. attest() commits to the survivor
# set; physical_entropy() supplies a nonce the way RustChain's miners draw on
# clock-skew/timebase jitter (mftb on POWER8) — anti-replay, anti-emulation.

def physical_entropy(nbytes: int = 16) -> bytes:
    """Gather a hardware-seeded entropy nonce. IMPURE (touches the machine).

    Mixes the kernel CSPRNG (``os.urandom`` — hardware-entropy seeded) with
    microarchitectural timer jitter (``perf_counter_ns`` low bits sampled in a
    tight loop — the same clock-skew family RustChain fingerprints with, and on
    POWER8 the natural home of the ``mftb`` timebase). The jitter component is
    what makes the nonce proof-of-live rather than merely random.

    ``nbytes`` is clamped to [1, 64], so the returned nonce may be shorter than
    requested.
    """
    jitter = bytearray()
    last = time.perf_counter_ns()
    for _ in range(64):
        now = time.perf_counter_ns()
        jitter.append((now - last) & 0xFF)
        last = now
    seed = os.urandom(32) + bytes(jitter)
    # blake2b digest_size is bounded to [1, 64]; clamp so an over-large request
    # returns a (shorter) nonce instead of raising. (tri-brain Codex)
    n = max(1, min(int(nbytes), 64))
    return hashlib.blake2b(seed, digest_size=n).digest()


def _frame(text: str) -> str:
    """Length-prefix ``text`` (netstring style) so concatenations are unambiguous."""
    return f"{len(text)}:{text}"


def _canonical_key(key) -> str:
    """Canonical, delimiter-safe encoding of a caller-assigned survivor key.

    Lists and tuples encode identically (element-wise, recursively) so a key
    such as ``("fabric", 0)`` still matches after a JSON round-trip turns it
    into ``["fabric", 0]``. Every other value is encoded via ``str()``.
    """
    if isinstance(key, (list, tuple)):
        return "L" + _frame("".join(_canonical_key(part) for part in key))
    return "S" + _frame(str(key))


def _survivor_commitment(survivors) -> bytes:
    """Stable canonical bytes over the survivor IDENTITY set (order-independent).

    Identity = source + the candidate's ``key`` when present (the strongest,
    caller-assigned identity), else a digest of the text. Salience is
    deliberately EXCLUDED: it is derived metadata, not part of "which memories
    were selected", and a serialized float would make the commitment fragile
    across a JSON round-trip.

    Every field and every row is length-prefixed. Plain ``source:key`` rows
    joined with ``|`` were forgeable: one survivor with key ``"x|b:y"`` under
    source ``a`` produced the same bytes as the two survivors ``a:x`` and
    ``b:y``. The ``k``/``t`` tag keeps a caller key from colliding with a text
    digest. Non-dict items are ignored.
    """
    rows = []
    for c in survivors:
        if not isinstance(c, dict):
            continue
        key = c.get("key")
        if key is not None:
            kind, ident = "k", _canonical_key(key)
        else:
            text = str(c.get("text", ""))
            kind = "t"
            ident = hashlib.blake2b(text.encode("utf-8", "replace"), digest_size=8).hexdigest()
        rows.append(_frame(str(c.get("source", ""))) + kind + _frame(ident))
    rows.sort()  # order-independent commitment
    return "".join(_frame(row) for row in rows).encode("utf-8")


def _commitment_digest(survivors, nonce: bytes, salt: bytes) -> str:
    """blake2b-256 hex digest binding the survivor commitment to nonce + salt.

    Shared by attest() and verify_attestation() so the two can never drift.
    Each part is length-prefixed, so bytes cannot be shifted between the
    commitment, the nonce and the salt without changing the digest.
    """
    hasher = hashlib.blake2b(digest_size=32)
    for part in (_survivor_commitment(survivors), nonce, salt):
        hasher.update(b"%d:" % len(part))
        hasher.update(part)
    return hasher.hexdigest()


def attest(survivors, *, entropy: bytes | None = None, salt: bytes = b"") -> dict:
    """Produce a tamper-evident, proof-of-live attestation over ``survivors``.

    Pure when ``entropy`` is supplied (deterministic — for tests); otherwise it
    draws a fresh nonce from :func:`physical_entropy`. Returns a record:
    ``hash`` (blake2b-256 hex commitment), ``nonce`` (hex entropy nonce),
    ``count`` (survivor count), ``algo``. Verify later with
    :func:`verify_attestation`, passing the same ``salt``.
    """
    nonce = entropy if entropy is not None else physical_entropy(16)
    digest = _commitment_digest(survivors, nonce, salt)
    return {
        "hash": digest,
        "nonce": nonce.hex(),
        "count": sum(1 for c in survivors if isinstance(c, dict)),
        "algo": "blake2b-256",
    }


def verify_attestation(survivors, attestation: dict, *, salt: bytes = b"") -> bool:
    """True iff ``survivors`` reproduce the committed hash under the recorded nonce.

    Tamper-evidence: any change to the selected set (add/drop/alter a survivor's
    source or identity) breaks the hash. A malformed attestation record (wrong
    type, missing or non-hex nonce) yields ``False`` rather than raising. Pure.
    """
    try:
        nonce = bytes.fromhex(attestation["nonce"])
        return _commitment_digest(survivors, nonce, salt) == attestation.get("hash")
    except (KeyError, ValueError, TypeError, AttributeError):
        return False
# ── Non-bijunctive recall collapse ───────────
# Master switch + tunables. Set ICARUS_COLLAPSE=0 to restore stock per-source
# emission (legacy behavior). All values fall back to collapse.DEFAULTS.
#
# Env parsing is hardened: a malformed value falls back to the default instead
# of raising at import time. Without this, a bad ICARUS_COLLAPSE_BUDGET would
# crash the entire hooks module on import — defeating the fail-open contract
# that only protects _apply_collapse. (tri-brain Codex BLOCKING, 2026-06-04)
def _env_num(name, default, cast):
    """Parse a numeric env var, falling back to ``default`` on any error.

    Unset or blank values return ``default`` silently. Values that ``cast``
    rejects, and float values that parse but are not finite (``nan``, ``inf``),
    log a warning and return ``default``: a NaN prune ratio or overlap weight
    would make every salience comparison false and silently disable collapse.
    """
    import math  # local: only needed here, keeps the module import block untouched

    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        value = cast(raw)
        if isinstance(value, float) and not math.isfinite(value):
            raise ValueError("non-finite value")
        return value
    except (TypeError, ValueError, OverflowError):
        logger.warning("icarus: invalid %s=%r — using default %r", name, raw, default)
        return default


_COLLAPSE_ON = os.environ.get("ICARUS_COLLAPSE", "1").strip().lower() not in (
    "0", "false", "no", "off"
)
_COLLAPSE_BUDGET = _env_num("ICARUS_COLLAPSE_BUDGET", _collapse.DEFAULTS["budget"], int)
_COLLAPSE_PRUNE = _env_num("ICARUS_COLLAPSE_PRUNE_RATIO", _collapse.DEFAULTS["prune_ratio"], float)
# Tunables for the lexical/source balance. Raise overlap_weight toward 1.0 to
# favor query-token overlap; lower it to let each source's own ranking (recency,
# FTS, vector score, encoded via rank_decay) carry more weight — the lever for
# the "strong-but-low-overlap hit gets starved" tradeoff. (tri-brain Grok)
_COLLAPSE_DUP = _env_num("ICARUS_COLLAPSE_DUP_OVERLAP", _collapse.DEFAULTS["dup_overlap"], float)
_COLLAPSE_WEIGHT = _env_num("ICARUS_COLLAPSE_OVERLAP_WEIGHT", _collapse.DEFAULTS["overlap_weight"], float)
_COLLAPSE_DECAY = _env_num("ICARUS_COLLAPSE_RANK_DECAY", _collapse.DEFAULTS["rank_decay"], float)
# Hebbian cross-source amplify knobs (corroboration boosts salience).
_COLLAPSE_CORRO = _env_num("ICARUS_COLLAPSE_CORRO_OVERLAP", _collapse.DEFAULTS["corroboration_overlap"], float)
_COLLAPSE_GAIN = _env_num("ICARUS_COLLAPSE_AMPLIFY_GAIN", _collapse.DEFAULTS["amplify_gain"], float)
_COLLAPSE_CAP = _env_num("ICARUS_COLLAPSE_AMPLIFY_CAP", _collapse.DEFAULTS["amplify_cap"], float)
# Observability: ICARUS_COLLAPSE_DEBUG=1 logs the salience-ranked pool (what
# survived vs pruned, scores, cross-source corroboration) and a physical-entropy
# attestation hash over the survivor set — making a recall decision auditable
# and tamper-evident instead of a black box. (answers tri-brain Grok's
# "unobservable new surface" concern, 2026-06-04)
_COLLAPSE_DEBUG = os.environ.get("ICARUS_COLLAPSE_DEBUG", "0").strip().lower() in (
    "1", "true", "yes", "on"
)


def _fabric_text(e):
    """Scoring text for a fabric entry: first non-empty of summary/_body/body."""
    return e.get("summary") or e.get("_body") or e.get("body") or ""


def _qdrant_text(r):
    """Scoring text for a qdrant hit: all non-empty known text fields, space-joined."""
    # Cover the common payload field names — a strong hit whose text lives in
    # `content`/`body`/`text` must not be mis-scored as weak because we only
    # looked at title+preview. Tokenize is set-based, so overlap between
    # content_preview and content is harmless. (tri-brain Codex SHOULD-FIX)
    fields = ("title", "content_preview", "content", "body", "text", "summary")
    return " ".join(str(r.get(f, "")) for f in fields if r.get(f)).strip()


def _session_text(s):
    """Scoring text for a session hit: title and snippet, space-joined."""
    return f"{s.get('title', '')} {s.get('snippet', '')}".strip()


def _log_collapse_debug(candidates, qtokens, survivors):
    """Log the salience-ranked pool + a physical-entropy attestation over the
    survivor set. Best-effort: never raises into the hot path."""
    try:
        kept_keys = {c.get("key") for c in survivors}
        # Use the SAME tunables the real collapse used, or the debug log would
        # report different salience/corroboration than the actual decision.
        ranked = _collapse.score_all(
            candidates, qtokens,
            overlap_weight=_COLLAPSE_WEIGHT, rank_decay=_COLLAPSE_DECAY,
            corroboration_overlap=_COLLAPSE_CORRO,
            amplify_gain=_COLLAPSE_GAIN, amplify_cap=_COLLAPSE_CAP,
        )
        ranked.sort(key=lambda r: r["salience"], reverse=True)
        logger.info("icarus collapse: %d candidates -> %d survivors",
                    len(candidates), len(survivors))
        for r in ranked:
            c = r["candidate"]
            mark = "KEEP" if c.get("key") in kept_keys else "prune"
            logger.info("  [%-5s] %-8s sal=%.3f corro=%d  %s",
                        mark, str(c.get("source")), r["salience"],
                        r["corroboration"], str(c.get("text", ""))[:48])
        att = _collapse.attest(survivors)
        logger.info("  attestation: %s (nonce %s…, %d survivors, %s)",
                    att["hash"][:16], att["nonce"][:12], att["count"], att["algo"])
    except Exception:
        logger.debug("icarus: collapse debug logging failed", exc_info=True)


def _apply_collapse(query, fabric, qdrant, sessions, facts):
    """Run non-bijunctive collapse across all four source lists.

    Builds one unified candidate pool (each tagged with source + within-source
    rank), collapses it to a single salience-ranked budget, then filters each
    source list down to the survivors — preserving the exact dict shapes the
    emission code below already expects.

    Returns a 4-tuple ``(fabric, qdrant, sessions, facts)``: filtered lists on
    success, the original arguments when there is nothing to collapse or the
    collapse kept nothing.

    Fail-open: on ANY error, returns the inputs unchanged so a collapse bug can
    never suppress memory injection. This is the whole safety contract.
    """
    try:
        qtokens = _collapse.tokenize(query)
        candidates = []

        # Candidate keys are (source, index) pairs so survivors map straight
        # back onto positions in the caller's lists.
        for i, e in enumerate(fabric):
            candidates.append({
                "key": ("fabric", i), "source": "fabric",
                "text": _fabric_text(e), "score": None, "rank": i,
            })
        for i, r in enumerate(qdrant):
            sc = r.get("score")
            candidates.append({
                "key": ("qdrant", i), "source": "qdrant",
                "text": _qdrant_text(r),
                "score": float(sc) if isinstance(sc, (int, float)) else None,
                "rank": i,
            })
        for i, s in enumerate(sessions):
            candidates.append({
                "key": ("sessions", i), "source": "sessions",
                "text": _session_text(s), "score": None, "rank": i,
            })
        for i, f in enumerate(facts):
            candidates.append({
                "key": ("facts", i), "source": "facts",
                "text": str(f), "score": None, "rank": i,
            })

        if not candidates:
            return fabric, qdrant, sessions, facts

        survivors = _collapse.collapse(
            candidates, qtokens,
            budget=_COLLAPSE_BUDGET, prune_ratio=_COLLAPSE_PRUNE,
            dup_overlap=_COLLAPSE_DUP, overlap_weight=_COLLAPSE_WEIGHT,
            rank_decay=_COLLAPSE_DECAY,
            corroboration_overlap=_COLLAPSE_CORRO,
            amplify_gain=_COLLAPSE_GAIN, amplify_cap=_COLLAPSE_CAP,
        )
        keep = {c["key"] for c in survivors}

        if _COLLAPSE_DEBUG:
            _log_collapse_debug(candidates, qtokens, survivors)

        # Defensive: if collapse returned nothing despite real candidates, do
        # NOT suppress everything — fall back to unchanged inputs.
        if not keep:
            return fabric, qdrant, sessions, facts

        # Known limitation (tri-brain Grok SHOULD-FIX, accepted as tradeoff):
        # survivors are filtered again by the per-session _injected_* dedup sets
        # during emission below. A survivor that's already been injected this
        # session consumes a budget slot here and is then skipped at emission,
        # so the net injected count can be < budget. We accept this rather than
        # replicate the emission keying here (which would risk key drift); the
        # overlap-gate + per-session dedup already bound re-injection in practice.
        new_fabric = [e for i, e in enumerate(fabric) if ("fabric", i) in keep]
        new_qdrant = [r for i, r in enumerate(qdrant) if ("qdrant", i) in keep]
        new_sessions = [s for i, s in enumerate(sessions) if ("sessions", i) in keep]
        new_facts = [f for i, f in enumerate(facts) if ("facts", i) in keep]
        return new_fabric, new_qdrant, new_sessions, new_facts
    except Exception:
        # Fail-open: never let a collapse error block memory injection. Logged at
        # WARNING so a silently-disabled collapse is detectable in production
        # rather than only inferable from "did the right memories appear?".
        logger.warning("icarus: recall collapse failed — injecting unchanged",
                       exc_info=True)
        return fabric, qdrant, sessions, facts
# collapse_eval: on a sample multi-source candidate pool, print what STOCK
# Memory OS would inject (every per-source quota) next to what the Elyan
# Edition COLLAPSE injects (one salience-ranked, Hebbian-amplified,
# deduplicated budget), plus a rough token estimate of the savings and a
# physical-entropy attestation over the selected set.
#
# Run: python3 scripts/collapse_eval.py
# No deps beyond icarus.collapse. Deterministic except the live attestation
# nonce.
import os
import sys

# Make the repository root importable when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# A realistic pool: the same query hits four sources. Several results restate
# the same fact (cross-source agreement) and several are weak/off-topic.
QUERY = "how does rustchain prevent VM farms from gaming rewards"
POOL = [
    # source, text, score(qdrant only)
    ("fabric", "RIP-PoA hardware fingerprint: 6 checks (clock skew, cache, SIMD, thermal, jitter, anti-emulation) must all pass for RTC reward", None),
    ("fabric", "Discussed minecraft RTC reward rates for diamonds and bosses", None),
    ("qdrant", "Anti-emulation check flags QEMU/KVM; VMs earn ~1e-9 weight by design to stop VM farms", 0.71),
    ("qdrant", "RustChain block time is 600s, epoch 144 blocks", 0.44),
    ("qdrant", "Hardware fingerprint: clock-skew + cache-timing + anti-emulation gate rewards; VMs get near-zero weight", 0.66),
    ("sessions", "Earlier we confirmed VM fingerprint detection assigns 1 billionth weight to QEMU guests — anti VM-farm by design", None),
    ("sessions", "Talked about the Halo CE server on Windows", None),
    ("facts", "VM farms are defeated by the anti-emulation fingerprint check: hypervisor detection -> 0.000000001x weight", None),
    ("facts", "User prefers Python for bridge scripts", None),
]


def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text* (never less than 1).

    Uses the ~4 characters per token heuristic, which is only meant for a
    relative stock-vs-collapse comparison, not an exact count.
    """
    return max(1, len(text) // 4)


def to_candidates(pool: list) -> list:
    """Convert ``(source, text, score)`` tuples into collapse candidate dicts.

    Each candidate gets ``rank`` equal to the number of earlier pool entries
    from the same source, and ``key`` equal to ``(source, rank)``. Input order
    is preserved.
    """
    next_rank = {}  # source name -> rank to hand to its next entry
    cands = []
    for src, text, score in pool:
        rank = next_rank.get(src, 0)
        next_rank[src] = rank + 1
        cands.append({
            "key": (src, rank), "source": src, "text": text,
            "score": score, "rank": rank,
        })
    return cands


def main() -> int:
    """Print the stock-vs-collapse comparison and attest the survivors.

    Returns:
        ``0`` when the attestation produced for the survivors verifies,
        ``1`` otherwise, so a smoke run in CI fails loudly instead of merely
        printing a falsy ``verify`` line.
    """
    # Deferred so the pure helpers above can be imported without the package,
    # and so the import runs after the sys.path adjustment at module top.
    from icarus import collapse as C

    cands = to_candidates(POOL)
    qtokens = C.tokenize(QUERY)

    print(f"Query: {QUERY!r}\n")
    print(f"STOCK (emit every source's quota): {len(cands)} memories")
    stock_tokens = sum(estimate_tokens(c["text"]) for c in cands)
    print(f" ~{stock_tokens} tokens injected\n")

    survivors = C.collapse(cands, qtokens)
    print(f"COLLAPSE (one salience budget, Hebbian-amplified): {len(survivors)} memories")
    for s in survivors:
        print(f" sal={s['_salience']:.3f} corro={s['_corroboration']} "
              f"[{s['source']}] {s['text'][:60]}")
    collapse_tokens = sum(estimate_tokens(s["text"]) for s in survivors)
    print(f" ~{collapse_tokens} tokens injected")

    # Guard only protects the division; an empty pool yields zero tokens.
    if stock_tokens:
        saved = 100 * (1 - collapse_tokens / stock_tokens)
        print(f"\nToken reduction: {stock_tokens} -> {collapse_tokens} ({saved:.0f}% fewer)")
    pruned = len(cands) - len(survivors)
    print(f"Pruned {pruned} weak/off-topic/duplicate memories; "
          "kept the cross-source-corroborated signal.")

    att = C.attest(survivors)
    verified = C.verify_attestation(survivors, att)
    print("\nAttestation (tamper-evident, proof-of-live):")
    print(f" hash : {att['hash']}")
    print(f" nonce : {att['nonce']} ({att['algo']}, {att['count']} survivors)")
    print(f" verify: {verified}")
    return 0 if verified else 1


if __name__ == "__main__":
    sys.exit(main())