diff --git a/README.md b/README.md
index 7ddc0ca7..9f809b3c 100644
--- a/README.md
+++ b/README.md
@@ -220,6 +220,34 @@ Print the Tor hostname:
 ./scripts/print_tor_hostname
 ```
 
+### Importing events from JSON Lines
+
+You can import NIP-01 events from a `.jsonl` file directly into the relay database.
+
+Basic import:
+```
+npm run import -- ./events.jsonl
+```
+
+Set a custom batch size (default: `1000`):
+```
+npm run import -- ./events.jsonl --batch-size 500
+```
+
+The importer:
+
+- Processes the file line by line to keep memory usage bounded.
+- Validates the NIP-01 schema, the event id hash, and the Schnorr signature before insertion.
+- Inserts each batch in a single database transaction.
+- Skips duplicates without failing the whole import.
+- Prints progress in the format:
+  `[Processed: 50,000 | Inserted: 45,000 | Skipped: 5,000 | Errors: 0]`
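+
+Each line of the file must contain one complete NIP-01 event serialized as JSON. For example, a single line could look like this (illustrative placeholder values):
+```
+{"id":"<64-hex-chars>","pubkey":"<64-hex-chars>","created_at":1700000000,"kind":1,"tags":[],"content":"hello nostr","sig":"<128-hex-chars>"}
+```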
+
 ### Running as a Service
 
 By default this server will run continuously until you stop it with Ctrl+C or until the system restarts.
diff --git a/package.json b/package.json
index 59cf9877..975cfa9d 100644
--- a/package.json
+++ b/package.json
@@ -31,6 +31,7 @@
     "lint": "eslint --ext .ts ./src ./test",
     "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test",
     "lint:fix": "npm run lint -- --fix",
+    "import": "node -r ts-node/register src/import-events.ts",
     "db:migrate": "knex migrate:latest",
     "db:migrate:rollback": "knex migrate:rollback",
     "db:seed": "knex seed:run",
diff --git a/src/@types/repositories.ts b/src/@types/repositories.ts
index 671dfaee..86c91b4b 100644
--- a/src/@types/repositories.ts
+++ b/src/@types/repositories.ts
@@ -14,7 +14,9 @@ export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & Exp
 
 export interface IEventRepository {
   create(event: Event): Promise<number>
+  createMany(events: Event[]): Promise<number>
   upsert(event: Event): Promise<number>
+  upsertMany(events: Event[]): Promise<number>
   findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]>
   deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
 }
diff --git a/src/import-events.ts b/src/import-events.ts
new file mode 100644
index 00000000..e6392a75
--- /dev/null
+++ b/src/import-events.ts
@@ -0,0 +1,183 @@
+import { extname, resolve } from 'path'
+
+import fs from 'fs'
+
+import dotenv from 'dotenv'
+
+dotenv.config()
+
+import {
+  createEventBatchPersister,
+  EventImportLineError,
+  EventImportService,
+  EventImportStats,
+} from './services/event-import-service'
+import { EventRepository } from './repositories/event-repository'
+import { getMasterDbClient } from './database/client'
+
+interface CliOptions {
+  batchSize: number
+  filePath: string
+  showHelp: boolean
+}
+
+const DEFAULT_BATCH_SIZE = 1000
+const MAX_ERROR_LOGS = 20
+
+const formatNumber = (value: number): string => value.toLocaleString('en-US')
+
+const formatProgress = (stats: EventImportStats): string => {
+  return `[Processed: ${formatNumber(stats.processed)} | Inserted: ${formatNumber(stats.inserted)} | Skipped: ${formatNumber(stats.skipped)} | Errors: ${formatNumber(stats.errors)}]`
+}
+
+const printUsage = (): void => {
+  console.log('Usage: npm run import -- <path/to/events.jsonl> [--batch-size <n>]')
+  console.log('Example: npm run import -- ./events.jsonl --batch-size 1000')
+}
+
+const parseBatchSize = (value: string): number => {
+  const parsedValue = Number(value)
+
+  if (!Number.isInteger(parsedValue) || parsedValue <= 0) {
+    throw new Error(`Invalid --batch-size value: ${value}`)
+  }
+
+  return parsedValue
+}
+
+const parseCliArgs = (args: string[]): CliOptions => {
+  let batchSize = DEFAULT_BATCH_SIZE
+  let filePath: string | undefined
+
+  if (args.includes('--help') || args.includes('-h')) {
+    return {
+      batchSize,
+      filePath: '',
+      showHelp: true,
+    }
+  }
+
+  for (let i = 0; i < args.length; i++) {
+    const arg = args[i]
+
+    if (arg === '--batch-size') {
+      const nextArg = args[i + 1]
+      if (typeof nextArg !== 'string') {
+        throw new Error('Missing value for --batch-size')
+      }
+
+      batchSize = parseBatchSize(nextArg)
+      i += 1
+      continue
+    }
+
+    if (arg.startsWith('--batch-size=')) {
+      batchSize = parseBatchSize(arg.split('=', 2)[1])
+      continue
+    }
+
+    if (arg.startsWith('--')) {
+      throw new Error(`Unknown option: ${arg}`)
+    }
+
+    if (filePath) {
+      throw new Error(`Unexpected extra argument: ${arg}`)
+    }
+
+    filePath = arg
+  }
+
+  if (!filePath) {
+    throw new Error('Missing path to .jsonl file')
+  }
+
+  return {
+    batchSize,
+    filePath,
+    showHelp: false,
+  }
+}
+
+const ensureValidInputFile = (filePath: string): string => {
+  const absolutePath = resolve(process.cwd(), filePath)
+
+  if (extname(absolutePath).toLowerCase() !== '.jsonl') {
+    throw new Error('Input file must have a .jsonl extension')
+  }
+
+  if (!fs.existsSync(absolutePath)) {
+    throw new Error(`Input file does not exist: ${absolutePath}`)
+  }
+
+  const stats = fs.statSync(absolutePath)
+  if (!stats.isFile()) {
+    throw new Error(`Input path is not a file: ${absolutePath}`)
+  }
+
+  return absolutePath
+}
+
+const run = async (): Promise<void> => {
+  const options = parseCliArgs(process.argv.slice(2))
+
+  if (options.showHelp) {
+    printUsage()
+    return
+  }
+
+  const absoluteFilePath = ensureValidInputFile(options.filePath)
+
+  const dbClient = getMasterDbClient()
+  const eventRepository = new EventRepository(dbClient, dbClient)
+  const importer = new EventImportService(createEventBatchPersister(eventRepository))
+
+  let loggedErrors = 0
+  let suppressedErrors = 0
+
+  const onLineError = ({ lineNumber, reason }: EventImportLineError) => {
+    if (loggedErrors < MAX_ERROR_LOGS) {
+      console.warn(`[line ${lineNumber}] ${reason}`)
+      loggedErrors += 1
+      return
+    }
+
+    suppressedErrors += 1
+  }
+
+  const onProgress = (stats: EventImportStats) => {
+    console.log(formatProgress(stats))
+  }
+
+  const startedAt = Date.now()
+
+  try {
+    const stats = await importer.importFromJsonl(absoluteFilePath, {
+      batchSize: options.batchSize,
+      onLineError,
+      onProgress,
+    })
+
+    if (suppressedErrors > 0) {
+      console.warn(`Suppressed ${formatNumber(suppressedErrors)} additional line errors`)
+    }
+
+    const elapsedSeconds = ((Date.now() - startedAt) / 1000).toFixed(2)
+
+    console.log(`Import completed in ${elapsedSeconds}s`)
+    console.log(formatProgress(stats))
+  } finally {
+    await dbClient.destroy()
+  }
+}
+
+if (require.main === module) {
+  run().catch((error: unknown) => {
+    if (error instanceof Error) {
+      console.error(`Import failed: ${error.message}`)
+    } else {
+      console.error('Import failed with unknown error')
+    }
+
+    process.exit(1)
+  })
+}
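For orientation, here is what a run is designed to print, given the CLI and progress format above. The session is illustrative rather than captured: assume a hypothetical dump of 3,000 valid events containing 4 duplicates, with the default batch size of 1,000 (timing invented):

```
$ npm run import -- ./events.jsonl
[Processed: 1,000 | Inserted: 1,000 | Skipped: 0 | Errors: 0]
[Processed: 2,000 | Inserted: 1,998 | Skipped: 2 | Errors: 0]
[Processed: 3,000 | Inserted: 2,996 | Skipped: 4 | Errors: 0]
Import completed in 1.42s
[Processed: 3,000 | Inserted: 2,996 | Skipped: 4 | Errors: 0]
```

Counts are cumulative across batches, and the final line repeats the totals after the elapsed-time summary.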
diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts
index 9c645b8a..6e848178 100644
--- a/src/repositories/event-repository.ts
+++ b/src/repositories/event-repository.ts
@@ -170,9 +170,22 @@ export class EventRepository implements IEventRepository {
     return this.insert(event).then(prop('rowCount') as () => number, () => 0)
   }
 
-  private insert(event: Event) {
-    debug('inserting event: %o', event)
-    const row = applySpec({
+  public async createMany(events: Event[]): Promise<number> {
+    if (!events.length) {
+      return 0
+    }
+
+    const rows = events.map((event) => this.toInsertRow(event))
+
+    return this.masterDbClient('events')
+      .insert(rows)
+      .onConflict()
+      .ignore()
+      .then(prop('rowCount') as () => number, () => 0)
+  }
+
+  private toInsertRow(event: Event) {
+    return applySpec({
       event_id: pipe(prop('id'), toBuffer),
       event_pubkey: pipe(prop('pubkey'), toBuffer),
       event_created_at: prop('created_at'),
@@ -187,6 +200,11 @@ export class EventRepository implements IEventRepository {
         always(null),
       ),
     })(event)
+  }
+
+  private insert(event: Event) {
+    debug('inserting event: %o', event)
+    const row = this.toInsertRow(event)
 
     return this.masterDbClient('events')
       .insert(row)
@@ -197,9 +215,50 @@ export class EventRepository implements IEventRepository {
 
   public upsert(event: Event): Promise<number> {
     debug('upserting event: %o', event)
+    const row = this.toUpsertRow(event)
+
+    const query = this.masterDbClient('events')
+      .insert(row)
+      // NIP-16: Replaceable Events
+      // NIP-33: Parameterized Replaceable Events
+      .onConflict(
+        this.masterDbClient.raw(
+          '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
+        )
+      )
+      .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row))
+      .where('events.event_created_at', '<', row.event_created_at)
+
+    return {
+      then: <T1, T2>(onfulfilled: (value: number) => T1 | PromiseLike<T1>, onrejected: (reason: any) => T2 | PromiseLike<T2>) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected),
+      catch: <T>(onrejected: (reason: any) => T | PromiseLike<T>) => query.catch(onrejected),
+      toString: (): string => query.toString(),
+    } as Promise<number>
+  }
+
+  public async upsertMany(events: Event[]): Promise<number> {
+    if (!events.length) {
+      return 0
+    }
+
+    const rows = events.map((event) => this.toUpsertRow(event))
+
+    return this.masterDbClient('events')
+      .insert(rows)
+      .onConflict(
+        this.masterDbClient.raw(
+          '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
+        )
+      )
+      .merge(['deleted_at', 'event_content', 'event_created_at', 'event_id', 'event_signature', 'event_tags', 'expires_at'])
+      .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"')
+      .then(prop('rowCount') as () => number, () => 0)
+  }
+
+  private toUpsertRow(event: Event) {
     const toJSON = (input: any) => JSON.stringify(input)
 
-    const row = applySpec({
+    return applySpec({
       event_id: pipe(prop('id'), toBuffer),
       event_pubkey: pipe(prop('pubkey'), toBuffer),
       event_created_at: prop('created_at'),
@@ -220,24 +279,6 @@ export class EventRepository implements IEventRepository {
       ),
       deleted_at: always(null),
     })(event)
-
-    const query = this.masterDbClient('events')
-      .insert(row)
-      // NIP-16: Replaceable Events
-      // NIP-33: Parameterized Replaceable Events
-      .onConflict(
-        this.masterDbClient.raw(
-          '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)'
-        )
-      )
-      .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row))
-      .where('events.event_created_at', '<', row.event_created_at)
-
-    return {
-      then: <T1, T2>(onfulfilled: (value: number) => T1 | PromiseLike<T1>, onrejected: (reason: any) => T2 | PromiseLike<T2>) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected),
-      catch: <T>(onrejected: (reason: any) => T | PromiseLike<T>) => query.catch(onrejected),
-      toString: (): string => query.toString(),
-    } as Promise<number>
   }
 
   public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise<number> {
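Both `.onConflict(...)` clauses above name a partial unique index on `(event_pubkey, event_kind, event_deduplication)`. For readers tracing the SQL, the index those clauses assume looks roughly like this sketch (reconstructed from the raw conflict target; the real definition lives in the project's migrations, and the index name here is invented):

```sql
-- Sketch only: the partial unique index implied by the ON CONFLICT targets above.
CREATE UNIQUE INDEX events_replaceable_dedup_idx
  ON events (event_pubkey, event_kind, event_deduplication)
  WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41
      OR (event_kind >= 10000 AND event_kind < 20000))
     OR (event_kind >= 30000 AND event_kind < 40000);
```

Note also that `upsertMany` compares against `"excluded"."event_created_at"`: `excluded` is PostgreSQL's pseudo-table holding the row proposed for insertion, so a stored event is only overwritten when the incoming one is newer, mirroring the single-row `upsert` above.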
diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts
new file mode 100644
index 00000000..83855a93
--- /dev/null
+++ b/src/services/event-import-service.ts
@@ -0,0 +1,230 @@
+import fs from 'fs'
+import readline from 'readline'
+
+import {
+  getEventExpiration,
+  isDeleteEvent,
+  isEphemeralEvent,
+  isEventIdValid,
+  isEventSignatureValid,
+  isParameterizedReplaceableEvent,
+  isReplaceableEvent,
+} from '../utils/event'
+import { attemptValidation } from '../utils/validation'
+
+import { EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventTags } from '../constants/base'
+import { Event } from '../@types/event'
+import { eventSchema } from '../schemas/event-schema'
+import { IEventRepository } from '../@types/repositories'
+
+const enrichEventMetadata = (event: Event): Event => {
+  let enriched: any = event
+
+  const expiration = getEventExpiration(event)
+  if (expiration) {
+    enriched = { ...enriched, [EventExpirationTimeMetadataKey]: expiration }
+  }
+
+  if (isParameterizedReplaceableEvent(event)) {
+    const [, ...deduplication] = event.tags.find(
+      (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication,
+    ) ?? [null, '']
+    enriched = { ...enriched, [EventDeduplicationMetadataKey]: deduplication }
+  }
+
+  return enriched as Event
+}
+
+const DEFAULT_BATCH_SIZE = 1000
+
+export interface EventImportStats {
+  errors: number
+  inserted: number
+  processed: number
+  skipped: number
+}
+
+export interface EventImportLineError {
+  lineNumber: number
+  reason: string
+}
+
+export interface EventImportOptions {
+  batchSize?: number
+  onLineError?: (lineError: EventImportLineError) => void
+  onProgress?: (stats: EventImportStats) => void
+}
+
+const getErrorMessage = (error: unknown): string => {
+  if (error instanceof Error) {
+    return error.message
+  }
+
+  return String(error)
+}
+
+export const createEventBatchPersister =
+  (eventRepository: IEventRepository) =>
+  async (events: Event[]): Promise<number> => {
+    if (!events.length) {
+      return 0
+    }
+
+    let inserted = 0
+
+    const regularEvents: Event[] = []
+    const replaceableEvents: Event[] = []
+
+    for (const event of events) {
+      if (isEphemeralEvent(event)) {
+        continue
+      }
+
+      if (isDeleteEvent(event)) {
+        // Flush pending batches first so a delete cannot outrun the events it targets
+        inserted += await eventRepository.createMany(regularEvents.splice(0))
+        inserted += await eventRepository.upsertMany(replaceableEvents.splice(0))
+
+        const eventIdsToDelete = event.tags.reduce(
+          (ids, tag) =>
+            tag.length >= 2
+              && tag[0] === EventTags.Event
+              && /^[0-9a-f]{64}$/.test(tag[1])
+              ? [...ids, tag[1]]
+              : ids,
+          [] as string[]
+        )
+
+        if (eventIdsToDelete.length) {
+          await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete)
+        }
+
+        inserted += await eventRepository.create(enrichEventMetadata(event))
+        continue
+      }
+
+      const enrichedEvent = enrichEventMetadata(event)
+
+      if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) {
+        replaceableEvents.push(enrichedEvent)
+        continue
+      }
+
+      regularEvents.push(enrichedEvent)
+    }
+
+    // Flush whatever is still buffered at the end of the batch
+    inserted += await eventRepository.createMany(regularEvents)
+    inserted += await eventRepository.upsertMany(replaceableEvents)
+
+    return inserted
+  }
+
+export class EventImportService {
+  public constructor(
+    private readonly persistBatch: (events: Event[]) => Promise<number>,
+  ) {}
+
+  public async importFromJsonl(
+    filePath: string,
+    options: EventImportOptions = {},
+  ): Promise<EventImportStats> {
+    const batchSize = (
+      typeof options.batchSize === 'number'
+        && Number.isInteger(options.batchSize)
+        && options.batchSize > 0
+    ) ? options.batchSize : DEFAULT_BATCH_SIZE
+
+    const onLineError = options.onLineError ?? (() => undefined)
+    const onProgress = options.onProgress ?? (() => undefined)
+
+    const validateEventSchema = attemptValidation(eventSchema)
+
+    const batch: Event[] = []
+    const stats: EventImportStats = {
+      errors: 0,
+      inserted: 0,
+      processed: 0,
+      skipped: 0,
+    }
+
+    let lineNumber = 0
+
+    const flushBatch = async () => {
+      if (!batch.length) {
+        return
+      }
+
+      const flushedCount = batch.length
+      const inserted = await this.persistBatch(batch)
+
+      if (!Number.isInteger(inserted) || inserted < 0 || inserted > flushedCount) {
+        throw new Error(
+          `Invalid insert count (${inserted}) for batch size ${flushedCount}`,
+        )
+      }
+
+      stats.inserted += inserted
+      stats.skipped += flushedCount - inserted
+      batch.length = 0
+
+      onProgress({ ...stats })
+    }
+
+    const stream = fs.createReadStream(filePath, {
+      encoding: 'utf-8',
+    })
+
+    const lineReader = readline.createInterface({
+      crlfDelay: Infinity,
+      input: stream,
+    })
+
+    try {
+      for await (const line of lineReader) {
+        lineNumber += 1
+
+        const trimmedLine = line.trim()
+        if (!trimmedLine.length) {
+          continue
+        }
+
+        stats.processed += 1
+
+        let event: Event
+        try {
+          event = validateEventSchema(JSON.parse(trimmedLine)) as Event
+
+          if (!await isEventIdValid(event)) {
+            throw new Error('invalid: event id does not match')
+          }
+
+          if (!await isEventSignatureValid(event)) {
+            throw new Error('invalid: event signature verification failed')
+          }
+        } catch (error) {
+          stats.errors += 1
+          onLineError({
+            lineNumber,
+            reason: getErrorMessage(error),
+          })
+
+          continue
+        }
+
+        batch.push(event)
+
+        if (batch.length >= batchSize) {
+          await flushBatch()
+        }
+      }
+
+      await flushBatch()
+
+      return stats
+    } finally {
+      lineReader.close()
+      stream.destroy()
+    }
+  }
+}
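`EventImportService` takes its persistence step as a plain `(events: Event[]) => Promise<number>` function, so the reading and validation pipeline can be exercised without a database; the unit tests below lean on exactly this seam. A minimal sketch of driving it with a stub persister (the file path is hypothetical):

```typescript
import { EventImportService } from './services/event-import-service'

const dryRun = async (): Promise<void> => {
  // Stub persister: pretends every event was inserted, touching no database.
  const importer = new EventImportService(async (events) => events.length)

  const stats = await importer.importFromJsonl('./events.jsonl', {
    batchSize: 500,
    onLineError: ({ lineNumber, reason }) => console.warn(`line ${lineNumber}: ${reason}`),
  })

  console.log(stats) // { errors, inserted, processed, skipped }
}

dryRun().catch(console.error)
```

Because schema, id, and signature checks happen before persistence, a run like this is a cheap way to sanity-check a dump before a real import.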
diff --git a/test/unit/services/event-import-service.spec.ts b/test/unit/services/event-import-service.spec.ts
new file mode 100644
index 00000000..f6b5572c
--- /dev/null
+++ b/test/unit/services/event-import-service.spec.ts
@@ -0,0 +1,181 @@
+import { join } from 'path'
+
+import fs from 'fs'
+import os from 'os'
+
+import {
+  EventImportLineError,
+  EventImportService,
+  EventImportStats,
+} from '../../../src/services/event-import-service'
+import { Event } from '../../../src/@types/event'
+import { expect } from 'chai'
+import { getEvents } from '../data/events'
+
+describe('EventImportService', () => {
+  const tmpDirs: string[] = []
+
+  const createJsonlFile = (lines: string[]): string => {
+    const tmpDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-import-'))
+    tmpDirs.push(tmpDir)
+
+    const filePath = join(tmpDir, 'events.jsonl')
+
+    fs.writeFileSync(filePath, lines.join('\n'), {
+      encoding: 'utf-8',
+    })
+
+    return filePath
+  }
+
+  afterEach(() => {
+    for (const tmpDir of tmpDirs.splice(0)) {
+      fs.rmSync(tmpDir, {
+        force: true,
+        recursive: true,
+      })
+    }
+  })
+
+  it('imports valid events in batches and tracks skipped duplicates', async () => {
+    const [event] = getEvents()
+    const filePath = createJsonlFile([
+      JSON.stringify(event),
+      JSON.stringify(event),
+      JSON.stringify(event),
+    ])
+
+    const batchCalls: Event[][] = []
+    const persistBatch = async (events: Event[]): Promise<number> => {
+      batchCalls.push([...events])
+
+      if (batchCalls.length === 1) {
+        return 2
+      }
+
+      return 0
+    }
+
+    const progressUpdates: EventImportStats[] = []
+
+    const importer = new EventImportService(persistBatch)
+
+    const stats = await importer.importFromJsonl(filePath, {
+      batchSize: 2,
+      onProgress: (progress) => {
+        progressUpdates.push(progress)
+      },
+    })
+
+    expect(stats).to.deep.equal({
+      errors: 0,
+      inserted: 2,
+      processed: 3,
+      skipped: 1,
+    })
+
+    expect(batchCalls.length).to.equal(2)
+
+    const [firstBatch, secondBatch] = batchCalls
+
+    expect(firstBatch.map(({ id }) => id)).to.deep.equal([event.id, event.id])
+    expect(secondBatch.map(({ id }) => id)).to.deep.equal([event.id])
+
+    expect(progressUpdates.length).to.equal(2)
+
+    const finalProgress = progressUpdates[progressUpdates.length - 1]
+
+    expect(finalProgress).to.deep.equal(stats)
+  })
+
+  it('counts malformed and invalid events as errors and keeps importing', async () => {
+    const [event] = getEvents()
+
+    const invalidIdEvent: Event = {
+      ...event,
+      content: `${event.content} changed`,
+    }
+
+    const invalidSignatureEvent: Event = {
+      ...event,
+      sig: 'f'.repeat(128),
+    }
+
+    const filePath = createJsonlFile([
+      JSON.stringify(event),
+      '{not-json}',
+      JSON.stringify(invalidIdEvent),
+      JSON.stringify(invalidSignatureEvent),
+    ])
+
+    const batchCalls: Event[][] = []
+    const persistBatch = async (events: Event[]): Promise<number> => {
+      batchCalls.push([...events])
+
+      return 1
+    }
+
+    const lineErrors: EventImportLineError[] = []
+
+    const importer = new EventImportService(persistBatch)
+
+    const stats = await importer.importFromJsonl(filePath, {
+      batchSize: 10,
+      onLineError: (lineError) => {
+        lineErrors.push(lineError)
+      },
+    })
+
+    expect(stats).to.deep.equal({
+      errors: 3,
+      inserted: 1,
+      processed: 4,
+      skipped: 0,
+    })
+
+    expect(batchCalls.length).to.equal(1)
+    expect(batchCalls[0].length).to.equal(1)
+    expect(lineErrors.length).to.equal(3)
+  })
+
+  it('rejects when persistence returns an invalid insert count', async () => {
+    const [event] = getEvents()
+    const filePath = createJsonlFile([JSON.stringify(event)])
+
+    const persistBatch = async (): Promise<number> => 2
+
+    const importer = new EventImportService(persistBatch)
+
+    try {
+      await importer.importFromJsonl(filePath)
+      expect.fail('Expected import to reject when persistence returns invalid insert count')
+    } catch (error) {
+      expect((error as Error).message).to.include('Invalid insert count')
+    }
+  })
+
+  it('propagates persistence failures as import failures', async () => {
+    const [event] = getEvents()
+    const filePath = createJsonlFile([JSON.stringify(event)])
+
+    const persistBatch = async (): Promise<number> => {
+      throw new Error('database unavailable')
+    }
+
+    const lineErrors: EventImportLineError[] = []
+
+    const importer = new EventImportService(persistBatch)
+
+    try {
+      await importer.importFromJsonl(filePath, {
+        onLineError: (lineError) => {
+          lineErrors.push(lineError)
+        },
+      })
+      expect.fail('Expected import to reject when persistence fails')
+    } catch (error) {
+      expect((error as Error).message).to.equal('database unavailable')
+      expect(lineErrors.length).to.equal(0)
+    }
+  })
+})