-
Notifications
You must be signed in to change notification settings - Fork 0
fix(phone-quickdrop): reliable WebSocket pipeline (heartbeat, resume, race fix, QR) #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -276,6 +276,8 @@ <h2 id="errTitle">Couldn't start camera</h2> | |
| let recorder = null; | ||
| let recording = false; | ||
| let sessionId = null; | ||
| let sessionMime = null; | ||
| let sessionExt = null; | ||
| let bytesSent = 0; | ||
| let chunkCount = 0; | ||
| let startedAt = 0; | ||
|
|
@@ -284,6 +286,9 @@ <h2 id="errTitle">Couldn't start camera</h2> | |
| let micOn = true; | ||
| let wakeLock = null; | ||
| let pendingChunks = []; // queued while WS not ready (rare) | ||
| // Outstanding ev.data.arrayBuffer() promises. Must drain before we send | ||
| // "end", otherwise the final chunk races the end-marker and gets dropped. | ||
| const pendingSends = new Set(); | ||
|
|
||
| // ---------- mime negotiation ---------- | ||
| function pickMimeType() { | ||
|
|
@@ -331,7 +336,21 @@ <h2 id="errTitle">Couldn't start camera</h2> | |
| ws.addEventListener("open", () => { | ||
| wsReady = true; | ||
| setConn("connected", "ready"); | ||
| // flush anything queued (very rare path) | ||
| // If we died mid-recording, ask the server to resume this sessionId | ||
| // before we flush queued chunks. WebSocket preserves order within a | ||
| // single connection, so chunks sent next will land after the resume. | ||
| if (recording && sessionId) { | ||
| try { | ||
| ws.send(JSON.stringify({ | ||
| type: "start", | ||
| sessionId, | ||
| mime: sessionMime || "application/octet-stream", | ||
| ext: sessionExt || "bin", | ||
| resume: true, | ||
| ts: startedAt, | ||
| })); | ||
| } catch { /* will close and reconnect */ } | ||
| } | ||
| for (const buf of pendingChunks) ws.send(buf); | ||
| pendingChunks = []; | ||
| }); | ||
|
|
@@ -461,35 +480,48 @@ <h2 id="errTitle">Couldn't start camera</h2> | |
|
|
||
| sessionId = (crypto.randomUUID && crypto.randomUUID()) || | ||
| ("s" + Math.random().toString(36).slice(2) + Date.now().toString(36)); | ||
| sessionMime = recorder.mimeType || mime || "application/octet-stream"; | ||
| sessionExt = ext; | ||
| bytesSent = 0; | ||
| chunkCount = 0; | ||
| startedAt = Date.now(); | ||
| pendingSends.clear(); | ||
|
|
||
| // Tell server a recording is starting | ||
| wsSend(JSON.stringify({ | ||
| type: "start", | ||
| sessionId, | ||
| mime: recorder.mimeType || mime || "application/octet-stream", | ||
| ext, | ||
| mime: sessionMime, | ||
| ext: sessionExt, | ||
| ts: startedAt, | ||
| })); | ||
|
|
||
| recorder.ondataavailable = async (ev) => { | ||
| recorder.ondataavailable = (ev) => { | ||
| if (!ev.data || ev.data.size === 0) return; | ||
| chunkCount += 1; | ||
| bytesSent += ev.data.size; | ||
| const buf = await ev.data.arrayBuffer(); | ||
| wsSend(buf); | ||
| updateSizePill(); | ||
| // Schedule the arrayBuffer conversion and remember the promise so | ||
| // onstop can drain it before sending "end". | ||
| const p = ev.data.arrayBuffer() | ||
| .then((buf) => { wsSend(buf); updateSizePill(); }) | ||
| .catch(() => { /* ignore; recorder error handler reports */ }); | ||
| pendingSends.add(p); | ||
| p.finally(() => pendingSends.delete(p)); | ||
| }; | ||
| recorder.onstop = () => { | ||
| recorder.onstop = async () => { | ||
| // Drain any in-flight chunk conversions so the last chunk beats "end". | ||
| if (pendingSends.size) { | ||
| try { await Promise.allSettled([...pendingSends]); } catch {} | ||
| } | ||
| wsSend(JSON.stringify({ | ||
| type: "end", | ||
| sessionId, | ||
| totalBytes: bytesSent, | ||
| chunks: chunkCount, | ||
| })); | ||
| sessionId = null; | ||
| sessionMime = null; | ||
| sessionExt = null; | ||
|
Comment on lines
+511
to
+524
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't drop the session metadata until the server confirms save/abort. This clears 🤖 Prompt for AI Agents |
||
| recorder = null; | ||
| }; | ||
| recorder.onerror = (ev) => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| #!/usr/bin/env node | ||
| /** | ||
| * Reconnect smoke test. | ||
| * | ||
| * Simulates a phone whose WebSocket dies in the middle of a recording: | ||
| * 1. Open WS, send "start" with sessionId S | ||
| * 2. Send N/2 chunks | ||
| * 3. Abruptly close the socket (not a graceful "end") | ||
| * 4. Open a new WS, send "start" with same sessionId and resume:true | ||
| * 5. Expect ack { type:"started", resumed:true, bytes: <what server has} } | ||
| * 6. Send the remaining N/2 chunks, then "end" | ||
| * 7. Verify disk file is exactly the concatenation of all chunks, in order | ||
| * | ||
| * Usage: | ||
| * node scripts/smoke-reconnect.js # defaults to localhost:8443 | ||
| * PORT=8444 node scripts/smoke-reconnect.js # worktree port | ||
| */ | ||
|
|
||
| const fs = require("fs"); | ||
| const path = require("path"); | ||
| const os = require("os"); | ||
| const crypto = require("crypto"); | ||
| const WebSocket = require("ws"); | ||
|
|
||
| const PORT = parseInt(process.env.PORT || "8443", 10); | ||
| const HOST = process.env.HOST || "localhost"; | ||
| const URL = `wss://${HOST}:${PORT}/ws`; | ||
| const OUTPUT_DIR = | ||
| process.env.QUICKDROP_OUT || | ||
| path.join(os.homedir(), "Desktop", "PhoneCaptures"); | ||
|
|
||
| const CHUNK_BYTES = 128 * 1024; | ||
| const CHUNK_COUNT = 8; // 4 before disconnect, 4 after | ||
| const TOTAL = CHUNK_BYTES * CHUNK_COUNT; | ||
|
|
||
| function sleep(ms) { return new Promise((r) => setTimeout(r, ms)); } | ||
|
|
||
| function connect() { | ||
| return new Promise((resolve, reject) => { | ||
| const ws = new WebSocket(URL, { rejectUnauthorized: false }); | ||
| ws.once("open", () => resolve(ws)); | ||
| ws.once("error", reject); | ||
| }); | ||
| } | ||
|
|
||
| function collectMessages(ws) { | ||
| const messages = []; | ||
| ws.on("message", (data, isBinary) => { | ||
| if (isBinary) return; | ||
| try { messages.push(JSON.parse(data.toString())); } catch {} | ||
| }); | ||
| return messages; | ||
| } | ||
|
|
||
| async function waitFor(messages, type, timeoutMs = 2000) { | ||
| const deadline = Date.now() + timeoutMs; | ||
| while (Date.now() < deadline) { | ||
| const m = messages.find((x) => x.type === type); | ||
| if (m) return m; | ||
| await sleep(10); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| async function main() { | ||
| console.log(`smoke-reconnect: connecting to ${URL}`); | ||
|
|
||
| // Pre-generate all chunks so we know exactly what bytes should land on disk. | ||
| const chunks = []; | ||
| for (let i = 0; i < CHUNK_COUNT; i++) { | ||
| chunks.push(crypto.randomBytes(CHUNK_BYTES)); | ||
| } | ||
| const expectedBuffer = Buffer.concat(chunks); | ||
|
|
||
| const sessionId = "reconnect-" + crypto.randomBytes(3).toString("hex"); | ||
| const label = "reconnect-test"; | ||
|
|
||
| // ---- phase 1: open, start, send half, kill ---- | ||
| let ws = await connect(); | ||
| let msgs = collectMessages(ws); | ||
|
|
||
| ws.send(JSON.stringify({ | ||
| type: "start", | ||
| sessionId, | ||
| mime: "video/mp4", | ||
| ext: "mp4", | ||
| label, | ||
| ts: Date.now(), | ||
| })); | ||
|
|
||
| const started1 = await waitFor(msgs, "started", 2000); | ||
| if (!started1 || started1.resumed) { | ||
| console.error(`smoke-reconnect: FAIL — expected fresh 'started' ack, got ${JSON.stringify(started1)}`); | ||
| process.exit(2); | ||
| } | ||
| const fileName = started1.fileName; | ||
| console.log(`smoke-reconnect: started ${sessionId} → ${fileName}`); | ||
|
|
||
| for (let i = 0; i < CHUNK_COUNT / 2; i++) { | ||
| ws.send(chunks[i]); | ||
| await sleep(15); | ||
| } | ||
|
|
||
| // Give the server a beat to flush the binary frames to disk before we cut | ||
| // the socket — otherwise we're testing TCP close semantics, not resume. | ||
| await sleep(80); | ||
|
|
||
| // Abrupt close — terminate, not close() — so server sees an unclean drop. | ||
| ws.terminate(); | ||
| console.log(`smoke-reconnect: killed WS after ${CHUNK_COUNT / 2} chunks`); | ||
|
|
||
| // Wait briefly for server to register the close and orphan the session. | ||
| await sleep(200); | ||
|
|
||
| // ---- phase 2: reconnect with resume ---- | ||
| ws = await connect(); | ||
| msgs = collectMessages(ws); | ||
|
|
||
| ws.send(JSON.stringify({ | ||
| type: "start", | ||
| sessionId, | ||
| mime: "video/mp4", | ||
| ext: "mp4", | ||
| resume: true, | ||
| ts: Date.now(), | ||
| })); | ||
|
Comment on lines
+112
to
+126
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reconnect sleep makes this smoke test flaky. A fixed 200ms delay does not guarantee the server has already run its 🤖 Prompt for AI Agents |
||
|
|
||
| const started2 = await waitFor(msgs, "started", 2000); | ||
| if (!started2) { | ||
| console.error("smoke-reconnect: FAIL — no 'started' ack after resume"); | ||
| process.exit(3); | ||
| } | ||
| if (!started2.resumed) { | ||
| console.error(`smoke-reconnect: FAIL — expected resumed:true, got ${JSON.stringify(started2)}`); | ||
| process.exit(4); | ||
| } | ||
| const halfBytes = (CHUNK_COUNT / 2) * CHUNK_BYTES; | ||
| if (typeof started2.bytes !== "number" || started2.bytes < 1 || started2.bytes > halfBytes) { | ||
| console.error( | ||
| `smoke-reconnect: FAIL — resume bytes unreasonable: got ${started2.bytes}, expected 1..${halfBytes}` | ||
| ); | ||
| process.exit(5); | ||
| } | ||
| console.log( | ||
| `smoke-reconnect: resumed ${sessionId} (server already has ${started2.bytes} bytes)` | ||
| ); | ||
|
|
||
| // ---- phase 3: send the rest ---- | ||
| for (let i = CHUNK_COUNT / 2; i < CHUNK_COUNT; i++) { | ||
| ws.send(chunks[i]); | ||
| await sleep(15); | ||
| } | ||
| ws.send(JSON.stringify({ | ||
| type: "end", | ||
| sessionId, | ||
| totalBytes: TOTAL, | ||
| })); | ||
|
|
||
| const saved = await waitFor(msgs, "saved", 3000); | ||
| ws.close(); | ||
| if (!saved) { | ||
| console.error("smoke-reconnect: FAIL — no 'saved' ack"); | ||
| process.exit(6); | ||
| } | ||
|
|
||
| // ---- phase 4: verify on-disk bytes ---- | ||
| const filePath = path.join(OUTPUT_DIR, fileName); | ||
| if (!fs.existsSync(filePath)) { | ||
| console.error(`smoke-reconnect: FAIL — file missing: ${filePath}`); | ||
| process.exit(7); | ||
| } | ||
| const onDisk = fs.readFileSync(filePath); | ||
| if (onDisk.length !== TOTAL) { | ||
| console.error( | ||
| `smoke-reconnect: FAIL — size mismatch: disk=${onDisk.length} expected=${TOTAL}` | ||
| ); | ||
| process.exit(8); | ||
| } | ||
| if (!onDisk.equals(expectedBuffer)) { | ||
| // Find first divergence for debug | ||
| let i = 0; | ||
| while (i < onDisk.length && onDisk[i] === expectedBuffer[i]) i++; | ||
| console.error( | ||
| `smoke-reconnect: FAIL — content mismatch (first diff at byte ${i})` | ||
| ); | ||
| process.exit(9); | ||
| } | ||
|
|
||
| try { fs.unlinkSync(filePath); } catch {} | ||
|
|
||
| console.log( | ||
| `smoke-reconnect: OK — ${TOTAL.toLocaleString()} bytes landed correctly across a mid-session disconnect.` | ||
| ); | ||
| } | ||
|
|
||
| main().catch((err) => { | ||
| console.error("smoke-reconnect: error", err); | ||
| process.exit(1); | ||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Track an unfinished upload, not
recording, before sendingresume.recording && sessionIdis the wrong gate here. On a cold connect it becomes true before the original queued"start"has reached the server, so this emits a second start and the queued one aborts/recreates the session inphone-quickdrop/server.js, Lines 401-413. After an offline stop it becomes false even though queued chunks and"end"still need the orphaned session to be reattached. Please drive resume off “session started but not yet saved/aborted” state instead.