Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ let medianodeData: MediaNodeData;
const shutdown = async (): Promise<void> => {
try {
await redisServer.sRem(
getRedisKey['medianodesRunning'](),
getRedisKey['medianodes'](),
JSON.stringify(medianodeData)
);
console.log('Delete medianode');
Expand Down
35 changes: 35 additions & 0 deletions src/lib/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import z from 'zod';

const roomIdPeerIdSchema = z.object({
roomId: z.string(),
peerId: z.string(),
});

export const ValidationSchema = {
roomIdPeerId: roomIdPeerIdSchema,
createPeer: roomIdPeerIdSchema.extend({
rtpCapabilities: z.any(),
peerType: z.enum(['Participant', 'Recorder']),
}),

connectWebRtcTransport: roomIdPeerIdSchema.extend({
transportId: z.string(),
dtlsParameters: z.any(),
}),

createProducer: roomIdPeerIdSchema.extend({
rtpParameters: z.any(),
transportId: z.string(),
kind: z.enum(['audio', 'video']),
appData: z.any(),
}),

manageProducer: roomIdPeerIdSchema.extend({
producerId: z.string(),
source: z.enum(['mic', 'camera', 'screen']),
}),

restartIce: roomIdPeerIdSchema.extend({
transportId: z.string(),
}),
};
40 changes: 29 additions & 11 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ import config from '../config';
import { redisServer } from '../servers/redis-server';
import { MediaNodeData } from '../types';

export const HEARTBEAT_TIMEOUT = 60000; // 90 seconds / 1.30mins

export const getRedisKey = {
room: (roomId: string): string => `room-${roomId}`,
lobby: (roomId: string): string => `lobby-${roomId}`,
roomPeers: (roomId: string): string => `room-${roomId}-peers`,
roomPeerIds: (roomId: string): string => `room-${roomId}-peerids`,
room: (roomId: string): string => `room:${roomId}`,
lobby: (roomId: string): string => `lobby:${roomId}`,
roomPeers: (roomId: string): string => `room:${roomId}:peers`,
roomPeerIds: (roomId: string): string => `room:${roomId}:peerids`,
roomActiveSpeakerPeerId: (roomId: string): string =>
`room-${roomId}-activespeakerpeerid`,
roomsOngoing: (): string => `rooms-ongoing`,
medianodesRunning: (): string => `medianodes-running`,
signalnodesRunning: (): string => `signalnodes-running`,
roomMedianodes: (roomId: string): string => `room-${roomId}-medianodes`,
roomSignalnodes: (roomId: string): string => `room-${roomId}-signalnodes`,
`room:${roomId}:activespeakerpeerid`,
rooms: (): string => `rooms`,
medianodes: (): string => `medianodes`,
signalnodes: (): string => `signalnodes`,
roomMedianodes: (roomId: string): string => `room:${roomId}:medianodes`,
roomSignalnodes: (roomId: string): string => `room:${roomId}:signalnodes`,
};

export const getPubSubChannel = {
room: (roomId: string): string => `room-${roomId}`,
};

export const registerMediaNode = async (): Promise<MediaNodeData> => {
Expand All @@ -27,11 +33,23 @@ export const registerMediaNode = async (): Promise<MediaNodeData> => {
grpcPort: `${config.grpcPort}`,
};
await redisServer.sAdd(
getRedisKey['medianodesRunning'](),
getRedisKey['medianodes'](),
JSON.stringify(medianodeData)
);
return medianodeData;
} catch (error) {
throw error;
}
};

export const parseArguments = (args?: string): { [key: string]: unknown } => {
let parsedArgs: { [key: string]: unknown } = {};
if (args) {
try {
parsedArgs = JSON.parse(args);
} catch (parseError) {
throw parseError;
}
}
return parsedArgs;
};
26 changes: 2 additions & 24 deletions src/servers/grpc-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ProtoGrpcType } from '../protos/gen/media-signaling';

