Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/api/src/.example.env
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ CLICK_HOUSE_URL=http://127.0.0.1:8123
CLICK_HOUSE_USER=default
CLICK_HOUSE_PASSWORD=
CLICK_HOUSE_DATABASE=novu-local

# Cloudflare Scheduler (for delayed job scheduling)
SCHEDULER_URL=
SCHEDULER_API_KEY=
SCHEDULER_CALLBACK_API_KEY=
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { HttpException, HttpStatus, Injectable, InternalServerErrorException, NotFoundException } from '@nestjs/common';
import {
AnalyticsService,
CloudflareSchedulerService,
CreateExecutionDetails,
CreateExecutionDetailsCommand,
DetailEnum,
FeatureFlagsService,
PinoLogger,
SchedulerJobType,
StandardQueueService,
} from '@novu/application-generic';
import {
Expand All @@ -18,9 +21,11 @@ import {
import {
ApiServiceLevelEnum,
ChannelTypeEnum,
CloudflareSchedulerMode,
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
FeatureNameEnum,
FeatureFlagsKeysEnum,
getFeatureForTierAsNumber,
JobStatusEnum,
} from '@novu/shared';
Expand All @@ -43,7 +48,9 @@ export class SnoozeNotification {
private organizationRepository: CommunityOrganizationRepository,
private createExecutionDetails: CreateExecutionDetails,
private markNotificationAs: MarkNotificationAs,
private analyticsService: AnalyticsService
private analyticsService: AnalyticsService,
private cloudflareSchedulerService: CloudflareSchedulerService,
private featureFlagsService: FeatureFlagsService
) {}

public async execute(command: SnoozeNotificationCommand): Promise<InboxNotification> {
Expand Down Expand Up @@ -92,15 +99,101 @@ export class SnoozeNotification {
}

public async enqueueJob(job: JobEntity, delay: number) {
this.logger.info({ jobId: job._id, delay }, 'Adding snooze job to Standard Queue');
this.logger.info({ jobId: job._id, delay }, 'Processing snooze job scheduling');

const organization = await this.getOrganization(job._organizationId);
if (!organization) {
this.logger.warn({ organizationId: job._organizationId }, 'Organization not found, falling back to BullMQ');
await this.addToBullMQ(job, delay, false);

return;
}

const schedulerMode = await this.featureFlagsService.getFlag<string>({
key: FeatureFlagsKeysEnum.CF_SCHEDULER_MODE,
defaultValue: CloudflareSchedulerMode.OFF,
organization: { _id: job._organizationId, apiServiceLevel: organization.apiServiceLevel },
environment: { _id: job._environmentId },
});

const hasDelay = delay > 0;
const shouldUseCFScheduler = schedulerMode !== CloudflareSchedulerMode.OFF && hasDelay;

this.logger.debug(
{
jobId: job._id,
schedulerMode,
hasDelay,
shouldUseCFScheduler,
delay,
},
'CF Scheduler mode evaluation for snooze job'
);

if (shouldUseCFScheduler) {
await this.handleCFSchedulerMode(job, delay, schedulerMode as CloudflareSchedulerMode);
} else {
await this.addToBullMQ(job, delay, false);
}
}

private async handleCFSchedulerMode(job: JobEntity, delay: number, mode: CloudflareSchedulerMode) {
const schedulerRequest = {
jobId: job._id,
type: SchedulerJobType.SNOOZE,
delayMs: delay,
data: {
_environmentId: job._environmentId,
_id: job._id,
_organizationId: job._organizationId,
_userId: job._userId,
},
metadata: {
mode,
workflowId: job._templateId,
subscriberId: job._subscriberId,
stepId: job.step?.stepId,
},
};

switch (mode) {
case CloudflareSchedulerMode.SHADOW:
this.logger.info({ jobId: job._id }, 'Shadow mode: BullMQ will process, CF Scheduler for validation');
await this.cloudflareSchedulerService.scheduleJob(schedulerRequest);
await this.addToBullMQ(job, delay, false); // No flag - this is the real one
break;

case CloudflareSchedulerMode.LIVE:
this.logger.info({ jobId: job._id }, 'Live mode: CF Scheduler will process, BullMQ is shadow');
await this.cloudflareSchedulerService.scheduleJob(schedulerRequest);
await this.addToBullMQ(job, delay, true); // skipProcessing: true - this is shadow
break;

case CloudflareSchedulerMode.COMPLETE:
this.logger.info({ jobId: job._id }, 'Complete mode: Adding snooze job only to CF Scheduler');
await this.cloudflareSchedulerService.scheduleJob(schedulerRequest);
break;

default:
this.logger.warn({ mode }, 'Unknown CF Scheduler mode for snooze, falling back to BullMQ');
await this.addToBullMQ(job, delay, false);
}
}

private async addToBullMQ(job: JobEntity, delay: number, skipProcessing: boolean) {
const jobData = {
_environmentId: job._environmentId,
_id: job._id,
_organizationId: job._organizationId,
_userId: job._userId,
...(skipProcessing && { skipProcessing: true }),
};

this.logger.info(
{ jobId: job._id, delay, skipProcessing },
skipProcessing ? 'Adding snooze job to BullMQ with skipProcessing flag' : 'Adding snooze job to BullMQ'
);

await this.standardQueueService.add({
name: job._id,
data: jobData,
Expand Down
35 changes: 35 additions & 0 deletions apps/api/src/app/internal/dtos/scheduler-callback.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { IsDefined, IsNumber, IsObject, IsOptional, IsString } from 'class-validator';

export class SchedulerCallbackRequestDto {
@IsDefined()
@IsString()
jobId: string;

@IsDefined()
@IsString()
type: string;

@IsDefined()
@IsObject()
data: {
_environmentId: string;
_id: string;
_organizationId: string;
_userId: string;
};

@IsOptional()
@IsObject()
metadata?: {
mode?: string;
workflowId?: string;
subscriberId?: string;
stepId?: string;
};
}

export class SchedulerCallbackResponseDto {
success: boolean;
jobId: string;
}

27 changes: 27 additions & 0 deletions apps/api/src/app/internal/guards/scheduler-callback.guard.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { CanActivate, ExecutionContext, Injectable, UnauthorizedException } from '@nestjs/common';

@Injectable()
export class SchedulerCallbackGuard implements CanActivate {
canActivate(context: ExecutionContext): boolean {
const request = context.switchToHttp().getRequest();

const authHeader = request.headers['authorization'];
if (!authHeader) {
throw new UnauthorizedException('Authorization header is missing');
}

const token = authHeader.replace('Bearer ', '');
const expectedApiKey = process.env.SCHEDULER_CALLBACK_API_KEY;

if (!expectedApiKey) {
throw new UnauthorizedException('SCHEDULER_CALLBACK_API_KEY is not configured');
}

if (token !== expectedApiKey) {
throw new UnauthorizedException('Invalid scheduler callback API key');
}

return true;
}
}

23 changes: 22 additions & 1 deletion apps/api/src/app/internal/internal.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ import { Body, Controller, HttpCode, HttpStatus, Post, UseGuards } from '@nestjs
import { AuthGuard } from '@nestjs/passport';
import { ApiExcludeController } from '@nestjs/swagger';
import { SubscriberSession } from '../shared/framework/user.decorator';
import { SchedulerCallbackRequestDto, SchedulerCallbackResponseDto } from './dtos/scheduler-callback.dto';
import {
UpdateSubscriberOnlineStateRequestDto,
UpdateSubscriberOnlineStateResponseDto,
} from './dtos/subscriber-online-state.dto';
import { SchedulerCallbackGuard } from './guards/scheduler-callback.guard';
import { HandleSchedulerCallbackCommand } from './usecases/handle-scheduler-callback/handle-scheduler-callback.command';
import { HandleSchedulerCallback } from './usecases/handle-scheduler-callback/handle-scheduler-callback.usecase';
import { UpdateSubscriberOnlineStateCommand } from './usecases/update-subscriber-online-state/update-subscriber-online-state.command';
import { UpdateSubscriberOnlineState } from './usecases/update-subscriber-online-state/update-subscriber-online-state.usecase';

@Controller('/internal')
@ApiExcludeController()
export class InternalController {
constructor(private readonly updateSubscriberOnlineStateUsecase: UpdateSubscriberOnlineState) {}
constructor(
private readonly updateSubscriberOnlineStateUsecase: UpdateSubscriberOnlineState,
private readonly handleSchedulerCallbackUsecase: HandleSchedulerCallback
) {}

@Post('/subscriber-online-state')
@UseGuards(AuthGuard('subscriberJwt'))
Expand All @@ -30,4 +37,18 @@ export class InternalController {

return await this.updateSubscriberOnlineStateUsecase.execute(command);
}

@Post('/scheduler/callback')
@UseGuards(SchedulerCallbackGuard)
@HttpCode(HttpStatus.OK)
async handleSchedulerCallback(@Body() body: SchedulerCallbackRequestDto): Promise<SchedulerCallbackResponseDto> {
const command = HandleSchedulerCallbackCommand.create({
jobId: body.jobId,
type: body.type,
data: body.data,
metadata: body.metadata,
});

return await this.handleSchedulerCallbackUsecase.execute(command);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { IsDefined, IsObject, IsOptional, IsString } from 'class-validator';

export class HandleSchedulerCallbackCommand {
@IsDefined()
@IsString()
jobId: string;

@IsDefined()
@IsString()
type: string;

@IsDefined()
@IsObject()
data: {
_environmentId: string;
_id: string;
_organizationId: string;
_userId: string;
};

@IsOptional()
@IsObject()
metadata?: {
mode?: string;
workflowId?: string;
subscriberId?: string;
stepId?: string;
};

static create(dto: HandleSchedulerCallbackCommand) {
const command = new HandleSchedulerCallbackCommand();
Object.assign(command, dto);

return command;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Injectable } from '@nestjs/common';
import { PinoLogger, StandardQueueService } from '@novu/application-generic';
import { CloudflareSchedulerMode } from '@novu/shared';
import { HandleSchedulerCallbackCommand } from './handle-scheduler-callback.command';

@Injectable()
export class HandleSchedulerCallback {
constructor(
private standardQueueService: StandardQueueService,
private logger: PinoLogger
) {
this.logger.setContext(HandleSchedulerCallback.name);
}

async execute(command: HandleSchedulerCallbackCommand): Promise<{ success: boolean; jobId: string }> {
const modeFromMetadata = command.metadata?.mode;
const shouldSkipProcessing = modeFromMetadata === CloudflareSchedulerMode.SHADOW;

this.logger.info(
{
jobId: command.jobId,
type: command.type,
mode: modeFromMetadata,
shouldSkipProcessing,
},
'Received scheduler callback'
);

const jobData = {
_environmentId: command.data._environmentId,
_id: command.data._id,
_organizationId: command.data._organizationId,
_userId: command.data._userId,
...(shouldSkipProcessing && { skipProcessing: true }),
};

await this.standardQueueService.add({
name: command.jobId,
data: jobData,
groupId: command.data._organizationId,
options: { delay: 0 },
});

this.logger.info(
{
jobId: command.jobId,
skipProcessing: shouldSkipProcessing,
},
'Job enqueued to BullMQ from scheduler callback'
);

return { success: true, jobId: command.jobId };
}
}

5 changes: 3 additions & 2 deletions apps/api/src/app/internal/usecases/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { HandleSchedulerCallback } from './handle-scheduler-callback/handle-scheduler-callback.usecase';
import { UpdateSubscriberOnlineState } from './update-subscriber-online-state/update-subscriber-online-state.usecase';

export const USE_CASES = [UpdateSubscriberOnlineState];
export const USE_CASES = [UpdateSubscriberOnlineState, HandleSchedulerCallback];

export { UpdateSubscriberOnlineState };
export { HandleSchedulerCallback, UpdateSubscriberOnlineState };
2 changes: 2 additions & 0 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { JwtModule } from '@nestjs/jwt';
import {
analyticsService,
CacheServiceHealthIndicator,
CloudflareSchedulerService,
ComputeJobWaitDurationService,
CreateExecutionDetails,
cacheService,
Expand Down Expand Up @@ -128,6 +129,7 @@ const PROVIDERS = [
analyticsService,
cacheService,
CacheServiceHealthIndicator,
CloudflareSchedulerService,
ComputeJobWaitDurationService,
dalService,
DalServiceHealthIndicator,
Expand Down
3 changes: 3 additions & 0 deletions apps/api/src/config/env.validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ export const envValidators = {
NOVU_REGION: str({ default: 'local' }),
NOVU_SECRET_KEY: str({ default: '' }),
INTERNAL_SERVICES_API_KEY: str({ default: undefined }),
SCHEDULER_URL: str({ default: undefined }),
SCHEDULER_API_KEY: str({ default: undefined }),
SCHEDULER_CALLBACK_API_KEY: str({ default: undefined }),
// Novu Cloud third party services
...(processEnv.IS_SELF_HOSTED !== 'true' &&
processEnv.NOVU_ENTERPRISE === 'true' && {
Expand Down
4 changes: 4 additions & 0 deletions apps/worker/src/.example.env
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ CLICK_HOUSE_URL=http://127.0.0.1:8123
CLICK_HOUSE_USER=default
CLICK_HOUSE_PASSWORD=
CLICK_HOUSE_DATABASE=novu-local

# Cloudflare Scheduler (for delayed job scheduling)
SCHEDULER_URL=
SCHEDULER_API_KEY=
Loading
Loading