Skip to content

Commit aacfba4

Browse files
authored
expose ShapeStream.fetchSnapshot as public api (#3463)
This exposes the `ShapeStream.fetchSnapshot` as a public api that can be used to fetch a snapshot without it being injected into the emitted stream of change messages. This is useful for cases where the user wants to handle the application of these snapshot in a custom way, as we need to in Tanstack DB for the `progressive` mode. While working on this I also took the opportunity to tidy up the parsing of the messages, in the old implementation we would JSON.stringify the messages, only to the parse them again. I refactored the parser to be able to work on already parsed (JSON -> basic JS) messages. First commit is the core change to expose `fetchSnapshot`, second commit is the refactor of message parsing.
1 parent ff45de1 commit aacfba4

File tree

4 files changed

+340
-39
lines changed

4 files changed

+340
-39
lines changed

.changeset/late-owls-wave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/client': patch
3+
---
4+
5+
Expose the ShapeStream.fetchSnapshot as a public api that can be used to fetch a snapshot without it being injected into the emitted stream of change messages. This is useful for cases where the user wants to handle the application of these snapshot in a custom way.

packages/typescript-client/src/client.ts

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
InvalidSignalError,
1818
MissingShapeHandleError,
1919
ReservedParamError,
20+
MissingHeadersError,
2021
} from './error'
2122
import {
2223
BackoffDefaults,
@@ -368,16 +369,15 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
368369

369370
forceDisconnectAndRefresh(): Promise<void>
370371

371-
requestSnapshot(params: {
372-
where?: string
373-
params?: Record<string, string>
374-
limit: number
375-
offset?: number
376-
orderBy: string
377-
}): Promise<{
372+
requestSnapshot(params: SubsetParams): Promise<{
378373
metadata: SnapshotMetadata
379374
data: Array<Message<T>>
380375
}>
376+
377+
fetchSnapshot(opts: SubsetParams): Promise<{
378+
metadata: SnapshotMetadata
379+
data: Array<ChangeMessage<T>>
380+
}>
381381
}
382382

383383
/**
@@ -774,7 +774,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
774774
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset)
775775
fetchUrl.searchParams.set(LOG_MODE_QUERY_PARAM, this.#mode)
776776

777-
if (this.#isUpToDate) {
777+
// Snapshot requests (with subsetParams) should never use live polling
778+
const isSnapshotRequest = subsetParams !== undefined
779+
780+
if (this.#isUpToDate && !isSnapshotRequest) {
778781
// If we are resuming from a paused state, we don't want to perform a live request
779782
// because it could be a long poll that holds for 20sec
780783
// and during all that time `isConnected` will be false
@@ -846,11 +849,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
846849
this.#liveCacheBuster = liveCacheBuster
847850
}
848851

849-
const getSchema = (): Schema => {
850-
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
851-
return schemaHeader ? JSON.parse(schemaHeader) : {}
852-
}
853-
this.#schema = this.#schema ?? getSchema()
852+
this.#schema = this.#schema ?? getSchemaFromHeaders(headers)
854853

855854
// NOTE: 204s are deprecated, the Electric server should not
856855
// send these in latest versions but this is here for backwards
@@ -1237,7 +1236,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
12371236
}
12381237

12391238
/**
1240-
* Request a snapshot for subset of data.
1239+
* Request a snapshot for subset of data and inject it into the subscribed data stream.
12411240
*
12421241
* Only available when mode is `changes_only`.
12431242
* Returns the insertion point & the data, but more importantly injects the data
@@ -1275,16 +1274,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
12751274
this.#pause()
12761275
}
12771276

1278-
const { fetchUrl, requestHeaders } = await this.#constructUrl(
1279-
this.options.url,
1280-
true,
1281-
opts
1282-
)
1283-
1284-
const { metadata, data } = await this.#fetchSnapshot(
1285-
fetchUrl,
1286-
requestHeaders
1287-
)
1277+
const { metadata, data } = await this.fetchSnapshot(opts)
12881278

12891279
const dataWithEndBoundary = (data as Array<Message<T>>).concat([
12901280
{ headers: { control: `snapshot-end`, ...metadata } },
@@ -1309,30 +1299,79 @@ export class ShapeStream<T extends Row<unknown> = Row>
13091299
}
13101300
}
13111301

1312-
async #fetchSnapshot(url: URL, headers: Record<string, string>) {
1313-
const response = await this.#fetchClient(url.toString(), { headers })
1302+
/**
1303+
* Fetch a snapshot for subset of data.
1304+
* Returns the metadata and the data, but does not inject it into the subscribed data stream.
1305+
*
1306+
* @param opts - The options for the snapshot request.
1307+
* @returns The metadata and the data for the snapshot.
1308+
*/
1309+
async fetchSnapshot(opts: SubsetParams): Promise<{
1310+
metadata: SnapshotMetadata
1311+
data: Array<ChangeMessage<T>>
1312+
}> {
1313+
const { fetchUrl, requestHeaders } = await this.#constructUrl(
1314+
this.options.url,
1315+
true,
1316+
opts
1317+
)
1318+
1319+
const response = await this.#fetchClient(fetchUrl.toString(), {
1320+
headers: requestHeaders,
1321+
})
13141322

