Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions web/src/lib/message-window-store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
getMessageWindowState,
ingestIncomingMessages,
markMessagesConsumed,
reconcileQueuedAgainstLatest,
removeOptimisticMessage,
setAtBottom,
VISIBLE_WINDOW_SIZE,
Expand Down Expand Up @@ -560,4 +561,123 @@ describe('message-window-store visible trimming', () => {
expect(state.messages.some((message) => message.id === 'main-user-before-agent-flood')).toBe(true)
expect(state.messages.filter((message) => message.id.startsWith('agent-run-latest-'))).toHaveLength(50)
})

it('drops a stale queued ghost on at-bottom refresh when the server no longer reports it as queued', async () => {
const baseTime = 1_700_000_200_000
// A queued row persisted from a prior session: server-echoed (id != localId),
// immediate (no scheduledAt), still invokedAt === null locally. The CLI
// consumed it while the client was offline, so messages-consumed was missed.
const ghost: DecryptedMessage = {
id: 'ghost-server-id',
seq: 1,
localId: 'ghost-local-id',
content: { role: 'user', content: { type: 'text', text: 'Ingest 范围' } },
createdAt: baseTime,
invokedAt: null,
status: undefined,
} as DecryptedMessage
ingestIncomingMessages(SESSION_ID, [ghost])
// sanity: the ghost is present and queued before the refresh
expect(getMessageWindowState(SESSION_ID).messages.some((m) => m.id === 'ghost-server-id')).toBe(true)

const api = {
getMessages: async (_sessionId: string, options: { limit?: number } = {}) => ({
// Latest window does NOT include the ghost (it was invoked long ago,
// out of the newest window). Only a fresh agent message comes back.
messages: [makeAgentMessage({ id: 'fresh-agent', seq: 99, createdAt: baseTime + 100_000 })],
page: { limit: options.limit ?? 50, nextBeforeSeq: null, nextBeforeAt: null, hasMore: false },
}),
} as Pick<ApiClient, 'getMessages'>

await fetchLatestMessages(api as ApiClient, SESSION_ID)

const state = getMessageWindowState(SESSION_ID)
expect(state.messages.some((m) => m.id === 'ghost-server-id')).toBe(false)
expect(state.pending.some((m) => m.id === 'ghost-server-id')).toBe(false)
})

it('reconcileQueuedAgainstLatest keeps genuine queued, optimistic, and scheduled rows', () => {
const base = 1_700_000_200_000
const queuedInWindow: DecryptedMessage = {
id: 'queued-server-id', seq: 5, localId: 'queued-local',
content: { role: 'user', content: { type: 'text', text: 'still queued' } },
createdAt: base, invokedAt: null, status: undefined,
} as DecryptedMessage
const optimistic: DecryptedMessage = {
id: 'opt-local', seq: null, localId: 'opt-local',
content: { role: 'user', content: { type: 'text', text: 'echo in flight' } },
createdAt: base, invokedAt: null, status: 'queued',
} as DecryptedMessage
const scheduled: DecryptedMessage = {
id: 'sched-server-id', seq: 6, localId: 'sched-local',
content: { role: 'user', content: { type: 'text', text: 'future' } },
createdAt: base, invokedAt: null, scheduledAt: base + 3_600_000, status: undefined,
} as DecryptedMessage
const ghost: DecryptedMessage = {
id: 'ghost-server-id', seq: 1, localId: 'ghost-local',
content: { role: 'user', content: { type: 'text', text: 'ghost' } },
createdAt: base, invokedAt: null, status: undefined,
} as DecryptedMessage

// A queued row that appeared after the fetch was issued (not in eligibleIds):
// must survive even though the older server snapshot can't include it.
const freshArrival: DecryptedMessage = {
id: 'fresh-server-id', seq: 7, localId: 'fresh-local',
content: { role: 'user', content: { type: 'text', text: 'arrived mid-fetch' } },
createdAt: base, invokedAt: null, status: undefined,
} as DecryptedMessage

// eligibleIds = the immediate queued rows present when the fetch started.
// Server's latest window only confirms the genuinely-queued row.
const reconciled = reconcileQueuedAgainstLatest(
[queuedInWindow, optimistic, scheduled, ghost, freshArrival],
[queuedInWindow],
new Set(['queued-server-id', 'ghost-server-id'])
)
const ids = reconciled.map((m) => m.id)
expect(ids).toContain('queued-server-id') // confirmed by server -> kept
expect(ids).toContain('opt-local') // optimistic -> kept (echo may be in flight)
expect(ids).toContain('sched-server-id') // scheduled -> kept (hub omits future rows)
expect(ids).toContain('fresh-server-id') // arrived after fetch start -> kept
expect(ids).not.toContain('ghost-server-id') // echoed+immediate+eligible+absent -> dropped
})

