Skip to content

Commit e4a047b

Browse files
committed
feat: add support for retrieving offloaded measurements (#761)
1 parent 1b8d195 commit e4a047b

File tree

4 files changed

+313
-8
lines changed

4 files changed

+313
-8
lines changed

src/measurement/store-offloader.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { promisify } from 'node:util';
2-
import { brotliCompress as brotliCompressCallback, constants as zlibConstants } from 'node:zlib';
2+
import { brotliCompress as brotliCompressCallback, brotliDecompress as brotliDecompressCallback, constants as zlibConstants } from 'node:zlib';
33
import { Queue as BullQueue, Worker } from 'bullmq';
44
import BatchQueue from '@martin-kolarik/batch-queue';
55
import Bluebird from 'bluebird';
@@ -14,9 +14,14 @@ import { MeasurementStore } from './store.js';
1414

1515
const logger = scopedLogger('db-store');
1616
const brotliCompress = promisify(brotliCompressCallback);
17+
const brotliDecompress = promisify(brotliDecompressCallback);
1718

18-
const compressRecord = (record: MeasurementRecord): Promise<Buffer> => {
19-
return brotliCompress(JSON.stringify(record), { params: { [zlibConstants.BROTLI_PARAM_QUALITY]: 5 } });
19+
const compressRecord = (record: string): Promise<Buffer> => {
20+
return brotliCompress(record, { params: { [zlibConstants.BROTLI_PARAM_QUALITY]: 5 } });
21+
};
22+
23+
const decompressRecord = async (buffer: Buffer): Promise<string> => {
24+
return (await brotliDecompress(buffer)).toString();
2025
};
2126

2227
export class MeasurementStoreOffloader {
@@ -53,6 +58,23 @@ export class MeasurementStoreOffloader {
5358
this.offloadQueues[tier].push(measurement);
5459
}
5560

61+
async getMeasurementString (id: string, userTierNum: keyof typeof USER_TIER_INVERTED, createdAtRounded: number): Promise<string | null> {
62+
const tier = USER_TIER_INVERTED[userTierNum];
63+
const table = `measurement_${tier}`;
64+
const createdAt = new Date(createdAtRounded);
65+
66+
const row = await this.measurementStoreDb(table)
67+
.where({ id, createdAt })
68+
.select<{ data: Buffer }[]>('data')
69+
.first();
70+
71+
if (!row) {
72+
return null;
73+
}
74+
75+
return decompressRecord(row.data);
76+
}
77+
5678
startRetryWorker () {
5779
const worker = new Worker<{ tier: UserTier; ids: string[] }>(
5880
this.fallbackQueueName,
@@ -127,14 +149,16 @@ export class MeasurementStoreOffloader {
127149
return {
128150
id: r.id,
129151
createdAt: roundIdTime(new Date(r.createdAt)),
130-
data: await compressRecord(r),
152+
data: await compressRecord(JSON.stringify(r)),
131153
};
132154
}, { concurrency: 4 });
133155

134156
await this.measurementStoreDb(table)
135157
.insert(rows)
136158
.onConflict([ 'id', 'createdAt' ])
137159
.ignore();
160+
161+
this.primaryMeasurementStore.setOffloadedExpiration(measurements.map(m => m.id)).catch(() => {});
138162
}
139163

140164
private async enqueueFallbackJob (tier: UserTier, ids: string[]) {

src/measurement/store.ts

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,19 @@ import _ from 'lodash';
55

66
import type { OfflineProbe, ServerProbe } from '../probe/types.js';
77
import { scopedLogger } from '../lib/logger.js';
8-
import type { MeasurementProgressMessage, MeasurementRecord, MeasurementRequest, MeasurementResult, MeasurementResultMessage, RequestType } from './types.js';
8+
import type {
9+
MeasurementProgressMessage,
10+
MeasurementRecord,
11+
MeasurementRequest,
12+
MeasurementResult,
13+
MeasurementResultMessage,
14+
RequestType,
15+
} from './types.js';
916
import { getDefaults } from './schema/utils.js';
1017
import { getMeasurementRedisClient, type RedisCluster } from '../lib/redis/measurement-client.js';
1118
import { getPersistentRedisClient, type RedisClient } from '../lib/redis/persistent-client.js';
1219
import { AuthenticateStateUser } from '../lib/http/middleware/authenticate.js';
13-
import { generateMeasurementId } from './id.js';
20+
import { generateMeasurementId, parseMeasurementId } from './id.js';
1421
import { MeasurementStoreOffloader } from './store-offloader.js';
1522
import { measurementStoreClient } from '../lib/sql/client.js';
1623

@@ -54,7 +61,31 @@ export class MeasurementStore {
5461
this.offloader = new MeasurementStoreOffloader(measurementStoreClient, this);
5562
}
5663

57-
async getMeasurementString (id: string): Promise<string> {
64+
async getMeasurementString (id: string): Promise<string | null> {
65+
let userTier;
66+
let minutesSinceEpoch;
67+
68+
try {
69+
({ minutesSinceEpoch, userTier } = parseMeasurementId(id));
70+
} catch {
71+
return null;
72+
}
73+
74+
const createdAtMs = minutesSinceEpoch * 60_000;
75+
const isOlderThan30m = Date.now() - createdAtMs > 30 * 60_000;
76+
77+
if (isOlderThan30m) {
78+
try {
79+
const offloaded = await this.offloader.getMeasurementString(id, userTier, createdAtMs);
80+
81+
if (offloaded) {
82+
return offloaded;
83+
}
84+
} catch {
85+
// Fall back to Redis.
86+
}
87+
}
88+
5889
const key = getMeasurementKey(id);
5990
return this.redis.sendCommand(key, true, [ 'JSON.GET', key ]);
6091
}
@@ -206,6 +237,14 @@ export class MeasurementStore {
206237
this.offloader.startRetryWorker();
207238
}
208239

240+
async setOffloadedExpiration (ids: string[]): Promise<void> {
241+
if (ids.length === 0) {
242+
return;
243+
}
244+
245+
await Bluebird.map(ids, id => this.redis.expire(getMeasurementKey(id), 60 * 60), { concurrency: 8 });
246+
}
247+
209248
removeDefaults (measurement: Partial<MeasurementRecord>, request: MeasurementRequest): Partial<MeasurementRecord> {
210249
const defaults = getDefaults(request);
211250

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import type { Server } from 'node:http';
2+
import { brotliCompress as brotliCompressCallback, constants as zlibConstants } from 'node:zlib';
3+
import { promisify } from 'node:util';
4+
import request, { Agent } from 'supertest';
5+
import Bluebird from 'bluebird';
6+
import { expect } from 'chai';
7+
8+
import { getTestServer } from '../../../utils/server.js';
9+
import { measurementStoreClient } from '../../../../src/lib/sql/client.js';
10+
import { getMeasurementRedisClient } from '../../../../src/lib/redis/measurement-client.js';
11+
import { generateMeasurementId, roundIdTime } from '../../../../src/measurement/id.js';
12+
import { getMeasurementKey } from '../../../../src/measurement/store.js';
13+
14+
const brotliCompress = promisify(brotliCompressCallback);
15+
16+
describe('Get measurement', () => {
17+
let app: Server;
18+
let requestAgent: Agent;
19+
20+
const buildMeasurementRecord = (id: string, time: Date) => {
21+
return {
22+
id,
23+
type: 'ping',
24+
status: 'finished',
25+
createdAt: time.toISOString(),
26+
updatedAt: time.toISOString(),
27+
target: 'example.com',
28+
probesCount: 1,
29+
results: [],
30+
};
31+
};
32+
33+
before(async () => {
34+
app = await getTestServer();
35+
requestAgent = request(app);
36+
});
37+
38+
describe('errors', () => {
39+
it('should respond with a 404 for a non-existing measurement id', async () => {
40+
const nonExisting = generateMeasurementId(new Date());
41+
42+
await requestAgent
43+
.get(`/v1/measurements/${nonExisting}`)
44+
.expect(404)
45+
.expect((response) => {
46+
expect(response.body.error).to.include({
47+
message: 'Couldn\'t find the requested measurement.',
48+
type: 'not_found',
49+
});
50+
51+
expect(response).to.matchApiSchema();
52+
});
53+
});
54+
55+
it('should respond 404 for an invalid id format', async () => {
56+
await requestAgent
57+
.get('/v1/measurements/invalid-id-123')
58+
.expect(404)
59+
.expect((response) => {
60+
expect(response.body.error.type).to.equal('not_found');
61+
expect(response).to.matchApiSchema();
62+
});
63+
});
64+
});
65+
66+
describe('success (from Redis)', () => {
67+
const redisKeysToCleanup: string[] = [];
68+
69+
afterEach(async () => {
70+
const redis = getMeasurementRedisClient();
71+
72+
await Bluebird.map(redisKeysToCleanup.splice(0), (key) => {
73+
return redis.del(key);
74+
});
75+
});
76+
77+
it('should return measurement JSON stored in Redis', async () => {
78+
const now = new Date();
79+
const id = generateMeasurementId(now);
80+
const key = getMeasurementKey(id);
81+
const record = buildMeasurementRecord(id, now);
82+
83+
const redis = getMeasurementRedisClient();
84+
await redis.json.set(key, '$', record);
85+
redisKeysToCleanup.push(key);
86+
87+
await requestAgent
88+
.get(`/v1/measurements/${id}`)
89+
.expect(200)
90+
.expect((response) => {
91+
expect(response.body).to.deep.equal(record);
92+
expect(response).to.matchApiSchema();
93+
});
94+
});
95+
});
96+
97+
describe('success (from Postgres offload)', () => {
98+
const redisKeysToCleanup: string[] = [];
99+
const dbRowsToCleanup: { table: string; id: string; createdAt: Date }[] = [];
100+
101+
afterEach(async () => {
102+
const redis = getMeasurementRedisClient();
103+
104+
await Bluebird.map(redisKeysToCleanup.splice(0), (key) => {
105+
return redis.del(key);
106+
});
107+
108+
await Bluebird.map(dbRowsToCleanup.splice(0), (row) => {
109+
return measurementStoreClient(row.table)
110+
.where({ id: row.id, createdAt: row.createdAt })
111+
.delete();
112+
});
113+
});
114+
115+
it('should return measurement JSON from the offloaded store when likely offloaded and older than 30 minutes', async () => {
116+
// Create an ID in the past (> 30 minutes)
117+
const createdAt = new Date(Date.now() - 40 * 60_000);
118+
const id = generateMeasurementId(createdAt);
119+
const roundedCreatedAt = roundIdTime(new Date(createdAt));
120+
121+
// Seed Postgres offload table for anonymous tier
122+
const table = 'measurement_anonymous';
123+
const record = buildMeasurementRecord(id, createdAt);
124+
const compressed = await brotliCompress(JSON.stringify(record), { params: { [zlibConstants.BROTLI_PARAM_QUALITY]: 1 } });
125+
await measurementStoreClient(table).insert({ id, createdAt: new Date(roundedCreatedAt), data: compressed });
126+
dbRowsToCleanup.push({ table, id, createdAt: new Date(roundedCreatedAt) });
127+
128+
// Ensure Redis does not have this key
129+
const key = getMeasurementKey(id);
130+
const redis = getMeasurementRedisClient();
131+
await redis.del(key);
132+
133+
await requestAgent
134+
.get(`/v1/measurements/${id}`)
135+
.expect(200)
136+
.expect((response) => {
137+
expect(response.body).to.deep.equal(record);
138+
expect(response).to.matchApiSchema();
139+
});
140+
});
141+
});
142+
});

0 commit comments

Comments
 (0)