Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/backend/drizzle/0009_push_enabled.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "user_devices" ADD COLUMN "push_enabled" boolean DEFAULT true NOT NULL;
2 changes: 2 additions & 0 deletions apps/backend/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
{
"idx": 9,
"version": "7",
"when": 1785000000000,
"tag": "0009_push_enabled",
"when": 1783500000000,
"tag": "0009_message_edits",
"breakpoints": true
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"@aws-sdk/s3-request-presigner": "^3.1075.0",
"@socket.io/redis-adapter": "^8.3.0",
"@stellar/stellar-sdk": "^15.1.0",
"@types/web-push": "^3.6.4",
"cors": "^2.8.6",
"dotenv": "^17.3.1",
"drizzle-orm": "^0.45.2",
Expand All @@ -40,6 +41,7 @@
"postgres": "^3.4.9",
"redis": "^6.0.0",
"socket.io": "^4.8.3",
"web-push": "^3.6.7",
"zod": "^4.4.3"
},
"devDependencies": {
Expand Down
1 change: 1 addition & 0 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ export const userDevices = pgTable(
identityPublicKey: text('identity_public_key').notNull(),
registrationId: integer('registration_id'),
lastSeenAt: timestamp('last_seen_at'),
pushEnabled: boolean('push_enabled').notNull().default(true),
revokedAt: timestamp('revoked_at'),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
Expand Down
53 changes: 50 additions & 3 deletions apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import dotenv from 'dotenv';
import { eq, isNull, and } from 'drizzle-orm';
import { db } from './db/index.js';
import { conversationMembers, users, userDevices } from './db/schema.js';
import { eq, inArray } from 'drizzle-orm';
import { db } from './db/index.js';
import { conversationMembers, users } from './db/schema.js';
Expand All @@ -12,7 +15,7 @@ import { registerMessagingHandlers } from './socket/messaging.js';
import { app } from './app.js';
import { redis as appRedis } from './lib/redis.js';
import { setSocketServer } from './lib/socket.js';
import { setOnline, setOffline } from './services/presence.js';
import { setOnline, setOffline, deriveDevicePresence } from './services/presence.js';
import { startHeartbeatTimer, clearHeartbeatTimer } from './services/heartbeat.js';
import {
registerDeviceSocket,
Expand Down Expand Up @@ -74,6 +77,7 @@ io.use(socketAuthMiddleware);
io.on('connection', async (socket: AuthSocket) => {
const userId = socket.auth!.userId;
const deviceId = socket.auth!.deviceId;
const identityPublicKey = socket.identityPublicKey;
console.log('User connected:', userId, socket.id);

// Register socket for device-revocation tracking (cross-instance via Redis pub/sub).
Expand All @@ -82,7 +86,25 @@ io.on('connection', async (socket: AuthSocket) => {
}

// Start the server-side heartbeat watchdog (90 s timeout).
startHeartbeatTimer(socket, userId, deviceId, appRedis, io);
startHeartbeatTimer(socket, userId, deviceId, appRedis, io, identityPublicKey);

// Update user_devices.lastSeenAt for device-based presence derivation.
if (identityPublicKey) {
try {
await db
.update(userDevices)
.set({ lastSeenAt: new Date() })
.where(
and(
eq(userDevices.userId, userId),
eq(userDevices.identityPublicKey, identityPublicKey),
isNull(userDevices.revokedAt),
),
);
} catch {
// Non-critical update; ignore errors.
}
}

// Per-socket middleware: intercept every incoming event before handlers.
const EXCLUDED_EVENTS = new Set(['heartbeat']);
Expand Down Expand Up @@ -172,6 +194,24 @@ io.on('connection', async (socket: AuthSocket) => {
unregisterForBackpressure(socket);
clearViolations(socket.id);

// Update user_devices.lastSeenAt on disconnect.
if (identityPublicKey) {
try {
await db
.update(userDevices)
.set({ lastSeenAt: new Date() })
.where(
and(
eq(userDevices.userId, userId),
eq(userDevices.identityPublicKey, identityPublicKey),
isNull(userDevices.revokedAt),
),
);
} catch {
// Non-critical update; ignore errors.
}
}

if (appRedis) {
const fullyOffline = await setOffline(appRedis, userId, deviceId);
if (fullyOffline) {
Expand All @@ -186,9 +226,16 @@ io.on('connection', async (socket: AuthSocket) => {
where: eq(conversationMembers.userId, userId),
columns: { conversationId: true },
});

const { lastSeen } = await deriveDevicePresence(userId);

for (const m of memberships) {
io.to(m.conversationId).emit('user_offline', { userId });
io.to(m.conversationId).emit('presence_update', { userId, online: false });
io.to(m.conversationId).emit('presence_update', {
userId,
online: false,
...(lastSeen ? { lastSeen } : {}),
});
}
await recordPresenceForCoMembers(
userId,
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/middleware/socketAuth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { devices } from '../db/schema.js';

export interface AuthSocket extends Socket {
auth?: JwtPayload;
identityPublicKey?: string;
}

export async function socketAuthMiddleware(
Expand Down Expand Up @@ -40,5 +41,6 @@ export async function socketAuthMiddleware(
}

socket.auth = payload;
socket.identityPublicKey = device.identityPublicKey;
next();
}
20 changes: 15 additions & 5 deletions apps/backend/src/routes/users.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { db } from '../db/index.js';
import { users, wallets, devices, conversationMembers } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { redis } from '../lib/redis.js';
import { isOnline } from '../services/presence.js';
import { isOnline, deriveDevicePresence } from '../services/presence.js';
import { getSocketServer } from '../lib/socket.js';

export const usersRouter: RouterType = Router();
Expand Down Expand Up @@ -163,12 +163,22 @@ usersRouter.get('/:id/presence', async (req: AuthRequest, res) => {
return;
}

if (!redis) {
// Check Redis for active WS connections first.
if (redis) {
const online = await isOnline(redis, id);
if (online) {
res.json({ online: true });
return;
}
}

// Fall back to device-based presence from user_devices.lastSeenAt.
try {
const { online, lastSeen } = await deriveDevicePresence(id);
res.json({ online, ...(lastSeen ? { lastSeen } : {}) });
} catch {
res.json({ online: false });
return;
}
const online = await isOnline(redis, id);
res.json({ online });
} catch {
res.status(404).json({ error: 'User not found' });
}
Expand Down
23 changes: 21 additions & 2 deletions apps/backend/src/services/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type { Server } from 'socket.io';
import type { Redis } from 'ioredis';
import type { AuthSocket } from '../middleware/socketAuth.js';
import { db } from '../db/index.js';
import { devices } from '../db/schema.js';
import { eq } from 'drizzle-orm';
import { devices, userDevices } from '../db/schema.js';
import { eq, and, isNull } from 'drizzle-orm';
import { refreshPresence, markDeviceOffline } from './presence.js';

const HEARTBEAT_TIMEOUT_MS = 90_000;
Expand All @@ -18,6 +18,7 @@ export function startHeartbeatTimer(
deviceId: string,
redis: Redis | null,
io: Server,
identityPublicKey?: string,
): void {
const schedule = () => {
clearTimeout(timers.get(socket.id));
Expand Down Expand Up @@ -61,6 +62,24 @@ export function startHeartbeatTimer(
} catch {
// Non-critical update; ignore errors.
}

// Update user_devices.lastSeenAt for device-based presence derivation.
if (identityPublicKey) {
try {
await db
.update(userDevices)
.set({ lastSeenAt: new Date() })
.where(
and(
eq(userDevices.userId, userId),
eq(userDevices.identityPublicKey, identityPublicKey),
isNull(userDevices.revokedAt),
),
);
} catch {
// Non-critical update; ignore errors.
}
}
}

schedule();
Expand Down
48 changes: 48 additions & 0 deletions apps/backend/src/services/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@
* device also has a small per-device key with its own TTL so heartbeat timeouts
* can remove that device entry without forcing the whole user offline.
*
* - On connect: add socketId to `presence:{userId}` set, set TTL 60s
* - On heartbeat: refresh TTL to 60s
* - On disconnect: remove socketId from set, if set empty → user_offline
* - GET /users/:id/presence → { online: boolean, lastSeen?: string }
*
* User presence is derived from device presence: a user is online when any
* non-expired device entry exists (Redis OR user_devices.lastSeenAt within
* the window). When offline, lastSeen reflects the most recent device activity.
* - On connect: upsert device entry in `presence:user:{userId}` and refresh TTL
* - On heartbeat: update lastSeen and refresh the device TTL
* - On disconnect/timeout: remove that device entry; if none remain → user offline
* - GET /users/:id/presence → { online: boolean }
*/
import type { Redis } from 'ioredis';
import { isNull, eq, and, gte, desc } from 'drizzle-orm';
import { db } from '../db/index.js';
import { userDevices } from '../db/schema.js';

const PRESENCE_TTL = 90; // seconds

Expand Down Expand Up @@ -110,3 +121,40 @@ export async function isOnline(redis: Redis, userId: string): Promise<boolean> {
const count = await redis.hlen(key);
return count > 0;
}

const DEVICE_PRESENCE_WINDOW_MS = 90_000;

/**
* Derive user presence from device presence: a user is considered online
* if any non-revoked device has a lastSeenAt within the presence window.
* When offline, returns the most recent lastSeenAt across all devices.
*/
export async function deriveDevicePresence(
userId: string,
): Promise<{ online: boolean; lastSeen: string | null }> {
const windowStart = new Date(Date.now() - DEVICE_PRESENCE_WINDOW_MS);

const activeDevice = await db.query.userDevices.findFirst({
where: and(
eq(userDevices.userId, userId),
isNull(userDevices.revokedAt),
gte(userDevices.lastSeenAt, windowStart),
),
columns: { id: true },
});

if (activeDevice) {
return { online: true, lastSeen: null };
}

const mostRecent = await db.query.userDevices.findFirst({
where: and(eq(userDevices.userId, userId), isNull(userDevices.revokedAt)),
orderBy: desc(userDevices.lastSeenAt),
columns: { lastSeenAt: true },
});

return {
online: false,
lastSeen: mostRecent?.lastSeenAt?.toISOString() ?? null,
};
}
83 changes: 83 additions & 0 deletions apps/backend/src/services/push.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import webpush from 'web-push';
import { eq, and, isNull } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversationMembers, pushSubscriptions, userDevices } from '../db/schema.js';
import { redis } from '../lib/redis.js';
import { isOnline } from './presence.js';

const VAPID_SUBJECT = process.env['VAPID_SUBJECT'] || 'mailto:admin@clicked.app';

if (process.env['VAPID_PUBLIC_KEY'] && process.env['VAPID_PRIVATE_KEY']) {
webpush.setVapidDetails(
VAPID_SUBJECT,
process.env['VAPID_PUBLIC_KEY'],
process.env['VAPID_PRIVATE_KEY'],
);
}

export interface PushContext {
conversationId: string;
messageId: string;
senderId: string;
}

export async function sendPushForMessage(ctx: PushContext): Promise<void> {
if (!process.env['VAPID_PUBLIC_KEY'] || !process.env['VAPID_PRIVATE_KEY']) {
return;
}

try {
const allMembers = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, ctx.conversationId),
columns: { userId: true, isMuted: true },
});

for (const member of allMembers) {
if (member.userId === ctx.senderId) continue;
if (member.isMuted) continue;

// Skip online users (active WS connection).
if (redis) {
const online = await isOnline(redis, member.userId);
if (online) continue;
}

// Get non-revoked devices with push enabled.
const devices = await db.query.userDevices.findMany({
where: and(
eq(userDevices.userId, member.userId),
eq(userDevices.pushEnabled, true),
isNull(userDevices.revokedAt),
),
columns: { id: true },
});

for (const device of devices) {
const sub = await db.query.pushSubscriptions.findFirst({
where: eq(pushSubscriptions.deviceId, device.id),
columns: { endpoint: true, p256dh: true, auth: true },
});

if (!sub) continue;

try {
await webpush.sendNotification(
{
endpoint: sub.endpoint,
keys: { p256dh: sub.p256dh, auth: sub.auth },
},
JSON.stringify({
type: 'new_message',
conversationId: ctx.conversationId,
messageId: ctx.messageId,
}),
);
} catch {
// Push delivery failures are non-critical.
}
}
}
} catch {
// Push is best-effort; never let it break message delivery.
}
}
Loading
Loading