it('keeps a queued row that arrives via SSE while the latest fetch is in flight', async () => {
const base = 1_700_000_200_000
// A pre-existing ghost in the hydrated window (queued when the fetch starts,
// server no longer reports it as queued).
const ghost: DecryptedMessage = {
id: 'ghost-server-id', seq: 1, localId: 'ghost-local',
content: { role: 'user', content: { type: 'text', text: 'ghost' } },
createdAt: base, invokedAt: null, status: undefined,
} as DecryptedMessage
ingestIncomingMessages(SESSION_ID, [ghost])

const httpRequest = deferred<Awaited<ReturnType<ApiClient['getMessages']>>>()
const api = { getMessages: vi.fn(async () => httpRequest.promise) } as Pick<ApiClient, 'getMessages'> & {
getMessages: ReturnType<typeof vi.fn>
}

const load = fetchLatestMessages(api as unknown as ApiClient, SESSION_ID)

// While the HTTP request is in flight, a fresh server-echoed queued row lands
// via SSE. It is absent from the (older) response snapshot below.
const fresh: DecryptedMessage = {
id: 'fresh-server-id', seq: 2, localId: 'fresh-local',
content: { role: 'user', content: { type: 'text', text: 'fresh queued' } },
createdAt: base + 50_000, invokedAt: null, status: undefined,
} as DecryptedMessage
ingestIncomingMessages(SESSION_ID, [fresh])

httpRequest.resolve({
messages: [makeAgentMessage({ id: 'fresh-agent', seq: 3, createdAt: base + 100_000 })],
page: { limit: 50, nextBeforeSeq: null, nextBeforeAt: null, hasMore: false },
})
await load

const state = getMessageWindowState(SESSION_ID)
const ids = [...state.messages, ...state.pending].map((m) => m.id)
expect(ids).not.toContain('ghost-server-id') // queued at fetch start + absent -> dropped
expect(ids).toContain('fresh-server-id') // arrived mid-fetch -> kept
})
})
74 changes: 73 additions & 1 deletion web/src/lib/message-window-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,68 @@ function isOptimisticMessage(message: DecryptedMessage): boolean {
return Boolean(message.localId && message.id === message.localId)
}

/**
* Drops phantom queued messages during an at-bottom full refresh.
*
* A queued message (invokedAt === null) is normally cleared by the live
* `messages-consumed` SSE (markMessagesConsumed flips invokedAt). That event is
* one-shot: if the client was offline/closed when the CLI consumed the message,
* the signal is lost forever. On reload the row is restored from sessionStorage
* still carrying invokedAt: null, but the server's invoked copy is too old to
* appear in the latest window, so mergeMessages never corrects it and
* trimPreservingQueued pins it — a ghost card above the composer that never
* clears.
*
* The latest at-bottom page is authoritative for the newest slice of history: a
* genuinely-still-queued immediate message sorts by createdAt to the very top
* and is therefore always present in the fetched window. So an immediate,
* server-echoed, locally-queued message whose id is absent from the server
* response is a ghost and is dropped.
*
* Guards against false positives (these are kept even when absent from the
* response):
* - optimistic rows (id === localId): the server echo may still be in flight;
* mergeMessages owns their reconciliation.
* - scheduled rows (scheduledAt != null): the hub omits not-yet-mature
* scheduled messages from getMessages, so absence is expected; they have
* their own maturation/release path.
* - rows absent from `eligibleIds`: only messages already queued when the
* fetch was issued are candidates. `serverMessages` is the HTTP snapshot
* taken at the request's start; a `message-received` SSE that lands while
* the fetch is in flight can add a real server-echoed queued row the
* snapshot never saw. Without this gate that fresh row would be filtered as
* a ghost and the queued bar would lose genuine work.
*
* @internal Exported for unit testing.
*/
export function reconcileQueuedAgainstLatest(
merged: DecryptedMessage[],
serverMessages: DecryptedMessage[],
eligibleIds: Set<string>
): DecryptedMessage[] {
const serverIds = new Set(serverMessages.map((m) => m.id))
return merged.filter((msg) => {
if (!isQueuedForInvocation(msg)) return true
if (msg.scheduledAt != null) return true
if (isOptimisticMessage(msg)) return true
if (!eligibleIds.has(msg.id)) return true
return serverIds.has(msg.id)
})
}

/** Ids of immediate, server-echoed queued rows in a snapshot — the only rows
* eligible for ghost reconciliation. Captured at fetch-request start so rows
* added by a concurrent SSE are exempt. See reconcileQueuedAgainstLatest. */
function queuedReconcileCandidateIds(messages: DecryptedMessage[], pending: DecryptedMessage[]): Set<string> {
const ids = new Set<string>()
for (const msg of [...messages, ...pending]) {
if (isQueuedForInvocation(msg) && msg.scheduledAt == null && !isOptimisticMessage(msg)) {
ids.add(msg.id)
}
}
return ids
}

function mergeIntoPending(
prev: InternalState,
incoming: DecryptedMessage[]
Expand Down Expand Up @@ -762,6 +824,11 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr
if (initial.isLoading) {
return
}
// Snapshot the queued rows that exist now, before awaiting the HTTP fetch.
// Only these are eligible for ghost reconciliation — a queued row inserted by
// a concurrent message-received SSE must not be filtered against the older
// response snapshot that predates it.
const reconcileCandidateIds = queuedReconcileCandidateIds(initial.messages, initial.pending)
const generation = beginAsyncGeneration(sessionId, 'latest', { isLoading: true, warning: null })

try {
Expand All @@ -781,7 +848,12 @@ export async function fetchLatestMessages(api: ApiClient, sessionId: string): Pr
updateStateForGeneration(sessionId, 'latest', generation, (prev) => {
if (prev.atBottom) {
const merged = mergeMessages(prev.messages, [...prev.pending, ...response.messages])
const trimmed = trimVisible(merged, 'append')
// Reconcile against the authoritative latest page before trimming:
// trimVisible preserves every queued row, so a ghost (queued locally
// but already invoked server-side, missed messages-consumed while
// offline) would otherwise be pinned forever.
const reconciled = reconcileQueuedAgainstLatest(merged, response.messages, reconcileCandidateIds)
const trimmed = trimVisible(reconciled, 'append')
return buildState(prev, {
messages: trimmed,
pending: [],
Expand Down
Loading