Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 96 additions & 1 deletion hub/src/sync/messageService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { join } from 'node:path'
import { MessageService } from './messageService'
import { Store } from '../store'
import type { Server } from 'socket.io'
import type { SyncEvent } from '@hapi/protocol/types'
import type { Session, SyncEvent } from '@hapi/protocol/types'

// ---------------------------------------------------------------------------
// Test helpers
Expand All @@ -28,6 +28,32 @@ function makeSession(store: Store, tag: string) {
return store.sessions.getOrCreateSession(tag, { path: `/tmp/${tag}` }, null, 'default')
}

function toProtocolSession(session: ReturnType<typeof makeSession>): Session {
return {
id: session.id,
namespace: session.namespace,
seq: session.seq,
createdAt: session.createdAt,
updatedAt: session.updatedAt,
active: session.active,
activeAt: session.activeAt ?? session.updatedAt,
metadata: {
path: `/tmp/${session.tag ?? session.id}`,
host: 'localhost'
},
metadataVersion: session.metadataVersion,
agentState: null,
agentStateVersion: session.agentStateVersion,
thinking: false,
thinkingAt: session.updatedAt,
model: session.model,
modelReasoningEffort: session.modelReasoningEffort,
effort: session.effort,
permissionMode: 'default',
collaborationMode: 'default'
}
}

type AckCallback = (err: Error | null, responses: Array<{ removed: boolean }>) => void

function makeIo(onEmit: (ack: AckCallback) => void, socketCount = 1): Server {
Expand Down Expand Up @@ -94,6 +120,75 @@ describe('MessageService goal status filtering', () => {
])
})

it('exports chronological visible messages and omits queued user rows', () => {
const store = makeStore()
const session = makeSession(store, 'session-export-visible')

const first = store.messages.addMessage(session.id, { role: 'user', content: 'Hello' })
const queued = store.messages.addMessage(session.id, { role: 'user', content: 'Queued' }, 'local-queued')
store.messages.addMessage(session.id, redundantGoalStatusContent('Goal active'))
const hiddenSystem = store.messages.addMessage(session.id, {
role: 'agent',
content: {
type: 'output',
data: { type: 'system', subtype: 'init', uuid: 'sys-init' }
}
})
const second = store.messages.addMessage(session.id, { role: 'agent', content: 'Hi' })

const service = new MessageService(store, makeIo(() => {}), makePublisher() as any)
const result = service.getSessionExport(session.id, toProtocolSession(session))

expect(result.type).toBe('success')
if (result.type !== 'success') throw new Error('Expected success export')
expect(result.payload.messages.map((message) => message.id)).toEqual([first.id, second.id])
expect(result.payload.messages.some((message) => message.id === queued.id)).toBe(false)
expect(result.payload.messages.some((message) => message.id === hiddenSystem.id)).toBe(false)
})

it('orders invoked scheduled messages by display time, not insertion seq', () => {
const store = makeStore()
const session = makeSession(store, 'session-export-scheduled-order')

const scheduled = store.messages.addMessage(
session.id,
{ role: 'user', content: { type: 'text', text: 'Scheduled' } },
'local-scheduled',
Date.now() + 60_000
)
const normal = store.messages.addMessage(
session.id,
{ role: 'user', content: { type: 'text', text: 'Normal' } },
'local-normal'
)
store.messages.markMessagesInvoked(session.id, ['local-normal'], 2_000)
store.messages.markMessagesInvoked(session.id, ['local-scheduled'], 3_000)

const service = new MessageService(store, makeIo(() => {}), makePublisher() as any)
const result = service.getSessionExport(session.id, toProtocolSession(session))

expect(result.type).toBe('success')
if (result.type !== 'success') throw new Error('Expected success export')
expect(result.payload.messages.map((message) => message.id)).toEqual([normal.id, scheduled.id])
})

it('returns too-large instead of truncating an export over the cap', () => {
const store = makeStore()
const session = makeSession(store, 'session-export-cap')

store.messages.addMessage(session.id, { role: 'user', content: 'One' })
store.messages.addMessage(session.id, { role: 'agent', content: 'Two' })

const service = new MessageService(store, makeIo(() => {}), makePublisher() as any)
const result = service.getSessionExport(session.id, toProtocolSession(session), 1)

expect(result).toEqual({
type: 'too-large',
count: 2,
limit: 1
})
})

