Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/common/utils/db-errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { QueryFailedError } from 'typeorm';

/**
* Cross-dialect unique-constraint-violation check by driver code/message, for the two dialects we ship
* (sqlite dev, postgres prod). Lets insert-or-converge (RMW) paths distinguish a real duplicate from an
* unrelated failure without depending on a specific driver. Add another branch if a third driver is ever
* supported.
*/
export function isUniqueViolation(err: unknown): boolean {
if (!(err instanceof QueryFailedError)) return false;
const driver = err.driverError as { code?: string; message?: string } | undefined;
const code = driver?.code ?? '';
const message = driver?.message ?? err.message ?? '';
return code === '23505' /* postgres */ || /UNIQUE constraint failed|SQLITE_CONSTRAINT/i.test(message);
}
28 changes: 28 additions & 0 deletions src/database/migrations/1782100000000-WidenIngressDedupKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

/**
* Widen the inbound ingress dedup key from (instanceId, providerDeliveryId) to
* (pluginId, instanceId, providerDeliveryId).
*
* instanceId is a caller-supplied string that is only unique WITHIN a plugin, so two different plugins
* sharing the same instanceId string would collide on the old 2-column unique index and a legitimate
* delivery for the second plugin would be dropped as a false "duplicate". pluginId is already stored on
* every row (recordOrSkip inserts it), so this is a pure loosening of the constraint — no data loss, no
* false-negative risk. Kept as a stable-named DROP/CREATE unique index (portable to sqlite + postgres),
* mirroring AddIntegrationFabric's index style.
*/
export class WidenIngressDedupKey1782100000000 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX IF EXISTS "UQ_ingress_events_instance_delivery"`);
await queryRunner.query(
`CREATE UNIQUE INDEX "UQ_ingress_events_instance_delivery" ON "ingress_events" ("pluginId", "instanceId", "providerDeliveryId")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX IF EXISTS "UQ_ingress_events_instance_delivery"`);
await queryRunner.query(
`CREATE UNIQUE INDEX "UQ_ingress_events_instance_delivery" ON "ingress_events" ("instanceId", "providerDeliveryId")`,
);
}
}
18 changes: 17 additions & 1 deletion src/modules/integration/conversation-mapping.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DataSource } from 'typeorm';
import { ConversationMapping } from './entities/conversation-mapping.entity';
import { ConversationMappingService, MappingKey } from './conversation-mapping.service';
import { ConversationMappingConflict, ConversationMappingService, MappingKey } from './conversation-mapping.service';
import { AddIntegrationFabric1781900000000 } from '../../database/migrations/1781900000000-AddIntegrationFabric';

describe('ConversationMappingService', () => {
Expand Down Expand Up @@ -51,6 +51,22 @@ describe('ConversationMappingService', () => {
expect(stale).toBeNull();
});

it('rethrows ConversationMappingConflict when a providerConversationId is already bound to a different chat', async () => {
// Reverse unique key (pluginId, instanceId, providerConversationId): binding conv-1 to chat-1 then to
// a different chat-2 for the same plugin+instance is a genuine conflict with no forward row to
// converge onto — it must surface, not silently corrupt or fail-soft to a nonexistent row.
await service.upsert(key, 'conv-1');
await expect(
service.upsert({ sessionId: 'sess-1', chatId: 'chat-2', pluginId: 'chatwoot', instanceId: 'acct1' }, 'conv-1'),
).rejects.toBeInstanceOf(ConversationMappingConflict);
});

it('converges (updates, does not throw) when the same forward key already exists', async () => {
await service.upsert(key, 'conv-1');
await expect(service.upsert(key, 'conv-9')).resolves.toBeUndefined();
expect((await service.get(key))?.providerConversationId).toBe('conv-9');
});

it('findHandoverForChat returns any human/closed row for the chat, ignoring pluginId', async () => {
// faq-bot keeps a bot mapping; chatwoot-adapter takes the same chat over (human).
await service.upsert({ sessionId: 'sess-1', chatId: 'chat-1', pluginId: 'faq-bot', instanceId: 'i1' }, 'convA');
Expand Down
54 changes: 51 additions & 3 deletions src/modules/integration/conversation-mapping.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { QueryDeepPartialEntity, Repository } from 'typeorm';
import { ConversationMapping, HandoverState } from './entities/conversation-mapping.entity';
import { isUniqueViolation } from '../../common/utils/db-errors';

export interface MappingKey {
sessionId: string;
Expand All @@ -10,20 +11,67 @@ export interface MappingKey {
instanceId: string;
}

/**
* Thrown when a providerConversationId is already bound to a DIFFERENT chat for the same plugin+instance
* (the reverse unique key). Unlike a forward-key race — which converges by updating the existing row —
* this is a genuine conflict with no row to fall back to, so it surfaces instead of corrupting state.
*/
export class ConversationMappingConflict extends Error {
constructor(
readonly key: MappingKey,
readonly providerConversationId: string,
) {
super(
`conversation mapping conflict: providerConversationId "${providerConversationId}" is already bound to ` +
`a different chat for plugin "${key.pluginId}" instance "${key.instanceId}"`,
);
this.name = 'ConversationMappingConflict';
}
}

@Injectable()
export class ConversationMappingService {
constructor(@InjectRepository(ConversationMapping, 'data') private readonly repo: Repository<ConversationMapping>) {}

async upsert(key: MappingKey, providerConversationId: string, patch?: Partial<ConversationMapping>): Promise<void> {
const existing = await this.repo.findOne({ where: key });
if (existing) {
await this.repo.update({ id: existing.id }, {
await this.updateById(existing.id, key, providerConversationId, patch);
return;
}
try {
await this.repo.save(this.repo.create({ ...key, providerConversationId, handoverState: 'bot', ...patch }));
} catch (err) {
if (!isUniqueViolation(err)) throw err;
// A concurrent writer inserted between our findOne and save (forward-key race) OR the reverse
// unique (pluginId,instanceId,providerConversationId) is bound to another chat. Re-read the
// FORWARD key: found → converge by updating it; not found → genuine reverse conflict → surface.
const raced = await this.repo.findOne({ where: key });
if (raced) {
await this.updateById(raced.id, key, providerConversationId, patch);
return;
}
throw new ConversationMappingConflict(key, providerConversationId);
}
}

// Update guarded against a reverse-unique collision: moving a row's providerConversationId onto a value
// already bound to another chat throws ConversationMappingConflict rather than a raw QueryFailedError.
private async updateById(
id: string,
key: MappingKey,
providerConversationId: string,
patch?: Partial<ConversationMapping>,
): Promise<void> {
try {
await this.repo.update({ id }, {
providerConversationId,
...patch,
} as QueryDeepPartialEntity<ConversationMapping>);
return;
} catch (err) {
if (isUniqueViolation(err)) throw new ConversationMappingConflict(key, providerConversationId);
throw err;
}
await this.repo.save(this.repo.create({ ...key, providerConversationId, handoverState: 'bot', ...patch }));
}

get(key: MappingKey): Promise<ConversationMapping | null> {
Expand Down
6 changes: 4 additions & 2 deletions src/modules/integration/entities/ingress-event.entity.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm';
import { jsonColumnType } from '../../../common/utils/column-types';

// Persist-before-ack durable row + inbound dedup oracle. UNIQUE(instanceId, providerDeliveryId).
// Persist-before-ack durable row + inbound dedup oracle. UNIQUE(pluginId, instanceId, providerDeliveryId):
// instanceId is only unique within a plugin, so pluginId must be part of the key or two plugins sharing an
// instanceId string would drop each other's deliveries as false duplicates.
@Entity('ingress_events')
@Index('UQ_ingress_events_instance_delivery', ['instanceId', 'providerDeliveryId'], { unique: true })
@Index('UQ_ingress_events_instance_delivery', ['pluginId', 'instanceId', 'providerDeliveryId'], { unique: true })
@Index('IDX_ingress_events_createdAt', ['createdAt'])
export class IngressEvent {
// Host-minted uuid (crypto.randomUUID()), NOT DB-generated — the id and the jobId (= deliveryId)
Expand Down
12 changes: 6 additions & 6 deletions src/modules/integration/ingress-enqueue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('IngressEnqueueService', () => {
(config.get as jest.Mock).mockReturnValue(true);
const svc = new IngressEnqueueService(loader as PluginLoaderService, config as ConfigService, queue as never);

await svc.enqueue(data, 'd1');
expect(await svc.enqueue(data, 'd1')).toEqual({ outcome: 'queued' });

expect(queue.add).toHaveBeenCalledWith('ingress', data, { jobId: 'd1' });
expect(loader.dispatchWebhookForInstance).not.toHaveBeenCalled();
Expand All @@ -36,7 +36,7 @@ describe('IngressEnqueueService', () => {
(config.get as jest.Mock).mockReturnValue(false);
const svc = new IngressEnqueueService(loader as PluginLoaderService, config as ConfigService, queue as never);

await svc.enqueue(data, 'd1');
expect(await svc.enqueue(data, 'd1')).toEqual({ outcome: 'dispatched' });

expect(queue.add).not.toHaveBeenCalled();
expect(loader.dispatchWebhookForInstance).toHaveBeenCalledWith(data);
Expand All @@ -46,17 +46,17 @@ describe('IngressEnqueueService', () => {
(config.get as jest.Mock).mockReturnValue(true);
const svc = new IngressEnqueueService(loader as PluginLoaderService, config as ConfigService, undefined);

await svc.enqueue(data, 'd1');
expect(await svc.enqueue(data, 'd1')).toEqual({ outcome: 'dispatched' });

expect(loader.dispatchWebhookForInstance).toHaveBeenCalledWith(data);
});

it('swallows an inline dispatch error and logs it rather than throwing (row already persisted for redrive)', async () => {
it('swallows an inline dispatch error and returns outcome "failed" rather than throwing (row stays redrivable)', async () => {
(loader.dispatchWebhookForInstance as jest.Mock).mockRejectedValue(new Error('boom'));
(config.get as jest.Mock).mockReturnValue(false);
const svc = new IngressEnqueueService(loader as PluginLoaderService, config as ConfigService, undefined);

await expect(svc.enqueue(data, 'd1')).resolves.toBeUndefined();
expect(await svc.enqueue(data, 'd1')).toEqual({ outcome: 'failed' });
});

it('falls back to inline dispatch (never throws) when queue.add() fails, e.g. Redis unreachable', async () => {
Expand All @@ -66,7 +66,7 @@ describe('IngressEnqueueService', () => {
queue.add.mockRejectedValue(new Error('Redis connection is closed'));
const svc = new IngressEnqueueService(loader as PluginLoaderService, config as ConfigService, queue as never);

await expect(svc.enqueue(data, 'd1')).resolves.toBeUndefined();
expect(await svc.enqueue(data, 'd1')).toEqual({ outcome: 'dispatched' });
expect(queue.add).toHaveBeenCalledWith('ingress', data, { jobId: 'd1' });
expect(loader.dispatchWebhookForInstance).toHaveBeenCalledWith(data);
});
Expand Down
14 changes: 12 additions & 2 deletions src/modules/integration/ingress-enqueue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import { IngressJobData } from '../queue/processors/ingress.processor';
import { QUEUE_NAMES } from '../queue/queue-names';
import { createLogger } from '../../common/services/logger.service';

/**
* Outcome of an enqueue attempt. 'queued' = handed to BullMQ; 'dispatched' = delivered inline; 'failed'
* = inline dispatch threw and was swallowed (the row stays durable for a redrive). enqueue() never
* throws, so callers use the outcome (not exceptions) to decide durability follow-up (e.g. redrive).
*/
export type EnqueueOutcome = { outcome: 'queued' | 'dispatched' | 'failed' };

/**
* Shared queue-or-inline enqueue for inbound ingress jobs. Extracted out of IngressService's DI
* factory (integration.module.ts) so RedriveService can reuse the exact same behavior when replaying
Expand All @@ -24,15 +31,15 @@ export class IngressEnqueueService {
@Optional() @InjectQueue(QUEUE_NAMES.INGRESS) private readonly ingressQueue?: Queue<IngressJobData>,
) {}

async enqueue(data: IngressJobData, jobId: string): Promise<void> {
async enqueue(data: IngressJobData, jobId: string): Promise<EnqueueOutcome> {
const queueEnabled = this.config.get<boolean>('queue.enabled', false);
const useQueue = queueEnabled && !!this.ingressQueue;

if (useQueue && this.ingressQueue) {
try {
// jobId = deliveryId gives BullMQ exactly-once enqueue semantics.
await this.ingressQueue.add('ingress', data, { jobId });
return;
return { outcome: 'queued' };
} catch (err) {
// Redis unreachable (enableOfflineQueue:false makes add() reject) — fall through to inline
// dispatch. Without this, the already-persisted event would be lost forever: the throw would
Expand All @@ -55,17 +62,20 @@ export class IngressEnqueueService {
// (persist-before-dispatch still holds), mirroring the webhook direct-delivery fallback.
try {
await this.loader.dispatchWebhookForInstance(data);
return { outcome: 'dispatched' };
} catch (err) {
// A duplicate delivery already 200s before this point, so a failure here is a real
// dispatch error. Log and swallow: the row is durably persisted for a later redrive,
// and the provider still gets its 202 (at-least-once, like the webhook fallback).
// The 'failed' outcome lets RedriveService avoid marking a DLQ row handled on a silent drop.
this.logger.error('Inline ingress dispatch failed', err instanceof Error ? err.message : String(err), {
pluginId: data.pluginId,
instanceId: data.instanceId,
route: data.route,
deliveryId: data.deliveryId,
action: 'ingress_inline_dispatch_failed',
});
return { outcome: 'failed' };
}
}
}
8 changes: 8 additions & 0 deletions src/modules/integration/ingress-event.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { DataSource } from 'typeorm';
import { IngressEvent } from './entities/ingress-event.entity';
import { IngressEventService } from './ingress-event.service';
import { AddIntegrationFabric1781900000000 } from '../../database/migrations/1781900000000-AddIntegrationFabric';
import { WidenIngressDedupKey1782100000000 } from '../../database/migrations/1782100000000-WidenIngressDedupKey';

describe('IngressEventService.recordOrSkip', () => {
let ds: DataSource;
Expand All @@ -11,6 +12,7 @@ describe('IngressEventService.recordOrSkip', () => {
await ds.initialize();
const runner = ds.createQueryRunner();
await new AddIntegrationFabric1781900000000().up(runner);
await new WidenIngressDedupKey1782100000000().up(runner);
await runner.release();
service = new IngressEventService(ds.getRepository(IngressEvent));
});
Expand All @@ -36,4 +38,10 @@ describe('IngressEventService.recordOrSkip', () => {
expect(await service.recordOrSkip(row())).toBe(true);
expect(await service.recordOrSkip({ ...row(), instanceId: 'inst2' })).toBe(true);
});

it('treats the same instance+delivery id under a different plugin as new (dedup key includes pluginId)', async () => {
// instanceId is only unique within a plugin; two plugins sharing the string must not collide.
expect(await service.recordOrSkip(row())).toBe(true);
expect(await service.recordOrSkip({ ...row(), pluginId: 'other-plug' })).toBe(true);
});
});
13 changes: 2 additions & 11 deletions src/modules/integration/ingress-event.service.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
import { randomUUID } from 'node:crypto';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { QueryFailedError, Repository } from 'typeorm';
import { Repository } from 'typeorm';
import { IngressEvent } from './entities/ingress-event.entity';

// Cross-dialect unique-violation check by driver code/message — the two dialects we ship
// (sqlite dev, postgres prod). Add another branch if a third driver is ever supported.
export function isUniqueViolation(err: unknown): boolean {
if (!(err instanceof QueryFailedError)) return false;
const driver = err.driverError as { code?: string; message?: string } | undefined;
const code = driver?.code ?? '';
const message = driver?.message ?? err.message ?? '';
return code === '23505' /* postgres */ || /UNIQUE constraint failed|SQLITE_CONSTRAINT/i.test(message);
}
import { isUniqueViolation } from '../../common/utils/db-errors';

export interface IngressEventInput {
instanceId: string;
Expand Down
14 changes: 13 additions & 1 deletion src/modules/integration/ingress.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ describe('IngressService.handle', () => {
expect((await svc.handle(req)).status).toBe(404);
});

it('falls back to a generated delivery id when the dedup header is absent', async () => {
it('derives a DETERMINISTIC delivery id from the body when the dedup header is absent', async () => {
// A random UUID here would defeat both persist-dedup and BullMQ jobId idempotency, so a provider
// retry of the same body must produce the SAME id, and a different body a DIFFERENT id.
const d = deps();
const svc = new IngressService(d);
const res = await svc.handle({ ...req, headers: {} });
Expand All @@ -202,6 +204,16 @@ describe('IngressService.handle', () => {
expect(typeof jobId).toBe('string');
expect(jobId.length).toBeGreaterThan(0);
expect(jobData.deliveryId).toBe(jobId);

// same body → same id (retry dedups)
const d2 = deps();
await new IngressService(d2).handle({ ...req, headers: {} });
expect((d2.enqueue.mock.calls[0] as [unknown, string])[1]).toBe(jobId);

// different body → different id
const d3 = deps();
await new IngressService(d3).handle({ ...req, headers: {}, rawBody: '{"a":1}' });
expect((d3.enqueue.mock.calls[0] as [unknown, string])[1]).not.toBe(jobId);
});
});

Expand Down
18 changes: 15 additions & 3 deletions src/modules/integration/ingress.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { randomUUID } from 'node:crypto';
import { createHash } from 'node:crypto';
import { verifyIngressSignature } from './ingress-signature';
import { PluginIngressRoute } from '../../core/plugins/plugin.interfaces';
import { IngressJobData } from '../queue/processors/ingress.processor';
Expand Down Expand Up @@ -40,7 +40,9 @@ export interface IngressDeps {
sessionId: string | null;
}): Promise<boolean>;
};
enqueue: (data: IngressJobData, jobId: string) => Promise<void>;
// Returns an enqueue outcome (queued/dispatched/failed); handle() ignores it — only durability
// follow-up paths like redrive act on it. Typed as unknown here to keep this pure module decoupled.
enqueue: (data: IngressJobData, jobId: string) => Promise<unknown>;
now: () => number;
}

Expand Down Expand Up @@ -79,7 +81,7 @@ export class IngressService {
if (!verdict.ok) return { status: 401, body: verdict.reason ?? 'signature verification failed' };

const dedupHeader = (route.dedupHeader ?? route.signature.dedupHeader ?? 'x-delivery').toLowerCase();
const deliveryId = req.headers[dedupHeader] ?? randomUUID();
const deliveryId = req.headers[dedupHeader] ?? deriveDeliveryId(req);
const payload = { headers: req.headers, query: req.query, body: req.rawBody, rawBody: req.rawBody };
const isNew = await this.deps.events.recordOrSkip({
instanceId: req.instanceId,
Expand Down Expand Up @@ -110,6 +112,16 @@ export class IngressService {
}
}

/**
* Derives a DETERMINISTIC delivery id when the provider sends no dedup header, so a provider retry of
* the same delivery dedups instead of being treated as new. A random UUID would silently disable both
* the persist-dedup and BullMQ's jobId idempotency, causing duplicate downstream WhatsApp sends. Keyed
* on pluginId + instanceId + route + rawBody ONLY — never a server timestamp, which would defeat dedup.
*/
function deriveDeliveryId(req: IngressRequest): string {
return createHash('sha256').update([req.pluginId, req.instanceId, req.route, req.rawBody].join('\0')).digest('hex');
}

/**
* Extracts the provider conversation id from a declared header or a JSON pointer into the body.
* Returns undefined when no pointer is declared or extraction fails — the P1 lock then keys per
Expand Down
Loading
Loading