import config from '../config';
import SignalNode from '../services/signalnode';
import { MediaSignalingActions as MSA } from '../types/actions';
import { Actions as MSA } from '../types/actions';

interface ServerStats {
totalConnections: number;
Expand Down Expand Up @@ -301,7 +301,7 @@ class GrpcServer extends EventEmitter {

// Cleanup interval for stale connections
this.cleanupInterval = setInterval(() => {
this.cleanupStaleConnections();
// this.cleanupStaleConnections();
}, 60000); // Every minute
}

Expand Down Expand Up @@ -343,28 +343,6 @@ class GrpcServer extends EventEmitter {
this.emit('metricsReport', { stats, timestamp: new Date() });
}

private cleanupStaleConnections(): void {
try {
const nodes = SignalNode.getNodes();
const staleThreshold = 5 * 60 * 1000; // 5 minutes
let cleanedCount = 0;

nodes.forEach(node => {
if (node.isStale(staleThreshold)) {
console.log(`🧹 Cleaning up stale connection: ${node.id}`);
node.forceDisconnect('stale_connection_cleanup');
cleanedCount++;
}
});

if (cleanedCount > 0) {
console.log(`🧹 Cleaned up ${cleanedCount} stale connections`);
}
} catch (error) {
console.error('❌ Error during cleanup:', error);
}
}

async stop(): Promise<void> {
if (this.shutdownPromise) {
return this.shutdownPromise;
Expand Down
10 changes: 5 additions & 5 deletions src/servers/redis-server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createClient, RedisClientType, SetOptions } from 'redis';

import config from '../config';
import { PubSubActions } from '../types/actions';
import { Actions } from '../types/actions';

class RedisServer {
private static instance: RedisServer | null = null;
Expand Down Expand Up @@ -40,12 +40,12 @@ class RedisServer {
private async subscribe(): Promise<void> {
if (!this.isConnected)
throw new Error('Redis clients are not connected. Call connect() first');
await this.subClient.subscribe(PubSubActions.Message, message => {
await this.subClient.subscribe(Actions.Message, message => {
const {
event,
args,
}: {
event: PubSubActions;
event: Actions;
args: { [key: string]: unknown };
} = JSON.parse(message);

Expand All @@ -61,14 +61,14 @@ class RedisServer {
event,
args,
}: {
event: PubSubActions;
event: Actions;
args: { [key: string]: unknown };
}): Promise<void> {
if (!this.isConnected)
throw new Error('Redis clients are not connected. Call connect() first');

const message = JSON.stringify({ event, args });
await this.pubClient.publish(PubSubActions.Message, message);
await this.pubClient.publish(Actions.Message, message);
console.info(`Message published to channe ${message}`);
}

Expand Down
112 changes: 102 additions & 10 deletions src/services/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ import EventEmitter from 'events';
import { types as mediasoupTypes } from 'mediasoup';
import { PeerType, ProducerSource } from '../types';
import { mediaSoupServer } from '../servers/mediasoup-server';
import SignalNode from './signalnode';
import { Actions } from '../types/actions';
import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse';
import Room from './room';

class Peer extends EventEmitter {
id: string;
roomId: string;
closed: boolean;
signalNodeId: string;
private signalnode: SignalNode;
type: PeerType;

rtpCapabilities: mediasoupTypes.RtpCapabilities;
transports: Map<string, mediasoupTypes.WebRtcTransport>;
producers: Map<string, mediasoupTypes.Producer>;
consumers: Map<string, mediasoupTypes.Consumer>;
private deviceRtpCapabilities: mediasoupTypes.RtpCapabilities;
private transports: Map<string, mediasoupTypes.WebRtcTransport>;
private producers: Map<string, mediasoupTypes.Producer>;
private consumers: Map<string, mediasoupTypes.Consumer>;

router: mediasoupTypes.Router;
private router: mediasoupTypes.Router;
workerPid: number;

static peers = new Map<string, Peer>();
Expand All @@ -25,27 +29,27 @@ class Peer extends EventEmitter {
roomId,
router,
rtpCapabilities,
signalNodeId,
signalnode,
type,
}: {
id: string;
roomId: string;
router: mediasoupTypes.Router;
rtpCapabilities: mediasoupTypes.RtpCapabilities;
signalNodeId: string;
signalnode: SignalNode;
type: PeerType;
}) {
super();
this.id = id;
this.roomId = roomId;
this.closed = false;
this.rtpCapabilities = rtpCapabilities;
this.deviceRtpCapabilities = rtpCapabilities;
this.router = router;
this.workerPid = (router.appData.worker as mediasoupTypes.Worker).pid;
this.transports = new Map();
this.producers = new Map();
this.consumers = new Map();
this.signalNodeId = signalNodeId;
this.signalnode = signalnode;
this.type = type;
// increment worker load
}
Expand Down Expand Up @@ -73,6 +77,35 @@ class Peer extends EventEmitter {
console.info('Peer closed');
}

getDeviceRTPCapabilities(): mediasoupTypes.RtpCapabilities {
return this.deviceRtpCapabilities;
}
getSignalnode(): SignalNode {
return this.signalnode;
}

sendMessage(action: Actions, args?: { [key: string]: unknown }): void {
this.signalnode.sendMessage(action, args);
}

async sendMessageForResponse(
action: Actions,
args?: { [key: string]: unknown }
): Promise<MessageResponse | null> {
return this.signalnode.sendMessageForResponse(action, args);
}

sendResponse(
action: Actions,
requestId: string,
response: { [key: string]: unknown }
): void {
this.signalnode.sendResponse(action, requestId, response);
}

sendError(action: Actions, requestId: string, error: Error | unknown): void {
this.signalnode.sendError(action, requestId, error);
}
// transport methods
addTransport(transport: mediasoupTypes.WebRtcTransport): void {
this.transports.set(transport.id, transport);
Expand All @@ -81,10 +114,31 @@ class Peer extends EventEmitter {
});
}

async pipeToRouter({
router,
producerId,
}: {
router: mediasoupTypes.Router;
producerId: string;
}): Promise<mediasoupTypes.PipeToRouterResult> {
return this.router.pipeToRouter({
router,
producerId,
});
}

getRouter(): mediasoupTypes.Router {
return this.router;
}

getTransport(id: string): mediasoupTypes.WebRtcTransport | undefined {
return this.transports.get(id);
}

getTransports(): mediasoupTypes.WebRtcTransport[] {
return Array.from(this.transports.values());
}

removeTransport(id: string): void {
this.transports.delete(id);
}
Expand All @@ -101,6 +155,10 @@ class Peer extends EventEmitter {
return this.producers.get(id);
}

getProducers(): mediasoupTypes.Producer[] {
return Array.from(this.producers.values());
}

removeProducer(id: string): void {
this.producers.delete(id);
}
Expand All @@ -113,6 +171,36 @@ class Peer extends EventEmitter {
return producers;
}

closeProducersBySource({
room,
source,
}: {
room: Room;
source: ProducerSource;
}): void {
try {
const producers = this.getProducersBySource(source);
producers.forEach(async producer => {
if (producer.kind === 'audio' && source === 'mic') {
const audioLevelObserver = room.audioLevelObservers.get(
this.router.id
);
if (audioLevelObserver) {
audioLevelObserver
.removeProducer({
producerId: producer.id,
})
.catch(error => {
console.log(error);
});
}
}
producer.close();
});
} catch (error) {
console.error('closeProducersBySource fialed ', { error });
}
}
// Consumers methods
addConsumer(consumer: mediasoupTypes.Consumer): void {
this.consumers.set(consumer.id, consumer);
Expand All @@ -125,6 +213,10 @@ class Peer extends EventEmitter {
return this.consumers.get(id);
}

getConsumers(): mediasoupTypes.Consumer[] {
return Array.from(this.consumers.values());
}

getConsumerByProducerId(
producerId: string
): mediasoupTypes.Consumer | undefined {
Expand Down
Loading