diff --git a/src/lib/schema.ts b/src/lib/schema.ts index 325077a..0820949 100644 --- a/src/lib/schema.ts +++ b/src/lib/schema.ts @@ -8,17 +8,23 @@ const roomIdPeerIdSchema = z.object({ export const ValidationSchema = { roomIdPeerId: roomIdPeerIdSchema, createPeer: roomIdPeerIdSchema.extend({ - rtpCapabilities: z.any(), + deviceRtpCapabilities: z + .any() + .refine(value => value, 'Can not be null or undefined'), peerType: z.enum(['Participant', 'Recorder']), }), connectWebRtcTransport: roomIdPeerIdSchema.extend({ transportId: z.string(), - dtlsParameters: z.any(), + dtlsParameters: z + .any() + .refine(value => value, 'Can not be null or undefined'), }), createProducer: roomIdPeerIdSchema.extend({ - rtpParameters: z.any(), + rtpParameters: z + .any() + .refine(value => value, 'Can not be null or undefined'), transportId: z.string(), kind: z.enum(['audio', 'video']), appData: z.any(), diff --git a/src/servers/grpc-server.ts b/src/servers/grpc-server.ts index b9edc28..d7686f3 100644 --- a/src/servers/grpc-server.ts +++ b/src/servers/grpc-server.ts @@ -310,9 +310,9 @@ class GrpcServer extends EventEmitter { const stats = this.getStats(); const memoryUsage = process.memoryUsage(); - console.log( - `💗 Health Check - Active: ${stats.activeConnections}, Memory: ${Math.round(memoryUsage.heapUsed / 1024 / 1024)}MB` - ); + // console.log( + // `💗 Health Check - Active: ${stats.activeConnections}, Memory: ${Math.round(memoryUsage.heapUsed / 1024 / 1024)}MB` + // ); this.emit('healthCheck', { stats, @@ -323,8 +323,8 @@ class GrpcServer extends EventEmitter { // Check for memory leaks if (memoryUsage.heapUsed > 512 * 1024 * 1024) { // 512MB threshold - console.warn('⚠️ High memory usage detected:', memoryUsage); - this.emit('memoryWarning', { memoryUsage, timestamp: new Date() }); + // console.warn('⚠️ High memory usage detected:', memoryUsage); + // this.emit('memoryWarning', { memoryUsage, timestamp: new Date() }); } } catch (error) { console.error('❌ Health check failed:', error); @@ -333,12 +333,12 @@ class GrpcServer extends EventEmitter { private reportMetrics(): void { const stats = this.getStats(); - console.log(`📊 Server Metrics:`, { - uptime: `${Math.round(stats.uptime / 1000 / 60)}min`, - connections: `${stats.activeConnections}/${stats.totalConnections}`, - messages: `R:${stats.totalMessagesReceived} S:${stats.totalMessagesSent}`, - errors: stats.errors, - }); + // console.log(`📊 Server Metrics:`, { + // uptime: `${Math.round(stats.uptime / 1000 / 60)}min`, + // connections: `${stats.activeConnections}/${stats.totalConnections}`, + // messages: `R:${stats.totalMessagesReceived} S:${stats.totalMessagesSent}`, + // errors: stats.errors, + // }); this.emit('metricsReport', { stats, timestamp: new Date() }); } diff --git a/src/services/peer.ts b/src/services/peer.ts index 01d6805..6326832 100644 --- a/src/services/peer.ts +++ b/src/services/peer.ts @@ -1,10 +1,9 @@ import EventEmitter from 'events'; import { types as mediasoupTypes } from 'mediasoup'; -import { PeerType, ProducerSource } from '../types'; +import { PeerType, ProducerSource, ResponseData } from '../types'; import { mediaSoupServer } from '../servers/mediasoup-server'; import SignalNode from './signalnode'; import { Actions } from '../types/actions'; -import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; import Room from './room'; class Peer extends EventEmitter { @@ -22,20 +21,20 @@ class Peer extends EventEmitter { private router: mediasoupTypes.Router; workerPid: number; - static peers = new Map(); + // static peers = new Map(); constructor({ id, roomId, router, - rtpCapabilities, + deviceRtpCapabilities, signalnode, type, }: { id: string; roomId: string; router: mediasoupTypes.Router; - rtpCapabilities: mediasoupTypes.RtpCapabilities; + deviceRtpCapabilities: mediasoupTypes.RtpCapabilities; signalnode: SignalNode; type: PeerType; }) { @@ -43,7 +42,7 @@ class Peer extends EventEmitter { this.id = id; this.roomId = roomId; this.closed = false; - this.deviceRtpCapabilities = rtpCapabilities; + this.deviceRtpCapabilities = deviceRtpCapabilities; this.router = router; this.workerPid = (router.appData.worker as mediasoupTypes.Worker).pid; this.transports = new Map(); @@ -91,7 +90,7 @@ class Peer extends EventEmitter { async sendMessageForResponse( action: Actions, args?: { [key: string]: unknown } - ): Promise { + ): Promise { return this.signalnode.sendMessageForResponse(action, args); } @@ -146,6 +145,7 @@ class Peer extends EventEmitter { // Producer methods addProducer(producer: mediasoupTypes.Producer): void { this.producers.set(producer.id, producer); + console.log('Add Producer', producer.id); producer.observer.on('close', () => { this.producers.delete(producer.id); }); diff --git a/src/services/signalnode.ts b/src/services/signalnode.ts index 616f853..0d5e204 100644 --- a/src/services/signalnode.ts +++ b/src/services/signalnode.ts @@ -11,6 +11,7 @@ import { ValidationSchema } from '../lib/schema'; import Room from './room'; import Peer from './peer'; import { + ResponseData, ConnectionState, PendingRequest, ProducerSource, @@ -153,7 +154,8 @@ class SignalNode extends EventEmitter { pendingRequest.reject(parsedArgs.error as Error); } else { console.log(action, 'pending request Returned success'); - pendingRequest.resolve(message); + const response = parsedArgs.resolve as ResponseData; + pendingRequest.resolve(response); } this.pendingRequests.delete(requestId); return; @@ -301,12 +303,9 @@ class SignalNode extends EventEmitter { async sendMessageForResponse( action: Actions, args?: { [key: string]: unknown } - ): Promise { + ): Promise { if (!this.call) { - console.warn( - `⚠️ Cannot send message to MediaNode ${this.id}: not connected` - ); - return null; + throw `Cannot send message to MediaNode ${this.id}: not connected`; } try { @@ -317,7 +316,7 @@ class SignalNode extends EventEmitter { requestId, }; - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { if (this.call) { this.pendingRequests.set(requestId, { resolve, @@ -494,7 +493,7 @@ class SignalNode extends EventEmitter { router: mediasoupTypes.Router, type: TransportKind = 'consumer' ): Promise { - if (router.appData.webRtcServer) throw 'Webrtc server not found'; + if (!router.appData.webRtcServer) throw 'Webrtc server not found'; const webRtcServer = router.appData .webRtcServer as mediasoupTypes.WebRtcServer; const transport = await router.createWebRtcTransport({ @@ -520,6 +519,10 @@ class SignalNode extends EventEmitter { room: Room; }): Promise { try { + console.log( + 'consumingPeer.getDeviceRTPCapabilities()', + consumingPeer.getDeviceRTPCapabilities() + ); if ( !consumingPeer.getRouter().canConsume({ producerId: producer.id, @@ -570,26 +573,36 @@ class SignalNode extends EventEmitter { consumingPeer.sendMessage(Actions.ResumeConsumer, options); }); - consumingPeer.sendMessageForResponse(Actions.CreateConsumer, { - peerId: consumingPeer.id, - peerType: consumingPeer.type, - roomId: room.roomId, - producerPeerId: producerPeerId, - producerId: producer.id, - transportId: transport.id, - producerSource: producer.appData.source, - id: consumer.id, - kind: consumer.kind, - rtpParameters: consumer.rtpParameters, - type: consumer.type, - producerPaused: consumer.producerPaused, - - appData: { - ...producer.appData, - producerPeerId, + consumingPeer + .sendMessageForResponse(Actions.ConsumerCreated, { + peerId: consumingPeer.id, + peerType: consumingPeer.type, + roomId: room.roomId, + producerPeerId: producerPeerId, + producerId: producer.id, transportId: transport.id, - }, - }); + producerSource: producer.appData.source, + id: consumer.id, + kind: consumer.kind, + rtpParameters: consumer.rtpParameters, + type: consumer.type, + producerPaused: consumer.producerPaused, + + appData: { + ...producer.appData, + producerPeerId, + transportId: transport.id, + }, + }) + .then(async res => { + console.log('CreateConsumer Response -> resolved request', res); + // if (producer.appData.source === 'camera') return; + await consumer.resume(); + console.log('resume consumer'); + }) + .catch(error => { + console.log('CreateConsumer | Resume Consumer', error); + }); console.log('Create consumer ---', producer.appData.source); } catch (error) { // callback('createConsumersForExistingPeers fialed') @@ -629,8 +642,9 @@ class SignalNode extends EventEmitter { if (!requestId) throw 'Request Id requested'; const data = ValidationSchema.createPeer.parse(args); - const { roomId, peerId, peerType, rtpCapabilities } = data; + const { roomId, peerId, peerType, deviceRtpCapabilities } = data; const room = Room.getRoom(roomId) ?? (await Room.create(roomId)); + console.log(Actions.CreatePeer, '=>', deviceRtpCapabilities); const router = await room.assignRouterToPeer(); if (!router) throw 'Router not assigned to peer'; @@ -638,7 +652,7 @@ class SignalNode extends EventEmitter { id: peerId, roomId, router, - rtpCapabilities, + deviceRtpCapabilities, signalnode: this, type: peerType, }); @@ -691,14 +705,14 @@ class SignalNode extends EventEmitter { peer.addTransport(consumerTransport); this.sendResponse(Actions.CreateWebrtcTransports, requestId, { - producerTransportParams: { + sendTransportParams: { id: producerTransport.id, iceParameters: producerTransport.iceParameters, iceCandidates: producerTransport.iceCandidates, dtlsParameters: producerTransport.dtlsParameters, sctpParameters: producerTransport.sctpParameters, }, - consumerTransportParams: { + recvTransportParams: { id: consumerTransport.id, iceParameters: consumerTransport.iceParameters, iceCandidates: consumerTransport.iceCandidates, @@ -736,7 +750,7 @@ class SignalNode extends EventEmitter { const { roomId, peerId } = data; const room = Room.getRoom(roomId); const peer = room?.getPeer(peerId); - + console.log(Actions.CreateConsumersOfAllProducers, 'START'); if (!room || !peer) throw 'Failed to create webrtc transport: Peer/room not found'; const existingPeers = room.getPeers(); @@ -774,6 +788,7 @@ class SignalNode extends EventEmitter { }); }); }); + console.log(Actions.CreateConsumersOfAllProducers, 'END'); } catch (error) { console.log(error); } @@ -822,7 +837,8 @@ class SignalNode extends EventEmitter { // Create server-side consumer for each existing peers const existingPeers = room.getPeers(); existingPeers.forEach(consumingPeer => { - if (consumingPeer.id === peerId) return; + if (consumingPeer.id === peerId) + return console.log('You are the one producing'); this.createConsumer({ consumingPeer, producer: producer, diff --git a/src/types/actions.ts b/src/types/actions.ts index 5b27ce2..72ebe1b 100644 --- a/src/types/actions.ts +++ b/src/types/actions.ts @@ -18,9 +18,18 @@ export enum Actions { CloseConsumer = 'close_consumer', ResumeConsumer = 'resume_consumer', PauseConsumer = 'pause_consumer', + + // consumed by client + ConsumerCreated = 'consumer_created', + ConsumerPaused = 'consumer_paused', + ConsumerResumed = 'consumer_resumed', + ConsumerClosed = 'consumer_closed', + RestartIce = 'restart_ice', CreatePeer = 'create_peer', + PeerAdded = 'peer_added', + PeerLeft = 'peer_left', ClosePeer = 'close_peer', Mute = 'mute', diff --git a/src/types/index.ts b/src/types/index.ts index bfa93e9..30bad34 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,5 +1,4 @@ import { types as mediasoupTypes } from 'mediasoup'; -import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; export type ProducerSource = 'mic' | 'camera' | 'screen' | 'screenAudio'; @@ -10,6 +9,7 @@ export type AckCallback = (res: { response?: T; }) => void; export type PeerType = 'Recorder' | 'Participant'; +export type ResponseData = { [key: string]: unknown }; export enum Role { Moderator = 'Moderator', @@ -167,7 +167,7 @@ export enum ConnectionState { export type AppDataWithRouterId = mediasoupTypes.AppData & { routerId: string }; export interface PendingRequest { - resolve: (response: MessageResponse) => void; + resolve: (response: ResponseData) => void; reject: (error: Error) => void; } diff --git a/tsconfig.json b/tsconfig.json index 450add8..8820fba 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,8 @@ "outDir": "./dist", "rootDir": "./src", "forceConsistentCasingInFileNames": true, - "isolatedModules": true + "isolatedModules": true, + "strict": true }, "include": ["src/**/*.ts"], "exclude": ["node_modules", "dist", "tests", "eslint.config.mts"]