13151323
if (!response.ok) {
13161324
throw new FetchError(
13171325
response.status,
13181326
undefined,
13191327
undefined,
13201328
Object.fromEntries([...response.headers.entries()]),
1321-
url.toString()
1329+
fetchUrl.toString()
13221330
)
13231331
}
13241332

1325-
const { metadata, data } = await response.json()
1326-
const batch = this.#messageParser.parse<Array<ChangeMessage<T>>>(
1327-
JSON.stringify(data),
1328-
this.#schema!
1333+
// Use schema from stream if available, otherwise extract from response header
1334+
const schema: Schema =
1335+
this.#schema ??
1336+
getSchemaFromHeaders(response.headers, {
1337+
required: true,
1338+
url: fetchUrl.toString(),
1339+
})
1340+
1341+
const { metadata, data: rawData } = await response.json()
1342+
const data = this.#messageParser.parseSnapshotData<ChangeMessage<T>>(
1343+
rawData,
1344+
schema
13291345
)
13301346

13311347
return {
13321348
metadata,
1333-
data: batch,
1349+
data,
1350+
}
1351+
}
1352+
}
1353+
1354+
/**
1355+
* Extracts the schema from response headers.
1356+
* @param headers - The response headers
1357+
* @param options - Options for schema extraction
1358+
* @param options.required - If true, throws MissingHeadersError when header is missing. Defaults to false.
1359+
* @param options.url - The URL to include in the error message if required is true
1360+
* @returns The parsed schema, or an empty object if not required and header is missing
1361+
* @throws {MissingHeadersError} if required is true and the header is missing
1362+
*/
1363+
function getSchemaFromHeaders(
1364+
headers: Headers,
1365+
options?: { required?: boolean; url?: string }
1366+
): Schema {
1367+
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
1368+
if (!schemaHeader) {
1369+
if (options?.required && options?.url) {
1370+
throw new MissingHeadersError(options.url, [SHAPE_SCHEMA_HEADER])
13341371
}
1372+
return {}
13351373
}
1374+
return JSON.parse(schemaHeader)
13361375
}
13371376

13381377
/**

packages/typescript-client/src/parser.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,18 +122,56 @@ export class MessageParser<T extends Row<unknown>> {
122122
typeof value === `object` &&
123123
value !== null
124124
) {
125-
// Parse the row values
126-
const row = value as Record<string, Value<GetExtensions<T>>>
127-
Object.keys(row).forEach((key) => {
128-
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
129-
})
130-
131-
if (this.transformer) value = this.transformer(value)
125+
return this.transformMessageValue(value, schema)
132126
}
133127
return value
134128
}) as Result
135129
}
136130

131+
/**
132+
* Parse an array of ChangeMessages from a snapshot response.
133+
* Applies type parsing and transformations to the value and old_value properties.
134+
*/
135+
parseSnapshotData<Result>(
136+
messages: Array<unknown>,
137+
schema: Schema
138+
): Array<Result> {
139+
return messages.map((message) => {
140+
const msg = message as Record<string, unknown>
141+
142+
// Transform the value property if it exists
143+
if (msg.value && typeof msg.value === `object` && msg.value !== null) {
144+
msg.value = this.transformMessageValue(msg.value, schema)
145+
}
146+
147+
// Transform the old_value property if it exists
148+
if (
149+
msg.old_value &&
150+
typeof msg.old_value === `object` &&
151+
msg.old_value !== null
152+
) {
153+
msg.old_value = this.transformMessageValue(msg.old_value, schema)
154+
}
155+
156+
return msg as Result
157+
})
158+
}
159+
160+
/**
161+
* Transform a message value or old_value object by parsing its columns.
162+
*/
163+
private transformMessageValue(
164+
value: unknown,
165+
schema: Schema
166+
): Row<GetExtensions<T>> {
167+
const row = value as Record<string, Value<GetExtensions<T>>>
168+
Object.keys(row).forEach((key) => {
169+
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
170+
})
171+
172+
return this.transformer ? this.transformer(row) : row
173+
}
174+
137175
// Parses the message values using the provided parser based on the schema information
138176
private parseRow(
139177
key: string,

0 commit comments

Comments
 (0)