diff --git a/AGENTS.md b/AGENTS.md index 3441319..72a9308 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -5,9 +5,9 @@ Wearable AI navigation assistant for blind users. Raspberry Pi 4 + Intel RealSen camera on a chest harness. Detects obstacles, scores threats, speaks warnings through Bluetooth headphones using Piper neural TTS. On-demand scene description via Codex Vision API. -**Current production version:** v3.26 HEADLESS +**Current production version:** v3.28 HEADLESS **Production file:** `raspberry_pi/yolo_realsense_navigation.py` -**Test files:** `tests/test_blindnav.py` + `tests/test_blindnav_v326.py` (150 collected tests, no hardware required) +**Test files:** `tests/test_blindnav.py` + `tests/test_blindnav_v326.py` (172 collected tests, no hardware required) --- @@ -184,10 +184,10 @@ When incrementing to vX.XX, update ALL of these: --- -## Pending Work (as of v3.23) +## Pending Work (as of v3.28) - [ ] **Heatsink** — #1 hardware priority. Pi throttles to 6–8 FPS above ~65°C. -- [ ] Merge/push v3.26 to GitHub main +- [ ] Merge/push v3.28 to GitHub main - [ ] Field test with Ricardo Salazar (blind user, primary tester) — not yet scheduled - [ ] Record bag file scenarios for regression testing (5 scenarios: person approach, chair, close-range, white wall, person turning away) - [ ] Traffic light detection — crop YOLO's `traffic light` box, classify red/green pixels diff --git a/MASTER_UPDATED.md b/MASTER_UPDATED.md index 54f5ae6..f44de06 100644 --- a/MASTER_UPDATED.md +++ b/MASTER_UPDATED.md @@ -1,16 +1,17 @@ # BlindNav Master Status -Current production version: `v3.27 HEADLESS` +Current production version: `v3.28 HEADLESS` Status: - Production script: `raspberry_pi/yolo_realsense_navigation.py` -- Hardware-free validation: `168 passed` -- Verified on: April 20, 2026 +- Hardware-free validation: `172 passed` +- Verified on: April 21, 2026 Version history: | Version | Status | Notes | | --- | --- | --- | -| v3.27 | Current branch target | Piper default alerts, cached alert clips, filtered motion, wide-angle position hysteresis, clean shutdown | +| v3.28 | Current branch target | Bucketed speech, side-pass person alerts, bad-ego TTC clamp, richer voice diagnostics | +| v3.27 | Previous branch target | Piper default alerts, cached alert clips, filtered motion, wide-angle position hysteresis, clean shutdown | | v3.26b | Previous production baseline | BT-safe skip-ahead and neutral-wording fixes | | v3.25 | Earlier baseline | Ego confidence gating, latency logging, zone keys | diff --git a/README.md b/README.md index 91bb282..72484f3 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,12 @@ RealSense D435. The system detects obstacles, estimates threat from distance and time-to-collision, and speaks warnings through Bluetooth headphones using Piper neural TTS. -Current production version: `v3.27 HEADLESS` +Current production version: `v3.28 HEADLESS` - Production script: `raspberry_pi/yolo_realsense_navigation.py` - Foundational regression suite: `tests/test_blindnav.py` - Advanced voice/latency regression suite: `tests/test_blindnav_v326.py` -- Verified locally on April 20, 2026: `168 passed` +- Verified locally on April 21, 2026: `172 passed` ## What It Does @@ -24,6 +24,20 @@ Current production version: `v3.27 HEADLESS` ## Recent Changes +### v3.28 + +- Bucketed spoken distances to the same 30 cm voice buckets already used by + cooldown keys so repeat warnings reuse the same Piper phrases instead of + synthesizing slightly different decimals. +- Added richer voice diagnostics to `events.log`, including queue wait, synth + time, launch wait, cache hit vs miss, and synthesis mode. +- Promoted nearby side-pass people on the left/right while the user is moving, + so a person walking by no longer depends entirely on radial TTC. +- Clamped bad-ego TTC usage to close range when the user is still, blocking + far nonsense alerts such as `person ahead, 6.4 meters`. +- Switched non-person urgent/warning phrasing to cached `obstacle` wording so + close-object warnings stay fast even when the classifier label changes. + ### v3.27 - Switched urgent/warning alerts to Piper by default while keeping `espeak` @@ -120,7 +134,7 @@ Current collected totals: ## Current Priorities - Add a heatsink before field sessions. -- Review and merge the v3.27 repo state, then field-test it with Ricardo Salazar. +- Review and merge the v3.28 repo state, then field-test it with Ricardo Salazar. - Record bag-file scenarios for regression playback. - Add traffic-light color classification after the base obstacle system is stable. diff --git a/SETUP.md b/SETUP.md index 6b31488..4b6c84c 100644 --- a/SETUP.md +++ b/SETUP.md @@ -122,7 +122,7 @@ These tests require no hardware: pytest tests/test_blindnav.py tests/test_blindnav_v326.py -v ``` -Expected current result: `168 passed` +Expected current result: `172 passed` ## Health Checks Before Field Use diff --git a/STATUS.md b/STATUS.md index 002f9d6..8a265cb 100644 --- a/STATUS.md +++ b/STATUS.md @@ -1,7 +1,7 @@ # Project Status -Last updated: April 20, 2026 -Current repo target: `v3.27 HEADLESS` +Last updated: April 21, 2026 +Current repo target: `v3.28 HEADLESS` ## Verified in Code and Tests @@ -14,6 +14,10 @@ Current repo target: `v3.27 HEADLESS` - safe urgent supersession before playback - Piper as the default urgent/warning alert voice - prewarmed cached alert clips for common short safety phrases + - bucketed spoken distances for cache reuse + - explicit queue/synth/cache latency diagnostics in `events.log` +- Side-pass people now get awareness/warning promotion while the user is moving. +- Bad-ego TTC is now trusted only at close range while the user is still. - Motion filtering now suppresses large one-frame depth jumps and small far-range drift before threat scoring/logging. - Position labeling now uses wide-angle-aware angle mapping with hysteresis. @@ -26,7 +30,7 @@ Current repo target: `v3.27 HEADLESS` - Hardware-free validation currently passes: - `tests/test_blindnav.py` - `tests/test_blindnav_v326.py` - - combined result: `168 passed` + - combined result: `172 passed` ## Confirmed Design Invariants @@ -73,7 +77,7 @@ Current repo target: `v3.27 HEADLESS` ## Pending Work - Heatsink: highest hardware priority -- Review and merge v3.27 to GitHub main +- Review and merge v3.28 to GitHub main - Schedule a field test with Ricardo Salazar - Record five bag-file regression scenarios - Add traffic-light color classification diff --git a/docs/NEWS_INTERVIEW_REVIEW.md b/docs/NEWS_INTERVIEW_REVIEW.md new file mode 100644 index 0000000..a5eff8c --- /dev/null +++ b/docs/NEWS_INTERVIEW_REVIEW.md @@ -0,0 +1,246 @@ +# BlindNav News Interview Review Sheet + +## One-Sentence Summary + +BlindNav is a wearable AI navigation assistant for blind users that combines +real-time object detection, depth sensing, motion awareness, and voice alerts +to help a user understand nearby obstacles while walking. + +## 20-Second Version + +I built a chest-mounted navigation system that runs on a Raspberry Pi 4 with an +Intel RealSense depth camera. It detects people and obstacles, estimates how +threatening they are based on distance and motion, and speaks short left/right/ +ahead warnings through headphones. It also supports on-demand AI scene +description. + +## 60-Second Version + +The project started from a simple assistive-tech question: can I build +something that helps a blind user understand the space directly in front of +them while moving? The current system uses a Raspberry Pi 4, an Intel RealSense +D435 depth camera, an IMU, YOLO object detection, depth-based distance +measurement, ego-motion compensation, and local Piper text-to-speech. The goal +is not to replace a cane or a guide dog. The goal is to provide extra spatial +awareness by warning about nearby people and obstacles in a way that is timely, +brief, and usable while walking. + +## What Problem It Solves + +- Standard obstacle detection alone is often too blunt: it can tell you that + something exists, but not whether it matters right now. +- Blind users need timing, direction, and confidence, not just labels. +- Indoor spaces are dynamic. People pass by, chairs move, tables stick out, + and walls or glass may not be obvious to a standard RGB-only system. +- A good assistive system must avoid constant chatter. Silence is as important + as speech. + +## What Makes This Different + +- It combines object detection and depth sensing, so it knows both what an + object is and roughly how far away it is. +- It estimates threat, not just presence. A close object matters more than a + far object. A moving person matters differently than a static chair. +- It tracks whether the user is moving, because navigation guidance should + change if the user is standing still versus walking. +- It is designed around short voice output, not a screen. +- It includes hardware-free regression tests, so behavior can be improved + between field runs without needing the camera every time. + +## What Changed Since The ESP32 Stage + +- The earlier ESP32 phase was much more limited and was closer to a sensor + experiment than a full navigation assistant. +- The current system moved to a Raspberry Pi 4 so it can run camera-based + computer vision, ONNX inference, depth processing, and local neural speech. +- The project shifted from "can I detect obstacles at all?" to "can I deliver + useful, low-chatter, real-time guidance while walking?" +- The current version includes: + - YOLO26n object detection + - Intel RealSense D435 depth sensing + - IMU-based movement awareness + - ego-motion compensation + - left/right/ahead voice guidance + - on-demand AI scene description + - a 172-test hardware-free regression suite + +## Current Hardware Stack + +- Raspberry Pi 4 +- Intel RealSense D435 +- ICM-20948 IMU +- Bluetooth headphones +- Chest harness + +## Current Software Stack + +- Python +- ONNX Runtime +- YOLO26n object detector +- Intel RealSense SDK +- Piper TTS +- Claude for on-demand scene description + +## Core Runtime Pipeline + +1. Capture synchronized RGB and depth frames from the RealSense camera. +2. Run YOLO object detection on the RGB frame. +3. Track objects across frames. +4. Sample depth inside each tracked box to estimate distance. +5. Estimate ego-motion so stationary objects do not look like they are moving + just because the user is walking. +6. Score threats based on distance, motion, and context. +7. Choose whether to speak, what to say, and how urgently to say it. +8. Play a short voice alert through headphones. + +## The Most Important Engineering Constraint + +The hardest problem is not object detection by itself. The hardest problem is +deciding when to speak and when not to speak. If the system talks too much, it +becomes distracting. If it talks too little, it misses something important. + +## Safety Philosophy + +- This is an assistive prototype, not a replacement for a cane, guide dog, or + formal Orientation and Mobility training. +- It should add awareness, not take control away from the user. +- It should be conservative about uncertainty. +- It should avoid pretending to know more than it really knows. +- It should fail quietly rather than sound overconfident when motion or depth + data becomes unreliable. + +## What v3.28 Added + +- Bucketed spoken distances: + warnings now snap to stable 30 cm voice buckets so repeated alerts reuse the + same phrases instead of synthesizing many slightly different decimals. +- Better voice diagnostics: + logs now separate queue wait, synthesis time, playback launch delay, and + cache hit versus miss. +- Better side-pass person handling: + people moving by on the left or right are promoted earlier, even when radial + time-to-collision is weak. +- Better bad-ego handling: + the system now suppresses far false motion-based alerts when ego-motion + confidence is poor. + +## Numbers To Remember + +- Current version: `v3.28 HEADLESS` +- Main production script: `raspberry_pi/yolo_realsense_navigation.py` +- Current hardware-free validation: `172 passed` +- Camera: Intel RealSense D435 +- Compute: Raspberry Pi 4 + +## What Is Validated Well + +- Object tracking logic +- Distance sampling logic +- Voice cooldown logic +- Priority queue behavior +- Voice TTL and skip-ahead behavior +- Threat scoring truth tables +- Left/right/ahead classification logic +- Motion filtering and bad-ego suppression logic + +## What Still Needs Real-World Testing + +- Crowded scenes +- Long walking sessions +- Bluetooth timing under field conditions +- Thermal performance on the Pi +- User preferences for how much voice output is helpful + +## Honest Limitations + +- The system can still slow down when the Pi gets hot. +- A spoken warning can still be late if a phrase is already playing, because + active audio playback is intentionally never hard-killed. +- Camera occlusion and depth noise still matter in real environments. +- It is still a prototype and should be described honestly as one. + +## Good Ways To Describe The Project + +- "A wearable AI navigation assistant prototype for blind users." +- "A system that combines depth sensing, computer vision, and voice alerts." +- "An attempt to make obstacle awareness more timely and less noisy." + +## Bad Ways To Describe The Project + +- "It solves blindness." +- "It replaces a white cane." +- "It is production-ready for unsupervised public deployment." +- "It always understands the environment correctly." + +## Likely Interview Questions + +### What inspired this project? + +I became interested in accessibility technology through my grandmother's vision +loss and started asking what kind of system could actually be helpful in real +physical spaces, not just in a lab demo. + +### Why not just use a phone app? + +A phone is useful for many things, but navigation assistance while walking +works better when the sensing is hands-free, chest-mounted, and always aimed at +the path ahead. + +### Why is depth sensing important? + +Because object labels alone are not enough. A chair two meters away matters +very differently than a chair thirty centimeters away. + +### Why does voice design matter so much? + +Because a blind user cannot afford to be flooded with useless audio. The system +has to be brief, timely, and quiet when nothing important is happening. + +### What was the hardest technical problem? + +Balancing false positives and false negatives in a way that still feels usable +in motion. That includes motion compensation, deciding when a person passing by +matters, and reducing voice delay. + +### What are you most proud of? + +That it is not just a one-off demo. I built a real regression suite around it, +so I can improve behavior systematically instead of guessing between field +tests. + +### Is it finished? + +No. It is a serious prototype with strong software validation, but it still +needs continued field testing, refinement, and safety-focused iteration. + +## Strong Soundbites + +- "The challenge is not just seeing objects. The challenge is deciding when a + spoken warning is actually useful." +- "For assistive technology, silence can be as important as speech." +- "A navigation system has to understand both the world and the user's motion + through it." +- "I wanted to move from a sensor demo to something that behaves more like a + real assistant." + +## If Asked About AI + +- The system uses AI in two different ways: + - local vision for real-time detection + - on-demand multimodal AI for richer scene description +- Real-time safety-relevant behavior should be short, deterministic, and + low-latency. +- Longer open-ended descriptions are useful, but they are not the same as + split-second walking guidance. + +## If Asked About Impact + +The goal is not to claim that one student project fixes a huge accessibility +problem. The goal is to contribute something real: a working prototype, a set +of engineering lessons, and open project documentation that can help future +assistive-tech work. + +## Best Closing Line + +This project is really about respecting the difference between detecting the +world and making that information usable in the moment for a blind traveler. diff --git a/raspberry_pi/yolo_realsense_navigation.py b/raspberry_pi/yolo_realsense_navigation.py index 547c4b4..8415608 100644 --- a/raspberry_pi/yolo_realsense_navigation.py +++ b/raspberry_pi/yolo_realsense_navigation.py @@ -1,7 +1,25 @@ #!/usr/bin/env python3 """ -Intelligent Navigation Assistant v3.27 HEADLESS -Builds on v3.26b with: +Intelligent Navigation Assistant v3.28 HEADLESS +Builds on v3.27 with: + + FIX 13 - Bucketed speech, broader cache reuse, and clearer voice diagnostics. + Spoken distances now snap to stable 30cm voice buckets so the same warning + phrases repeat instead of synthesizing slightly different decimals. Voice + latency logs now separate queue wait, synth time, launch delay, cache hit + vs miss, and whether the phrase came from prefetched Piper, cached Piper, + or live synthesis. + + FIX 14 - Side-pass people are promoted before they are right on top of you. + A person moving past on your left/right can have weak radial TTC even when + they are a real navigation hazard. This version boosts nearby side-person + scoring while the user is moving and adds a dedicated side-pass awareness/ + warning rule so those passes are spoken earlier. + + FIX 15 - Bad ego state no longer drives far or medium-range TTC chatter. + When ego-motion confidence is bad, motion-derived TTC is now trusted only at + close range while the user is still. This blocks nonsense alerts like + "person ahead, 6.4 meters" while preserving close distance overrides. FIX 10 - Piper alerts by default, with cached urgent/warning clips. Amy-medium remains the default Piper voice, urgent/warning phrases use @@ -138,12 +156,24 @@ ALERT_CACHE_PREWARM = os.environ.get( "BLINDNAV_ALERT_CACHE_PREWARM", "1" ).strip().lower() not in {"0", "false", "no"} -ALERT_CACHE_COMMON_DISTANCES_M = (0.6, 1.2, 1.8, 2.4) +ALERT_DISTANCE_BUCKET_CM = 30 +ALERT_SPOKEN_OFFSET_CM = 20 +ALERT_CACHE_MAX_BUCKET = 5 +ALERT_CACHE_POSITIONS = ("ahead", "on your left", "on your right") +ALERT_CACHE_COMMON_DISTANCES_M = tuple( + ((bucket * ALERT_DISTANCE_BUCKET_CM) + ALERT_SPOKEN_OFFSET_CM) / 100.0 + for bucket in range(ALERT_CACHE_MAX_BUCKET + 1) +) FAST_ALERT_SILENCE_MS = 220 ESPEAK_ALERT_SPEED = 175 ESPEAK_ALERT_GAP = 8 ESPEAK_ALERT_AMPLITUDE = 180 +SIDE_PASS_PERSON_AWARE_CM = 150 +SIDE_PASS_PERSON_WARN_CM = 110 +BAD_EGO_TTC_MAX_DISTANCE_CM = 120 +VOICE_POLICY_LOG_COOLDOWN_S = 2.0 + POSITION_CAMERA_HFOV_DEG = 69.0 POSITION_SIDE_ENTER_DEG = 13.1 POSITION_SIDE_EXIT_DEG = 10.0 @@ -216,56 +246,159 @@ def _voice_key(pos: str, class_name: str, tier: str, dbucket: int = 0) -> str: return f"{zone}_{family}_{tier}_{dbucket}" -def _select_voice_message(obj, pos, dist_m, dist_cm, vel, - user_moving, ego_reliable, - approaching, very_close, close, fast_approach, ttc): - """ - Pure function: given threat state -> return (tier, message) or (None, None). +def _distance_bucket(dist_cm): + if dist_cm is None or dist_cm < 0: + return 0 + return max(0, int(dist_cm // ALERT_DISTANCE_BUCKET_CM)) - tier: 'urgent' | 'warning' | 'awareness' | None - Extracted from main() so it can be unit-tested without hardware. - ego_reliable gates ALL directional ("approaching", "closer") wording. - When unreliable -> neutral "{obj} {pos}, {dist}m" at appropriate tier. - """ +def _bucket_distance_cm(dbucket): + bucket = max(0, int(dbucket)) + return bucket * ALERT_DISTANCE_BUCKET_CM + ALERT_SPOKEN_OFFSET_CM + + +def _spoken_distance_m(dist_cm): + return _bucket_distance_cm(_distance_bucket(dist_cm)) / 100.0 + + +def _spoken_object_name(obj, tier): + if obj == "person" or tier == "awareness": + return obj + return "obstacle" + + +def _select_voice_decision(obj, pos, dist_cm, vel, + user_moving, ego_reliable, + approaching, very_close, close, fast_approach, ttc): + spoken_dist_m = _spoken_distance_m(dist_cm) + side_pass_person = ( + obj == "person" + and pos != "ahead" + and user_moving + and dist_cm <= SIDE_PASS_PERSON_AWARE_CM + ) + if very_close: - # FIX 9: gate approaching wording on ego_reliable - if approaching and ego_reliable and not user_moving: - msg = f"{obj} {pos} approaching, {dist_m:.1f} meters" + spoken_obj = _spoken_object_name(obj, "urgent") + if spoken_obj == "person" and approaching and ego_reliable and not user_moving: + msg = f"{spoken_obj} {pos} approaching, {spoken_dist_m:.1f} meters" + reason = "very_close_approach" elif approaching and ego_reliable and user_moving: - msg = f"Stop! {obj} {pos}, {dist_m:.1f} meters" + msg = f"Stop! {spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "very_close_stop" else: - msg = f"{obj} {pos}, {dist_m:.1f} meters" - return "urgent", msg + msg = f"{spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "very_close_distance" + return { + "tier": "urgent", + "message": msg, + "reason": reason, + "spoken_distance_m": spoken_dist_m, + "spoken_object": spoken_obj, + } if close: - # FIX 9: gate approaching wording on ego_reliable - if approaching and ego_reliable and user_moving: - msg = f"Watch out, {obj} {pos}, {dist_m:.1f} meters" - elif approaching and ego_reliable and not user_moving: - msg = f"{obj} {pos} closer, {dist_m:.1f} meters" + spoken_obj = _spoken_object_name(obj, "warning") + if spoken_obj == "person" and approaching and ego_reliable and not user_moving: + msg = f"{spoken_obj} {pos} closer, {spoken_dist_m:.1f} meters" + reason = "close_person_closer" + elif approaching and ego_reliable and user_moving: + msg = f"Watch out, {spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "close_watch" else: - msg = f"{obj} {pos}, {dist_m:.1f} meters" - return "warning", msg + msg = f"{spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "close_distance" + return { + "tier": "warning", + "message": msg, + "reason": reason, + "spoken_distance_m": spoken_dist_m, + "spoken_object": spoken_obj, + } + + if side_pass_person: + if dist_cm <= SIDE_PASS_PERSON_WARN_CM: + return { + "tier": "warning", + "message": f"Watch out, person {pos}, {spoken_dist_m:.1f} meters", + "reason": "side_pass_warning", + "spoken_distance_m": spoken_dist_m, + "spoken_object": "person", + } + return { + "tier": "awareness", + "message": f"Heads up, person {pos}, {spoken_dist_m:.1f} meters", + "reason": "side_pass_awareness", + "spoken_distance_m": spoken_dist_m, + "spoken_object": "person", + } if ttc < 2: - if ego_reliable and fast_approach: - msg = f"Stop! {obj} {pos}, {dist_m:.1f} meters" - elif ego_reliable and approaching: - msg = f"{obj} {pos} approaching, {dist_m:.1f} meters" + spoken_obj = _spoken_object_name(obj, "urgent") + if spoken_obj == "person" and ego_reliable and fast_approach: + msg = f"Stop! {spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "ttc_urgent_fast" + elif spoken_obj == "person" and ego_reliable and approaching: + msg = f"{spoken_obj} {pos} approaching, {spoken_dist_m:.1f} meters" + reason = "ttc_urgent_approach" else: - msg = f"{obj} {pos}, {dist_m:.1f} meters" - return "urgent", msg + msg = f"{spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + reason = "ttc_urgent_neutral" + return { + "tier": "urgent", + "message": msg, + "reason": reason, + "spoken_distance_m": spoken_dist_m, + "spoken_object": spoken_obj, + } if ttc < 4: - msg = (f"Watch out, {obj} {pos}, {dist_m:.1f} meters" - if ego_reliable else f"{obj} {pos}, {dist_m:.1f} meters") - return "warning", msg + spoken_obj = _spoken_object_name(obj, "warning") + msg = (f"Watch out, {spoken_obj} {pos}, {spoken_dist_m:.1f} meters" + if ego_reliable else f"{spoken_obj} {pos}, {spoken_dist_m:.1f} meters") + return { + "tier": "warning", + "message": msg, + "reason": "ttc_warning", + "spoken_distance_m": spoken_dist_m, + "spoken_object": spoken_obj, + } if ttc < 8: - return "awareness", f"Heads up, {obj} {pos}, {dist_m:.1f} meters" + return { + "tier": "awareness", + "message": f"Heads up, {obj} {pos}, {spoken_dist_m:.1f} meters", + "reason": "ttc_awareness", + "spoken_distance_m": spoken_dist_m, + "spoken_object": obj, + } - return None, None + return { + "tier": None, + "message": None, + "reason": "no_actionable_distance_or_ttc", + "spoken_distance_m": spoken_dist_m, + "spoken_object": obj, + } + + +def _select_voice_message(obj, pos, dist_m, dist_cm, vel, + user_moving, ego_reliable, + approaching, very_close, close, fast_approach, ttc): + """ + Pure function: given threat state -> return (tier, message) or (None, None). + + tier: 'urgent' | 'warning' | 'awareness' | None + Extracted from main() so it can be unit-tested without hardware. + + ego_reliable gates ALL directional ("approaching", "closer") wording. + When unreliable -> neutral "{obj} {pos}, {dist}m" at appropriate tier. + """ + decision = _select_voice_decision( + obj, pos, dist_cm, vel, + user_moving, ego_reliable, + approaching, very_close, close, fast_approach, ttc) + return decision["tier"], decision["message"] # ============= PIPER VOICE ============= @@ -282,6 +415,11 @@ def __init__(self): self._voice_label = os.path.splitext(os.path.basename(PIPER_MODEL))[0] self._alert_cache_dir = ALERT_CACHE_DIR self._alert_cache_enabled = False + self._last_alert_synth_meta = { + "mode": "n/a", + "cache": "n/a", + "silence_ms": 0, + } try: from piper.voice import PiperVoice as _PiperVoice from piper.config import SynthesisConfig @@ -304,8 +442,11 @@ def __init__(self): if self._alert_cache_enabled: os.makedirs(self._alert_cache_dir, exist_ok=True) if ALERT_CACHE_PREWARM: - phrases = self._prime_alert_cache() - print(f"[VOICE] Piper alert cache primed ({phrases} clips)") + threading.Thread( + target=self._prime_alert_cache_async, + daemon=True, + name="piper-alert-cache", + ).start() print( f"[OK] Piper TTS ready ({self._sample_rate}Hz, " f"voice={self._voice_label}, speed={PIPER_SPEED})" @@ -343,6 +484,13 @@ def _alert_cache_key(self, text): ).hexdigest() return os.path.join(self._alert_cache_dir, f"{digest}.wav") + def _prime_alert_cache_async(self): + try: + phrases = self._prime_alert_cache() + print(f"[VOICE] Piper alert cache primed ({phrases} clips)") + except Exception as e: + print(f"[VOICE] Piper alert cache prime failed: {e}") + def _copy_to_temp(self, wav_path): with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: tmpfile = f.name @@ -352,11 +500,23 @@ def _copy_to_temp(self, wav_path): def _materialize_cached_alert(self, text): cache_path = self._alert_cache_key(text) if os.path.exists(cache_path): + self._last_alert_synth_meta = { + "mode": "piper_alert", + "cache": "hit", + "silence_ms": 0, + } return self._copy_to_temp(cache_path) + synth_start = time.time() tmpfile = self.synthesize_to_file(text, silence_ms=0) if tmpfile is None: return None + self._last_alert_synth_meta = { + "mode": "piper_alert", + "cache": "miss", + "silence_ms": 0, + "synth_s": time.time() - synth_start, + } try: if not os.path.exists(cache_path): shutil.copyfile(tmpfile, cache_path) @@ -368,9 +528,15 @@ def _prime_alert_cache(self): phrases = [] for dist_m in ALERT_CACHE_COMMON_DISTANCES_M: phrases.append(f"Obstacle, {dist_m:.1f} meters") - for pos in ("ahead", "on your left", "on your right"): + for pos in ALERT_CACHE_POSITIONS: + phrases.append(f"obstacle {pos}, {dist_m:.1f} meters") + phrases.append(f"Stop! obstacle {pos}, {dist_m:.1f} meters") + phrases.append(f"Watch out, obstacle {pos}, {dist_m:.1f} meters") + phrases.append(f"person {pos}, {dist_m:.1f} meters") + phrases.append(f"person {pos} approaching, {dist_m:.1f} meters") phrases.append(f"Stop! person {pos}, {dist_m:.1f} meters") phrases.append(f"Watch out, person {pos}, {dist_m:.1f} meters") + phrases.append(f"Heads up, person {pos}, {dist_m:.1f} meters") for phrase in phrases: tmpfile = self._materialize_cached_alert(phrase) if tmpfile: @@ -429,7 +595,9 @@ def synthesize_alert_to_file(self, text, silence_ms=0): if tmpfile is None: return None if silence_ms > 0: + self._last_alert_synth_meta["silence_ms"] = silence_ms return self.prepend_silence(tmpfile, silence_ms) + self._last_alert_synth_meta["silence_ms"] = 0 return tmpfile if self._espeak_cmd is None: @@ -466,7 +634,7 @@ def synthesize_alert_to_file(self, text, silence_ms=0): -# ============= VOICE ASSISTANT (v3.27) ============= +# ============= VOICE ASSISTANT (v3.28) ============= class VoiceAssistant: """ 3-slot priority queue with pre-synthesis, BT-safe skip-ahead (FIX 8), @@ -670,10 +838,15 @@ def _speak_worker(self, *args): def _speak_thread(self, text, prefetched_wav=None, label="", priority=None, event_created_ts=None, enqueued_ts=None): """Play audio. Logs per-alert latency timestamps.""" - tts_start_ts = None + tts_start_ts = None + synth_done_ts = None play_start_ts = None - play_end_ts = None + play_end_ts = None skipped_before_play = False + voice_mode = "unknown" + cache_state = "n/a" + silence_ms_used = 0 + need_silence = False try: if self._tts._use_piper: with self._lock: @@ -684,26 +857,35 @@ def _speak_thread(self, text, prefetched_wav=None, label="", tts_start_ts = time.time() if prefetched_wav: + voice_mode = "piper_prefetch" + cache_state = "prefetch" if need_silence: - tmpfile = self._tts.prepend_silence(prefetched_wav, PIPER_SILENCE_MS) + silence_ms_used = PIPER_SILENCE_MS + tmpfile = self._tts.prepend_silence(prefetched_wav, silence_ms_used) else: tmpfile = prefetched_wav + synth_done_ts = time.time() else: - silence_ms = self._alert_silence_ms(need_silence) if use_alert_synth else ( + silence_ms_used = self._alert_silence_ms(need_silence) if use_alert_synth else ( PIPER_SILENCE_MS if need_silence else 0 ) if use_alert_synth: - tmpfile = self._tts.synthesize_alert_to_file(text, silence_ms=silence_ms) + tmpfile = self._tts.synthesize_alert_to_file( + text, silence_ms=silence_ms_used + ) + meta = getattr(self._tts, "_last_alert_synth_meta", {}) + voice_mode = meta.get("mode", "piper_alert") + cache_state = meta.get("cache", "unknown") else: - # FIX: use semaphore for direct synthesis too — prevents - # speak_thread and a stale presyn_worker from both calling - # synthesize_to_file at the same time (blocking acquire is - # fine here since _speak_thread runs on its own daemon thread) + voice_mode = "piper_live" self._presyn_sem.acquire() try: - tmpfile = self._tts.synthesize_to_file(text, silence_ms=silence_ms) + tmpfile = self._tts.synthesize_to_file( + text, silence_ms=silence_ms_used + ) finally: self._presyn_sem.release() + synth_done_ts = time.time() if tmpfile: with self._lock: @@ -713,8 +895,10 @@ def _speak_thread(self, text, prefetched_wav=None, label="", for higher in range(priority) ) if skipped_before_play: - try: os.unlink(tmpfile) - except Exception: pass + try: + os.unlink(tmpfile) + except Exception: + pass else: play_start_ts = time.time() if self._player_fn is not None: @@ -727,13 +911,17 @@ def _speak_thread(self, text, prefetched_wav=None, label="", proc.wait() play_end_ts = time.time() with self._lock: - self._current_proc = None + self._current_proc = None self._last_speech_end = play_end_ts - try: os.unlink(tmpfile) - except Exception: pass + try: + os.unlink(tmpfile) + except Exception: + pass else: - tts_start_ts = time.time() + tts_start_ts = time.time() + synth_done_ts = tts_start_ts play_start_ts = time.time() + voice_mode = "espeak_live" if self._player_fn is not None: proc = self._player_fn(text) else: @@ -745,23 +933,38 @@ def _speak_thread(self, text, prefetched_wav=None, label="", proc.wait() play_end_ts = time.time() with self._lock: - self._current_proc = None + self._current_proc = None self._last_speech_end = play_end_ts except Exception as e: print(f"[VOICE] Thread error: {e}") finally: - # FIX 6: Log per-alert latency timestamps to events.log if self._event_logger and event_created_ts is not None: t0 = event_created_ts + queue_wait = max(0.0, (tts_start_ts or t0) - (enqueued_ts or t0)) + synth_time = max( + 0.0, + (synth_done_ts or tts_start_ts or t0) - (tts_start_ts or t0) + ) + launch_wait = max( + 0.0, + (play_start_ts or synth_done_ts or t0) - (synth_done_ts or tts_start_ts or t0) + ) lat_line = ( f"[LATENCY] label={label} text=\"{text[:40]}\"" f" event_created={t0:.3f}" f" enqueued={enqueued_ts - t0:.3f}s" - f" tts_start={((tts_start_ts or t0) - t0):.3f}s" + f" tts_start={((tts_start_ts or t0) - t0):.3f}s" f" play_start={((play_start_ts or t0) - t0):.3f}s" - f" play_end={((play_end_ts or t0) - t0):.3f}s" + f" play_end={((play_end_ts or t0) - t0):.3f}s" + f" queue_wait={queue_wait:.3f}s" + f" synth={synth_time:.3f}s" + f" launch_wait={launch_wait:.3f}s" + f" mode={voice_mode}" + f" cache={cache_state}" + f" cold_stream={'yes' if need_silence else 'no'}" + f" silence_ms={silence_ms_used}" ) self._event_logger(lat_line) if skipped_before_play: @@ -1211,6 +1414,7 @@ def calculate_threat_score(track, user_moving=False, ego_reliable=True): motion = evaluate_track_motion(track, user_moving=user_moving, ego_reliable=ego_reliable) v = motion["effective_velocity"] + pos = get_position(track) score = 0.0 if d < CRITICAL_DISTANCE: @@ -1226,6 +1430,12 @@ def calculate_threat_score(track, user_moving=False, ego_reliable=True): elif v > 5: score *= 0.2 + if (track.class_name == "person" + and user_moving + and pos != "ahead" + and d <= SIDE_PASS_PERSON_AWARE_CM): + score += 18.0 if d <= SIDE_PASS_PERSON_WARN_CM else 8.0 + score *= OBJECT_THREAT_WEIGHTS.get(track.class_name, 1.0) if track.score > 0.7: score *= 1.1 elif track.score < 0.3: score *= 0.9 @@ -1250,7 +1460,7 @@ def prioritize_threats(tracks, user_moving=False, ego_reliable=True): # ============= HELPER FUNCTIONS ============= def get_position(track, frame_width=640): - if track.box is None: return "ahead" + if getattr(track, "box", None) is None: return "ahead" angle_deg = _position_angle_deg(track.box, frame_width) prev_zone = getattr(track, "_position_zone", None) zone = _classify_position_zone(angle_deg, prev_zone) @@ -1333,7 +1543,12 @@ def evaluate_track_motion(track, user_moving=False, ego_reliable=True): "fast_approach": False, } - velocity_reliable = track.velocity_valid and (ego_reliable or not user_moving) + close_range_bad_ego = ( + track.distance <= BAD_EGO_TTC_MAX_DISTANCE_CM + and not user_moving + and not ego_reliable + ) + velocity_reliable = track.velocity_valid and (ego_reliable or close_range_bad_ego) effective_velocity = raw_velocity if velocity_reliable else 0.0 noise_floor = 3.0 + track.distance * 0.02 ttc = 999.0 @@ -1539,9 +1754,9 @@ def main(): global VOICE_ENABLED print("=" * 70) - print("INTELLIGENT NAVIGATION ASSISTANT v3.27 HEADLESS") + print("INTELLIGENT NAVIGATION ASSISTANT v3.28 HEADLESS") print("RealSense D435 + YOLO26n + IMU + Piper TTS + Claude Vision") - print("FIXES: piper alerts, cached clips, ego guard, filtered motion, wide-angle zones, clean shutdown") + print("FIXES: bucketed speech, side-pass people, bad-ego TTC clamp, richer voice logs") print("=" * 70) csv_file = open(CSV_FILE, "w", newline="", encoding="utf-8") @@ -1650,12 +1865,31 @@ def _capture_worker(): last_paused_print = time.time() last_wall_alert = 0.0 last_busy_alert = 0.0 + last_policy_log = {} detect_ms, depth_ms = [], [] fps_start = time.time() fps_frames = 0 avg_fps = 0.0 last_fps = 0.0 + def log_voice_policy(now, track, pos, score, motion_eval, reason, tier="NONE"): + dist_val = track.distance if track.distance is not None else -1 + key = f"{track.id}:{reason}:{tier}:{_distance_bucket(dist_val)}" + if now - last_policy_log.get(key, 0.0) < VOICE_POLICY_LOG_COOLDOWN_S: + return + last_policy_log[key] = now + ttc = motion_eval["ttc"] + ttc_text = f"{ttc:.1f}s" if ttc < 999 else "n/a" + line = ( + f"[VOICE_POLICY] {track.class_name}#{track.id}(f{track.seen_frames}) {pos}: " + f"dist={dist_val}cm raw_vel={motion_eval['raw_velocity']:.1f}cm/s " + f"eff_vel={motion_eval['effective_velocity']:.1f}cm/s ttc={ttc_text} " + f"score={score:.1f} tier={tier} reason={reason} " + f"user={'moving' if user_moving else 'still'} ego={'OK' if ego_reliable else 'BAD'}" + ) + print(line) + log_event(line) + try: while True: try: @@ -1730,7 +1964,7 @@ def _capture_worker(): and t.seen_frames >= 3 for t in tracks) if wall_dist_cm < WALL_DISTANCE_CM and not yolo_covers: voice.speak_urgent( - f"Obstacle, {wall_dist_cm/100:.1f} meters", + f"Obstacle, {_spoken_distance_m(wall_dist_cm):.1f} meters", key="wall_fallback") last_wall_alert = now print(f"[WALL] Depth fallback: {wall_dist_cm}cm") @@ -1759,12 +1993,20 @@ def _capture_worker(): if now - last_paused_print > 3.0: print(f"[PAUSED] Static {obj} {dist_m:.1f}m — suppressed") last_paused_print = now + log_voice_policy( + now, track, pos, score, motion_eval, + reason="static_stationary_suppression", + tier="NONE", + ) else: dbucket = int(dist_cm // 30) - tier, msg = _select_voice_message( - obj, pos, dist_m, dist_cm, vel, + decision = _select_voice_decision( + obj, pos, dist_cm, vel, user_moving, ego_reliable, approaching, very_close, close, fast_approach, ttc) + tier = decision["tier"] + msg = decision["message"] + reason = decision["reason"] if tier == "urgent": voice.speak_urgent( @@ -1779,6 +2021,20 @@ def _capture_worker(): elif tier == "awareness": voice.speak_awareness( msg, key=_voice_key(pos, obj, "aware", 0)) + else: + if obj == "person" or score >= 10 or dist_cm <= SIDE_PASS_PERSON_AWARE_CM: + log_voice_policy( + now, track, pos, score, motion_eval, + reason=reason, + tier="NONE", + ) + + if reason.startswith("side_pass"): + log_voice_policy( + now, track, pos, score, motion_eval, + reason=reason, + tier=(tier or "NONE").upper(), + ) if ttc_allowed and ttc < 100: tier = ("URGENT" if ttc<2 else "WARN" if ttc<4 diff --git a/tests/test_blindnav_v326.py b/tests/test_blindnav_v326.py index 2482fb7..85a5755 100644 --- a/tests/test_blindnav_v326.py +++ b/tests/test_blindnav_v326.py @@ -299,6 +299,23 @@ def test_approaching_wording_when_ego_reliable(self): _, msg = self._call(30, -20, user_moving=False, ego_reliable=True) assert "approaching" in msg, f"Expected 'approaching' in msg, got: {msg!r}" + def test_side_pass_person_gets_awareness_when_ttc_is_weak(self): + tier, msg = self._call(129, 0, user_moving=True, ego_reliable=False, + obj="person", pos="on your right") + assert tier == "awareness" + assert "person on your right" in msg + assert "1.4 meters" in msg + + def test_bad_ego_far_stationary_person_stays_silent(self): + tier, msg = self._call(639, -90, user_moving=False, ego_reliable=False, + ttc=999.0, obj="person", pos="ahead") + assert tier is None and msg is None + + def test_non_person_warning_uses_obstacle_wording(self): + _, msg = self._call(60, -30, user_moving=True, ego_reliable=True, + obj="chair", pos="on your right") + assert "obstacle on your right" in msg + def test_msg_is_string_when_tier_not_none(self): """All non-None tiers must produce a non-empty string message.""" for dist, vel, moving, ego, exp_tier in self.ROUTING_CASES: @@ -405,6 +422,15 @@ def test_far_velocity_suppressed_when_user_moving_and_ego_bad(self): f"bad ego should suppress far false CRITICALs, got {score:.1f}/{level}" ) + def test_far_velocity_suppressed_when_user_still_and_ego_bad(self): + track = self._make_track(639, -89.6, "person", 0.5) + score = MOD.ThreatAssessment.calculate_threat_score( + track, user_moving=False, ego_reliable=False) + level = MOD.ThreatAssessment.get_threat_level(score) + assert level == "SAFE", ( + f"bad ego should suppress far still-user false alerts, got {score:.1f}/{level}" + ) + def test_close_distance_stays_critical_when_user_moving_and_ego_bad(self): track = self._make_track(27, -29.1, "person", 0.5) score = MOD.ThreatAssessment.calculate_threat_score( @@ -648,10 +674,12 @@ def fake_materialize(text): piper._materialize_cached_alert = fake_materialize count = MOD.PiperVoice._prime_alert_cache(piper) - assert count == 28 - assert "Stop! person ahead, 1.2 meters" in seen - assert "Watch out, person on your left, 2.4 meters" in seen - assert "Obstacle, 0.6 meters" in seen + assert count == 150 + assert "Stop! person ahead, 1.1 meters" in seen + assert "Watch out, person on your left, 1.7 meters" in seen + assert "Obstacle, 0.5 meters" in seen + assert "obstacle on your right, 1.7 meters" in seen + assert "Heads up, person on your left, 1.4 meters" in seen