it('pages past hidden-only goal status rows', () => {
const store = makeStore()
const session = makeSession(store, 'goal-status-pagination')
Expand Down
78 changes: 76 additions & 2 deletions hub/src/sync/messageService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import type { AttachmentMetadata, DecryptedMessage } from '@hapi/protocol/types'
import { isRedundantGoalStatusEventContent } from '@hapi/protocol/messages'
import {
HAPI_SESSION_EXPORT_SCHEMA_VERSION,
SESSION_EXPORT_MESSAGE_LIMIT,
type HapiSessionExportResult
} from '@hapi/protocol/sessionExport'
import type { AttachmentMetadata, DecryptedMessage, Session } from '@hapi/protocol/types'
import {
isClaudeChatVisibleMessage,
isRedundantGoalStatusEventContent,
unwrapRoleWrappedRecordEnvelope
} from '@hapi/protocol/messages'
import { isObject } from '@hapi/protocol'
import type { Server } from 'socket.io'
import { randomUUID } from 'node:crypto'
import type { Store, CancelQueuedMessageResult } from '../store'
Expand Down Expand Up @@ -27,6 +37,37 @@ function toVisibleDecryptedMessages(messages: StoredMessageForDelivery[]): Decry
return messages.filter(isWebVisibleStoredMessage).map(toDecryptedMessage)
}

function isQueuedUserMessage(message: StoredMessageForDelivery): boolean {
const record = unwrapRoleWrappedRecordEnvelope(message.content)
return record?.role === 'user' && message.invokedAt === null
}

function isExportVisibleStoredMessage(message: StoredMessageForDelivery): boolean {
if (!isWebVisibleStoredMessage(message) || isQueuedUserMessage(message)) {
return false
}

const record = unwrapRoleWrappedRecordEnvelope(message.content)
if (record?.role !== 'agent') {
return true
}

if (!isObject(record.content) || record.content.type !== 'output') {
return true
}

const data = isObject(record.content.data) ? record.content.data : null
if (!data) {
return true
}

if (Boolean(data.isMeta) || Boolean(data.isCompactSummary)) {
return false
}

return isClaudeChatVisibleMessage({ type: data.type, subtype: data.subtype })
}

export class MessageService {
/** One scheduled-matured SSE per localId per hub process (cleared on cancel/consume paths here). */
private readonly scheduledMatureNotifiedLocalIds = new Set<string>()
Expand All @@ -50,6 +91,39 @@ export class MessageService {
return toVisibleDecryptedMessages(stored)
}

getSessionExport(
sessionId: string,
session: Session,
limit: number = SESSION_EXPORT_MESSAGE_LIMIT
): HapiSessionExportResult {
const messages = this.store.messages.getAllMessages(sessionId)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] getAllMessages() returns rows by insertion seq, but scheduled local messages are inserted when queued and receive invokedAt later. After release/consume, this export includes them at their old insertion position, unlike the visible thread which orders by COALESCE(invoked_at, created_at), seq. That makes JSON/Markdown exports disagree with the visible chronology.

Suggested fix:

const messages = this.store.messages.getAllMessages(sessionId)
    .filter(isExportVisibleStoredMessage)
    .sort((a, b) => {
        const aAt = a.invokedAt ?? a.createdAt
        const bAt = b.invokedAt ?? b.createdAt
        return aAt !== bAt ? aAt - bAt : a.seq - b.seq
    })
    .map(toDecryptedMessage)

.filter(isExportVisibleStoredMessage)
.sort((a, b) => {
const aAt = a.invokedAt ?? a.createdAt
const bAt = b.invokedAt ?? b.createdAt
return aAt !== bAt ? aAt - bAt : a.seq - b.seq
})
.map(toDecryptedMessage)

if (messages.length > limit) {
return {
type: 'too-large',
count: messages.length,
limit
}
}

return {
type: 'success',
payload: {
schemaVersion: HAPI_SESSION_EXPORT_SCHEMA_VERSION,
exportedAt: Date.now(),
session,
messages
}
}
}

getMessagesPage(
sessionId: string,
options: { limit: number; before?: { at: number; seq: number } | null }
Expand Down
5 changes: 5 additions & 0 deletions hub/src/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { AgentFlavor, CodexCollaborationMode, DecryptedMessage, PermissionM
import { unwrapRoleWrappedRecordEnvelope } from '@hapi/protocol/messages'
import type { Server } from 'socket.io'
import type { Store, CancelQueuedMessageResult } from '../store'
import type { HapiSessionExportResult } from '@hapi/protocol/sessionExport'
import type { RpcRegistry } from '../socket/rpcRegistry'
import type { SSEManager } from '../sse/sseManager'
import { EventPublisher, type SyncEventListener } from './eventPublisher'
Expand Down Expand Up @@ -243,6 +244,10 @@ export class SyncEngine {
return this.messageService.getMessagesPage(sessionId, options)
}

getSessionExport(sessionId: string, session: Session): HapiSessionExportResult {
return this.messageService.getSessionExport(sessionId, session)
}

getDeliverableMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number; now: number }): DecryptedMessage[] {
return this.messageService.getDeliverableMessagesAfter(sessionId, options)
}
Expand Down
86 changes: 86 additions & 0 deletions hub/src/web/routes/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ function createSession(overrides?: Partial<Session>): Session {
function createApp(session: Session, opts?: {
resumeSession?: (sessionId: string, namespace: string, resumeOpts?: { permissionMode?: string }) => Promise<{ type: string; sessionId?: string; message?: string; code?: string }>
listSlashCommands?: SyncEngine['listSlashCommands']
getSessionExport?: (sessionId: string, session: Session) => unknown
}) {
const applySessionConfigCalls: Array<[string, Record<string, unknown>]> = []
const applySessionConfig = async (sessionId: string, config: Record<string, unknown>) => {
Expand Down Expand Up @@ -88,6 +89,15 @@ function createApp(session: Session, opts?: {
listCursorModelsForSession,
listOpencodeModelsForSession,
resumeSession,
getSessionExport: opts?.getSessionExport ?? (() => ({
type: 'success',
payload: {
schemaVersion: 1,
exportedAt: 1_762_000_000_000,
session,
messages: []
}
})),
listSlashCommands: opts?.listSlashCommands ?? (async () => ({
success: true,
commands: []
Expand All @@ -105,6 +115,82 @@ function createApp(session: Session, opts?: {
}

describe('sessions routes', () => {
it('exports an empty session conversation payload', async () => {
const session = createSession()
const { app } = createApp(session)

const response = await app.request('/api/sessions/session-1/export')

expect(response.status).toBe(200)
expect(await response.json()).toEqual({
schemaVersion: 1,
exportedAt: 1_762_000_000_000,
session,
messages: []
})
})

it('exports visible messages in chronological order', async () => {
const session = createSession()
const messages = [
{
id: 'msg-1',
seq: 1,
localId: null,
content: { role: 'user', content: 'Hello' },
createdAt: 1000,
invokedAt: 1001,
scheduledAt: null
},
{
id: 'msg-2',
seq: 2,
localId: null,
content: { role: 'agent', content: 'Hi there' },
createdAt: 1002,
invokedAt: 1002,
scheduledAt: null
}
]
const { app } = createApp(session, {
getSessionExport: () => ({
type: 'success',
payload: {
schemaVersion: 1,
exportedAt: 1_762_000_000_000,
session,
messages
}
})
})

const response = await app.request('/api/sessions/session-1/export')

expect(response.status).toBe(200)
const body = await response.json() as { messages: Array<{ id: string }> }
expect(body.messages.map((message) => message.id)).toEqual(['msg-1', 'msg-2'])
})

it('returns 413 when the export exceeds the hard message cap', async () => {
const session = createSession()
const { app } = createApp(session, {
getSessionExport: () => ({
type: 'too-large',
count: 20_001,
limit: 20_000
})
})

const response = await app.request('/api/sessions/session-1/export')

expect(response.status).toBe(413)
expect(await response.json()).toEqual({
error: 'Session export too large',
count: 20_001,
limit: 20_000
})
})

it('rejects collaboration mode changes for local Codex sessions', async () => {
const session = createSession({
agentState: {
Expand Down
23 changes: 23 additions & 0 deletions hub/src/web/routes/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ export function createSessionsRoutes(getSyncEngine: () => SyncEngine | null): Ho
return c.json({ sessions })
})

app.get('/sessions/:id/export', (c) => {
const engine = requireSyncEngine(c, getSyncEngine)
if (engine instanceof Response) {
return engine
}

const sessionResult = requireSessionFromParam(c, engine)
if (sessionResult instanceof Response) {
return sessionResult
}

const result = engine.getSessionExport(sessionResult.sessionId, sessionResult.session)
if (result.type === 'too-large') {
return c.json({
error: 'Session export too large',
count: result.count,
limit: result.limit
}, 413)
}

return c.json(result.payload)
})

app.get('/sessions/:id', (c) => {
const engine = requireSyncEngine(c, getSyncEngine)
if (engine instanceof Response) {
Expand Down
1 change: 1 addition & 0 deletions shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"./modes": "./src/modes.ts",
"./rpcMethods": "./src/rpcMethods.ts",
"./schemas": "./src/schemas.ts",
"./sessionExport": "./src/sessionExport.ts",
"./types": "./src/types.ts",
"./voice": "./src/voice.ts"
},
Expand Down
1 change: 1 addition & 0 deletions shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './resume'
export * from './rpcMethods'
export * from './socket'
export * from './sessionSummary'
export * from './sessionExport'
export * from './slashCommands'
export * from './utils'
export * from './version'
Expand Down
Loading
Loading