diff --git a/src/app.ts b/src/app.ts index e354a9e..eeda1b1 100644 --- a/src/app.ts +++ b/src/app.ts @@ -42,7 +42,7 @@ let medianodeData: MediaNodeData; const shutdown = async (): Promise => { try { await redisServer.sRem( - getRedisKey['medianodesRunning'](), + getRedisKey['medianodes'](), JSON.stringify(medianodeData) ); console.log('Delete medianode'); diff --git a/src/lib/schema.ts b/src/lib/schema.ts new file mode 100644 index 0000000..325077a --- /dev/null +++ b/src/lib/schema.ts @@ -0,0 +1,35 @@ +import z from 'zod'; + +const roomIdPeerIdSchema = z.object({ + roomId: z.string(), + peerId: z.string(), +}); + +export const ValidationSchema = { + roomIdPeerId: roomIdPeerIdSchema, + createPeer: roomIdPeerIdSchema.extend({ + rtpCapabilities: z.any(), + peerType: z.enum(['Participant', 'Recorder']), + }), + + connectWebRtcTransport: roomIdPeerIdSchema.extend({ + transportId: z.string(), + dtlsParameters: z.any(), + }), + + createProducer: roomIdPeerIdSchema.extend({ + rtpParameters: z.any(), + transportId: z.string(), + kind: z.enum(['audio', 'video']), + appData: z.any(), + }), + + manageProducer: roomIdPeerIdSchema.extend({ + producerId: z.string(), + source: z.enum(['mic', 'camera', 'screen']), + }), + + restartIce: roomIdPeerIdSchema.extend({ + transportId: z.string(), + }), +}; diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 020d748..28ca682 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -2,18 +2,24 @@ import config from '../config'; import { redisServer } from '../servers/redis-server'; import { MediaNodeData } from '../types'; +export const HEARTBEAT_TIMEOUT = 60000; // 90 seconds / 1.30mins + export const getRedisKey = { - room: (roomId: string): string => `room-${roomId}`, - lobby: (roomId: string): string => `lobby-${roomId}`, - roomPeers: (roomId: string): string => `room-${roomId}-peers`, - roomPeerIds: (roomId: string): string => `room-${roomId}-peerids`, + room: (roomId: string): string => `room:${roomId}`, + lobby: (roomId: string): string => `lobby:${roomId}`, + roomPeers: (roomId: string): string => `room:${roomId}:peers`, + roomPeerIds: (roomId: string): string => `room:${roomId}:peerids`, roomActiveSpeakerPeerId: (roomId: string): string => - `room-${roomId}-activespeakerpeerid`, - roomsOngoing: (): string => `rooms-ongoing`, - medianodesRunning: (): string => `medianodes-running`, - signalnodesRunning: (): string => `signalnodes-running`, - roomMedianodes: (roomId: string): string => `room-${roomId}-medianodes`, - roomSignalnodes: (roomId: string): string => `room-${roomId}-signalnodes`, + `room:${roomId}:activespeakerpeerid`, + rooms: (): string => `rooms`, + medianodes: (): string => `medianodes`, + signalnodes: (): string => `signalnodes`, + roomMedianodes: (roomId: string): string => `room:${roomId}:medianodes`, + roomSignalnodes: (roomId: string): string => `room:${roomId}:signalnodes`, +}; + +export const getPubSubChannel = { + room: (roomId: string): string => `room-${roomId}`, }; export const registerMediaNode = async (): Promise => { @@ -27,7 +33,7 @@ export const registerMediaNode = async (): Promise => { grpcPort: `${config.grpcPort}`, }; await redisServer.sAdd( - getRedisKey['medianodesRunning'](), + getRedisKey['medianodes'](), JSON.stringify(medianodeData) ); return medianodeData; @@ -35,3 +41,15 @@ export const registerMediaNode = async (): Promise => { throw error; } }; + +export const parseArguments = (args?: string): { [key: string]: unknown } => { + let parsedArgs: { [key: string]: unknown } = {}; + if (args) { + try { + parsedArgs = JSON.parse(args); + } catch (parseError) { + throw parseError; + } + } + return parsedArgs; +}; diff --git a/src/servers/grpc-server.ts b/src/servers/grpc-server.ts index 5052762..b9edc28 100644 --- a/src/servers/grpc-server.ts +++ b/src/servers/grpc-server.ts @@ -11,7 +11,7 @@ import { ProtoGrpcType } from '../protos/gen/media-signaling'; import config from '../config'; import SignalNode from '../services/signalnode'; -import { MediaSignalingActions as MSA } from '../types/actions'; +import { Actions as MSA } from '../types/actions'; interface ServerStats { totalConnections: number; @@ -301,7 +301,7 @@ class GrpcServer extends EventEmitter { // Cleanup interval for stale connections this.cleanupInterval = setInterval(() => { - this.cleanupStaleConnections(); + // this.cleanupStaleConnections(); }, 60000); // Every minute } @@ -343,28 +343,6 @@ class GrpcServer extends EventEmitter { this.emit('metricsReport', { stats, timestamp: new Date() }); } - private cleanupStaleConnections(): void { - try { - const nodes = SignalNode.getNodes(); - const staleThreshold = 5 * 60 * 1000; // 5 minutes - let cleanedCount = 0; - - nodes.forEach(node => { - if (node.isStale(staleThreshold)) { - console.log(`🧹 Cleaning up stale connection: ${node.id}`); - node.forceDisconnect('stale_connection_cleanup'); - cleanedCount++; - } - }); - - if (cleanedCount > 0) { - console.log(`🧹 Cleaned up ${cleanedCount} stale connections`); - } - } catch (error) { - console.error('❌ Error during cleanup:', error); - } - } - async stop(): Promise { if (this.shutdownPromise) { return this.shutdownPromise; diff --git a/src/servers/redis-server.ts b/src/servers/redis-server.ts index fce89d4..9de56a1 100644 --- a/src/servers/redis-server.ts +++ b/src/servers/redis-server.ts @@ -1,7 +1,7 @@ import { createClient, RedisClientType, SetOptions } from 'redis'; import config from '../config'; -import { PubSubActions } from '../types/actions'; +import { Actions } from '../types/actions'; class RedisServer { private static instance: RedisServer | null = null; @@ -40,12 +40,12 @@ class RedisServer { private async subscribe(): Promise { if (!this.isConnected) throw new Error('Redis clients are not connected. Call connect() first'); - await this.subClient.subscribe(PubSubActions.Message, message => { + await this.subClient.subscribe(Actions.Message, message => { const { event, args, }: { - event: PubSubActions; + event: Actions; args: { [key: string]: unknown }; } = JSON.parse(message); @@ -61,14 +61,14 @@ class RedisServer { event, args, }: { - event: PubSubActions; + event: Actions; args: { [key: string]: unknown }; }): Promise { if (!this.isConnected) throw new Error('Redis clients are not connected. Call connect() first'); const message = JSON.stringify({ event, args }); - await this.pubClient.publish(PubSubActions.Message, message); + await this.pubClient.publish(Actions.Message, message); console.info(`Message published to channe ${message}`); } diff --git a/src/services/peer.ts b/src/services/peer.ts index 102e20b..01d6805 100644 --- a/src/services/peer.ts +++ b/src/services/peer.ts @@ -2,20 +2,24 @@ import EventEmitter from 'events'; import { types as mediasoupTypes } from 'mediasoup'; import { PeerType, ProducerSource } from '../types'; import { mediaSoupServer } from '../servers/mediasoup-server'; +import SignalNode from './signalnode'; +import { Actions } from '../types/actions'; +import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; +import Room from './room'; class Peer extends EventEmitter { id: string; roomId: string; closed: boolean; - signalNodeId: string; + private signalnode: SignalNode; type: PeerType; - rtpCapabilities: mediasoupTypes.RtpCapabilities; - transports: Map; - producers: Map; - consumers: Map; + private deviceRtpCapabilities: mediasoupTypes.RtpCapabilities; + private transports: Map; + private producers: Map; + private consumers: Map; - router: mediasoupTypes.Router; + private router: mediasoupTypes.Router; workerPid: number; static peers = new Map(); @@ -25,27 +29,27 @@ class Peer extends EventEmitter { roomId, router, rtpCapabilities, - signalNodeId, + signalnode, type, }: { id: string; roomId: string; router: mediasoupTypes.Router; rtpCapabilities: mediasoupTypes.RtpCapabilities; - signalNodeId: string; + signalnode: SignalNode; type: PeerType; }) { super(); this.id = id; this.roomId = roomId; this.closed = false; - this.rtpCapabilities = rtpCapabilities; + this.deviceRtpCapabilities = rtpCapabilities; this.router = router; this.workerPid = (router.appData.worker as mediasoupTypes.Worker).pid; this.transports = new Map(); this.producers = new Map(); this.consumers = new Map(); - this.signalNodeId = signalNodeId; + this.signalnode = signalnode; this.type = type; // increment worker load } @@ -73,6 +77,35 @@ class Peer extends EventEmitter { console.info('Peer closed'); } + getDeviceRTPCapabilities(): mediasoupTypes.RtpCapabilities { + return this.deviceRtpCapabilities; + } + getSignalnode(): SignalNode { + return this.signalnode; + } + + sendMessage(action: Actions, args?: { [key: string]: unknown }): void { + this.signalnode.sendMessage(action, args); + } + + async sendMessageForResponse( + action: Actions, + args?: { [key: string]: unknown } + ): Promise { + return this.signalnode.sendMessageForResponse(action, args); + } + + sendResponse( + action: Actions, + requestId: string, + response: { [key: string]: unknown } + ): void { + this.signalnode.sendResponse(action, requestId, response); + } + + sendError(action: Actions, requestId: string, error: Error | unknown): void { + this.signalnode.sendError(action, requestId, error); + } // transport methods addTransport(transport: mediasoupTypes.WebRtcTransport): void { this.transports.set(transport.id, transport); @@ -81,10 +114,31 @@ class Peer extends EventEmitter { }); } + async pipeToRouter({ + router, + producerId, + }: { + router: mediasoupTypes.Router; + producerId: string; + }): Promise { + return this.router.pipeToRouter({ + router, + producerId, + }); + } + + getRouter(): mediasoupTypes.Router { + return this.router; + } + getTransport(id: string): mediasoupTypes.WebRtcTransport | undefined { return this.transports.get(id); } + getTransports(): mediasoupTypes.WebRtcTransport[] { + return Array.from(this.transports.values()); + } + removeTransport(id: string): void { this.transports.delete(id); } @@ -101,6 +155,10 @@ class Peer extends EventEmitter { return this.producers.get(id); } + getProducers(): mediasoupTypes.Producer[] { + return Array.from(this.producers.values()); + } + removeProducer(id: string): void { this.producers.delete(id); } @@ -113,6 +171,36 @@ class Peer extends EventEmitter { return producers; } + closeProducersBySource({ + room, + source, + }: { + room: Room; + source: ProducerSource; + }): void { + try { + const producers = this.getProducersBySource(source); + producers.forEach(async producer => { + if (producer.kind === 'audio' && source === 'mic') { + const audioLevelObserver = room.audioLevelObservers.get( + this.router.id + ); + if (audioLevelObserver) { + audioLevelObserver + .removeProducer({ + producerId: producer.id, + }) + .catch(error => { + console.log(error); + }); + } + } + producer.close(); + }); + } catch (error) { + console.error('closeProducersBySource fialed ', { error }); + } + } // Consumers methods addConsumer(consumer: mediasoupTypes.Consumer): void { this.consumers.set(consumer.id, consumer); @@ -125,6 +213,10 @@ class Peer extends EventEmitter { return this.consumers.get(id); } + getConsumers(): mediasoupTypes.Consumer[] { + return Array.from(this.consumers.values()); + } + getConsumerByProducerId( producerId: string ): mediasoupTypes.Consumer | undefined { diff --git a/src/services/room.ts b/src/services/room.ts index 7580efb..7271650 100644 --- a/src/services/room.ts +++ b/src/services/room.ts @@ -5,7 +5,7 @@ import config from '../config'; import { mediaSoupServer } from '../servers/mediasoup-server'; import { redisServer } from '../servers/redis-server'; import { getRedisKey } from '../lib/utils'; -import { ServiceActions } from '../types/actions'; +import { Actions } from '../types/actions'; import MediaNode from './medianode'; import { AppDataWithRouterId } from '../types'; @@ -209,21 +209,21 @@ class Room extends EventEmitter { ): Promise { try { const peersToPipe = Array.from(this.peers.values()).filter( - peer => peer.router.id !== router.id + peer => peer.getRouter().id !== router.id ); for (const peer of peersToPipe) { - const srcRouter = peer.router; + const srcRouter = peer.getRouter(); if (srcRouter) { - for (const producerId of peer.producers.keys()) { + for (const producer of peer.getProducers()) { if ( ( router.appData.producers as Map - ).has(producerId) + ).has(producer.id) ) { continue; } await srcRouter.pipeToRouter({ - producerId, + producerId: producer.id, router, }); } @@ -313,7 +313,7 @@ class Room extends EventEmitter { }; private handlePeerEvents(peer: Peer): void { - peer.on(ServiceActions.Close, () => { + peer.on(Actions.Close, () => { if (!this.getPeer(peer.id)) return; this.removePeer(peer.id); }); @@ -344,7 +344,7 @@ class Room extends EventEmitter { }): Promise { const peers = this.getPeers(); for (const peer of peers) { - const peerProducers = peer.producers.values(); + const peerProducers = peer.getProducers().values(); for (const producer of peerProducers) { this.createPipeConsumer({ producer, @@ -374,7 +374,7 @@ class Room extends EventEmitter { addMediaNode(mediaNode: MediaNode): void { this.mediaNodes.set(mediaNode.id, mediaNode); - mediaNode.on(ServiceActions.Close, () => { + mediaNode.on(Actions.Close, () => { this.mediaNodes.delete(mediaNode.id); }); } diff --git a/src/services/signalnode.ts b/src/services/signalnode.ts index b108ecb..616f853 100644 --- a/src/services/signalnode.ts +++ b/src/services/signalnode.ts @@ -1,38 +1,22 @@ import EventEmitter from 'events'; import * as grpc from '@grpc/grpc-js'; +import { types as mediasoupTypes } from 'mediasoup'; -import { MediaSignalingActions as MSA } from '../types/actions'; +import { Actions } from '../types/actions'; import { MessageRequest } from '../protos/gen/mediaSignalingPackage/MessageRequest'; import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; import { grpcServer } from '../servers/grpc-server'; import { mediaSoupServer } from '../servers/mediasoup-server'; - -enum ConnectionState { - CONNECTING = 'CONNECTING', - CONNECTED = 'CONNECTED', - DISCONNECTING = 'DISCONNECTING', - DISCONNECTED = 'DISCONNECTED', - ERROR = 'ERROR', -} - -interface ConnectionMetrics { - connectedAt: number; - lastActivity: number; - lastHeartbeat: number; - messagesSent: number; - messagesReceived: number; - errors: number; - heartbeatsReceived: number; - heartbeatsMissed: number; -} - -interface MessageQueueItem { - action: MSA; - args?: { [key: string]: unknown }; - timestamp: number; - retries: number; - id: string; -} +import { ValidationSchema } from '../lib/schema'; +import Room from './room'; +import Peer from './peer'; +import { + ConnectionState, + PendingRequest, + ProducerSource, + TransportKind, +} from '../types'; +import { parseArguments as parseArguments } from '../lib/utils'; class SignalNode extends EventEmitter { id: string; @@ -40,16 +24,11 @@ class SignalNode extends EventEmitter { call: grpc.ServerDuplexStream; metadata: grpc.Metadata; private connectionState: ConnectionState; - private metrics: ConnectionMetrics; + private lastHeartbeat: number; private heartbeatInterval?: NodeJS.Timeout; - private messageQueue: MessageQueueItem[] = []; - private maxQueueSize: number = 100; - private messageTimeout: number = 30000; private heartbeatTimeout: number = 60000; - private maxConsecutiveErrors: number = 5; - private consecutiveErrors: number = 0; private isShuttingDown: boolean = false; - private pendingRequests: Map void>; + private pendingRequests: Map; static signalNodes = new Map(); @@ -68,26 +47,13 @@ class SignalNode extends EventEmitter { this.connectionId = connectionId || this.generateConnectionId(); this.call = call; this.metadata = call.metadata; - this.connectionState = ConnectionState.CONNECTING; + this.connectionState = ConnectionState.Connecting; this.pendingRequests = new Map(); - const now = Date.now(); - this.metrics = { - connectedAt: now, - lastActivity: now, - lastHeartbeat: now, - messagesSent: 0, - messagesReceived: 0, - errors: 0, - heartbeatsReceived: 0, - heartbeatsMissed: 0, - }; + this.lastHeartbeat = Date.now(); // Add to static collection with duplicate handling if (SignalNode.signalNodes.has(id)) { - console.warn( - `⚠️ SignalNode with ID ${id} already exists, removing old instance` - ); const oldNode = SignalNode.signalNodes.get(id); oldNode?.forceDisconnect('duplicate_connection'); } @@ -97,7 +63,7 @@ class SignalNode extends EventEmitter { this.initialize(); console.log( - `βœ… New SignalNode created - ID: ${this.id}, Connection: ${this.connectionId}` + `New SignalNode created - ID: ${this.id}, Connection: ${this.connectionId}` ); } @@ -105,10 +71,10 @@ class SignalNode extends EventEmitter { try { this.setupMessageHandlers(); this.setupHeartbeat(); - this.setState(ConnectionState.CONNECTED); + this.setState(ConnectionState.Connected); this.sendConnectionConfirmation(); } catch (error) { - console.error(`❌ Error initializing SignalNode ${this.id}:`, error); + console.error(`Error initializing SignalNode ${this.id}:`, error); this.handleError(error as Error, 'initialization_error'); } } @@ -121,51 +87,12 @@ class SignalNode extends EventEmitter { if (this.connectionState !== newState) { const oldState = this.connectionState; this.connectionState = newState; - console.log(`πŸ“‘ SignalNode ${this.id} state: ${oldState} -> ${newState}`); + console.log(`SignalNode ${this.id} state: ${oldState} -> ${newState}`); this.emit('stateChanged', { oldState, newState, nodeId: this.id }); } } - private setupHeartbeat(): void { - this.clearHeartbeat(); - - this.heartbeatInterval = setInterval(() => { - if (!this.isActive()) { - return; - } - - const timeSinceLastActivity = Date.now() - this.metrics.lastActivity; - // const timeSinceLastHeartbeat = Date.now() - this.metrics.lastHeartbeat; - - // Check for stale connection - if (timeSinceLastActivity > this.heartbeatTimeout) { - console.warn( - `πŸ’” Connection ${this.id} is stale (${timeSinceLastActivity}ms since last activity)` - ); - this.metrics.heartbeatsMissed++; - - if (this.metrics.heartbeatsMissed >= 3) { - this.handleError(new Error('Heartbeat timeout'), 'heartbeat_timeout'); - return; - } - } - - // Send heartbeat - try { - if ( - this.sendMessage(MSA.Heartbeat, { - timestamp: Date.now(), - connectionId: this.connectionId, - }) - ) { - this.metrics.lastHeartbeat = Date.now(); - } - } catch (error) { - console.error(`❌ Error sending heartbeat to ${this.id}:`, error); - this.handleError(error as Error, 'heartbeat_error'); - } - }, 30000); // Send heartbeat every 30 seconds - } + private setupHeartbeat(): void {} private clearHeartbeat(): void { if (this.heartbeatInterval) { @@ -186,23 +113,23 @@ class SignalNode extends EventEmitter { // Handle connection events this.call.on('end', () => { - console.log(`πŸ“€ Client ${this.id} ended the connection gracefully`); + console.log(`client ${this.id} ended the connection gracefully`); this.handleClientDisconnection('client_ended'); }); this.call.on('cancelled', () => { - console.log(`🚫 Client ${this.id} cancelled the connection`); + console.log(`Client ${this.id} cancelled the connection`); this.handleClientDisconnection('cancelled'); }); this.call.on('error', (error: Error) => { - console.error(`πŸ’₯ Stream error for client ${this.id}:`, error); + console.error(`Stream error for client ${this.id}:`, error); this.handleError(error, 'stream_error'); }); this.call.on('close', () => { - console.log(`πŸ”Œ Stream closed for client ${this.id}`); - if (this.connectionState === ConnectionState.CONNECTED) { + console.log(`Stream closed for client ${this.id}`); + if (this.connectionState === ConnectionState.Connected) { this.handleClientDisconnection('stream_closed'); } }); @@ -210,62 +137,45 @@ class SignalNode extends EventEmitter { private handleIncomingMessage(message: MessageRequest): void { try { - this.metrics.messagesReceived++; - this.metrics.lastActivity = Date.now(); - this.consecutiveErrors = 0; // Reset error count on successful message - const { action, args, requestId } = message; + if (!action) return; + + const parsedArgs = parseArguments(args); if (requestId?.length) { - const resolver = this.pendingRequests.get(requestId); - if (resolver) { + console.log('Got a request expecting response'); + const pendingRequest = this.pendingRequests.get(requestId); + if (pendingRequest) { // this means this instance initiated this request for response . // resolve and return - resolver(message); + if (parsedArgs.status === 'error') { + console.log(action, 'pending request Returned error'); + pendingRequest.reject(parsedArgs.error as Error); + } else { + console.log(action, 'pending request Returned success'); + pendingRequest.resolve(message); + } this.pendingRequests.delete(requestId); return; } } - if (!action) { - console.warn(`⚠️ Received message without action from ${this.id}`); - this.handleError( - new Error('Missing action in message'), - 'protocol_error' - ); - return; - } - - console.log(`πŸ“¨ Received message from ${this.id}: ${action}`); - - let parsedArgs: { [key: string]: unknown } = {}; - if (args) { - try { - parsedArgs = JSON.parse(args); - } catch (parseError) { - console.error( - `❌ Failed to parse message args from ${this.id}:`, - parseError - ); - this.handleError(parseError as Error, 'parse_error'); - return; - } - } + console.log(`Received message from ${this.id}: ${action}`); // Handle special system messages - if (action === MSA.Heartbeat) { + if (action === Actions.Heartbeat) { this.handleHeartbeat(parsedArgs); return; } // Find and execute handler - const handler = this.actionHandlers[action as MSA]; + const handler = this.actionHandlers[action as Actions]; if (handler) { try { - handler(parsedArgs); + handler(parsedArgs, requestId); } catch (handlerError) { console.error( - `❌ Error in handler for action ${action} from ${this.id}:`, + ` Error in handler for action ${action} from ${this.id}:`, handlerError ); this.handleError(handlerError as Error, 'handler_error'); @@ -286,72 +196,43 @@ class SignalNode extends EventEmitter { timestamp: Date.now(), }); } catch (error) { - console.error(`πŸ’₯ Error handling message from ${this.id}:`, error); + console.error(`Error handling message from ${this.id}:`, error); this.handleError(error as Error, 'message_handling_error'); } } private handleHeartbeat(args: { [key: string]: unknown }): void { console.log(args); - this.metrics.heartbeatsReceived++; - this.metrics.heartbeatsMissed = 0; // Reset missed heartbeats // Respond to heartbeat - this.sendMessage(MSA.HeartbeatAck, { + this.sendMessage(Actions.HeartbeatAck, { timestamp: Date.now(), connectionId: this.connectionId, - metrics: { - messagesSent: this.metrics.messagesSent, - messagesReceived: this.metrics.messagesReceived, - uptime: Date.now() - this.metrics.connectedAt, - }, }); } private handleError(error: Error, context: string): void { - this.metrics.errors++; - this.consecutiveErrors++; - - console.error( - `πŸ’₯ SignalNode ${this.id} error [${context}]:`, - error.message - ); + console.error(`SignalNode ${this.id} error [${context}]:`, error.message); this.emit('error', { nodeId: this.id, error, context, - consecutiveErrors: this.consecutiveErrors, timestamp: Date.now(), }); - - // Disconnect if too many consecutive errors - if (this.consecutiveErrors >= this.maxConsecutiveErrors) { - console.error( - `🚫 Too many consecutive errors (${this.consecutiveErrors}) for ${this.id}, disconnecting` - ); - this.handleClientDisconnection('too_many_errors', error); - } } private handleClientDisconnection(reason: string, error?: Error): void { if (this.isShuttingDown) { return; // Already handled } - - console.log( - `πŸ”Œ Client ${this.id} disconnected (${reason})`, - error ? `: ${error.message}` : '' - ); - - this.setState(ConnectionState.DISCONNECTED); + this.setState(ConnectionState.Disconnected); this.cleanup(); this.emit('disconnected', { nodeId: this.id, reason, error, - metrics: this.getMetrics(), timestamp: Date.now(), }); } @@ -362,21 +243,18 @@ class SignalNode extends EventEmitter { // Clear heartbeat this.clearHeartbeat(); - // Clear message queue - this.messageQueue = []; - // Remove from static collection SignalNode.signalNodes.delete(this.id); // Remove all listeners this.removeAllListeners(); - console.log(`🧹 Cleaned up SignalNode ${this.id}`); + console.log(`Cleaned up SignalNode ${this.id}`); } private async sendConnectionConfirmation(): Promise { const rtpCapabilities = await mediaSoupServer.getRouterRtpCapabilities(); - this.sendMessage(MSA.Connected, { + this.sendMessage(Actions.Connected, { status: 'success', nodeId: this.id, connectionId: this.connectionId, @@ -388,7 +266,7 @@ class SignalNode extends EventEmitter { } // Public methods - sendMessage(action: MSA, args?: { [key: string]: unknown }): boolean { + sendMessage(action: Actions, args?: { [key: string]: unknown }): boolean { if (!this.isActive()) { console.warn(`⚠️ Cannot send message to ${this.id}: node is inactive`); return false; @@ -408,22 +286,20 @@ class SignalNode extends EventEmitter { this.call.write(message); - this.metrics.messagesSent++; - this.metrics.lastActivity = Date.now(); grpcServer.incrementMessageStats(1, 0); - console.log(`πŸ“€ Sent ${action} to ${this.id} (${messageId})`); + console.log(`Sent ${action} to ${this.id} (${messageId})`); return true; } catch (error) { - console.error(`❌ Error sending message to ${this.id}:`, error); + console.error(`Error sending message to ${this.id}:`, error); this.handleError(error as Error, 'send_message_error'); return false; } } async sendMessageForResponse( - action: MSA, + action: Actions, args?: { [key: string]: unknown } ): Promise { if (!this.call) { @@ -441,91 +317,90 @@ class SignalNode extends EventEmitter { requestId, }; - return new Promise(resolve => { + return new Promise((resolve, reject) => { if (this.call) { - this.pendingRequests.set(requestId, resolve); // save resolve + this.pendingRequests.set(requestId, { + resolve, + reject, + }); // save resolve this.call.write(message); } }); } catch (error) { - console.error(`❌ Error sending message to MediaNode ${this.id}:`, error); + console.error(`Error sending message to MediaNode ${this.id}:`, error); throw error; } } - - queueMessage(action: MSA, args?: { [key: string]: unknown }): boolean { - if (this.messageQueue.length >= this.maxQueueSize) { + sendResponse( + action: Actions, + requestId: string, + response: { [key: string]: unknown } + ): void { + if (!this.call) { console.warn( - `⚠️ Message queue full for ${this.id}, dropping oldest message` + `⚠️Cannot send message to MediaNode ${this.id}: not connected` ); - this.messageQueue.shift(); // Remove oldest message + return; } - const messageItem: MessageQueueItem = { - action, - args, - timestamp: Date.now(), - retries: 0, - id: this.generateMessageId(), - }; - - this.messageQueue.push(messageItem); - console.log( - `πŸ“ Queued message ${action} for ${this.id} (queue size: ${this.messageQueue.length})` - ); + try { + const message: MessageRequest = { + action, + requestId, + args: JSON.stringify({ + status: 'success', + response, + }), + }; - return true; + this.call.write(message); + } catch (error) { + console.error(`Error sending message to MediaNode ${this.id}:`, error); + throw error; + } } - - flushMessageQueue(): number { - if (!this.isActive() || this.messageQueue.length === 0) { - return 0; + sendError(action: Actions, requestId: string, error: Error | unknown): void { + if (!this.call) { + console.warn( + `⚠️Cannot send message to MediaNode ${this.id}: not connected` + ); + return; } + try { + const message: MessageRequest = { + action, + requestId, + args: JSON.stringify({ + status: 'error', + error, + }), + }; - let sentCount = 0; - const messages = [...this.messageQueue]; - this.messageQueue = []; - - messages.forEach(messageItem => { - if (this.sendMessage(messageItem.action, messageItem.args)) { - sentCount++; - } else { - // Re-queue failed messages if under retry limit - if (messageItem.retries < 3) { - messageItem.retries++; - this.messageQueue.push(messageItem); - } - } - }); - - if (sentCount > 0) { - console.log(`πŸ“€ Flushed ${sentCount} queued messages for ${this.id}`); + this.call.write(message); + } catch (error) { + console.error(`Error sending message to MediaNode ${this.id}:`, error); + throw error; } - - return sentCount; } async gracefulDisconnect( reason: string = 'graceful_shutdown' ): Promise { - if (this.connectionState === ConnectionState.DISCONNECTED) { + if (this.connectionState === ConnectionState.Disconnected) { return; } console.log(`πŸ‘‹ Gracefully disconnecting ${this.id} (${reason})`); - this.setState(ConnectionState.DISCONNECTING); + this.setState(ConnectionState.Disconnecting); try { // Send disconnect notification - this.sendMessage(MSA.ServerShutdown, { + this.sendMessage(Actions.ServerShutdown, { message: 'Server is shutting down', reason, timestamp: Date.now(), }); - // Flush any remaining messages - this.flushMessageQueue(); - // Wait a bit for messages to be sent await new Promise(resolve => setTimeout(resolve, 1000)); @@ -544,7 +419,7 @@ class SignalNode extends EventEmitter { } forceDisconnect(reason: string = 'force_disconnect'): void { - console.log(`πŸ’₯ Force disconnecting ${this.id} (${reason})`); + console.log(`Force disconnecting ${this.id} (${reason})`); try { if (this.call) { @@ -560,64 +435,18 @@ class SignalNode extends EventEmitter { // Utility methods isActive(): boolean { return ( - this.connectionState === ConnectionState.CONNECTED && !this.isShuttingDown + this.connectionState === ConnectionState.Connected && !this.isShuttingDown ); } - isStale(threshold: number = 300000): boolean { - // 5 minutes default - return Date.now() - this.metrics.lastActivity > threshold; - } - - getMetrics(): ConnectionMetrics { - return { ...this.metrics }; - } - getState(): ConnectionState { return this.connectionState; } - getUptime(): number { - return Date.now() - this.metrics.connectedAt; - } - - getQueueSize(): number { - return this.messageQueue.length; - } - private generateMessageId(): string { return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`; } - // Action handlers for different message types - private actionHandlers: { - [key in MSA]?: ( - args: { [key: string]: unknown }, - requestId?: string - ) => void; - } = { - [MSA.Connected]: args => { - console.log(`βœ… Connection confirmed from ${this.id}:`, args); - this.emit('connectionConfirmed', { nodeId: this.id, args }); - }, - - [MSA.Ping]: (args, requestId) => { - console.log('Signal Server Pinged Mediaserver'); - this.call.write({ - action: MSA.Pong, - args: JSON.stringify(args), - requestId, - }); - }, - - [MSA.HeartbeatAck]: args => { - console.log(`πŸ’— Heartbeat acknowledged by ${this.id}`, args); - this.metrics.lastActivity = Date.now(); - }, - - // Add more handlers as needed for your specific actions - }; - // Static methods for managing all nodes static getNodes(): SignalNode[] { return Array.from(SignalNode.signalNodes.values()); @@ -659,136 +488,466 @@ class SignalNode extends EventEmitter { console.log('βœ… All signal nodes disconnected'); } - static broadcastMessage( - action: MSA, - args?: { [key: string]: unknown }, - options?: { - excludeIds?: string[]; - onlyActive?: boolean; - queueIfUnavailable?: boolean; - } - ): { sent: number; queued: number; failed: number } { - const opts = { - excludeIds: [], - onlyActive: true, - queueIfUnavailable: false, - ...options, - }; - - let nodes = Array.from(SignalNode.signalNodes.values()); - - // Filter nodes based on options - if (opts.excludeIds.length > 0) { - nodes = nodes.filter(node => !opts.excludeIds!.includes(node.id)); - } + static broadcastMessage(): void {} + + private async createTransport( + router: mediasoupTypes.Router, + type: TransportKind = 'consumer' + ): Promise { + if (router.appData.webRtcServer) throw 'Webrtc server not found'; + const webRtcServer = router.appData + .webRtcServer as mediasoupTypes.WebRtcServer; + const transport = await router.createWebRtcTransport({ + webRtcServer, + appData: { + isConsumer: type === 'consumer', + isProducer: type === 'producer', + }, + }); + + return transport; + } - if (opts.onlyActive) { - nodes = nodes.filter(node => node.isActive()); + private async createConsumer({ + consumingPeer, + producerPeerId, + producer, + room, + }: { + consumingPeer: Peer; + producerPeerId: string; + producer: mediasoupTypes.Producer; + room: Room; + }): Promise { + try { + if ( + !consumingPeer.getRouter().canConsume({ + producerId: producer.id, + rtpCapabilities: consumingPeer.getDeviceRTPCapabilities(), + }) + ) { + return console.log(`Can not consmer with producerId - ${producer.id}`); + } + // GET CONSUMER PEER CONSUMER TRANSPORT + const transport = consumingPeer + .getTransports() + .find(tp => tp.appData.isConsumer === true); + + if (!transport) return; + + const consumer = await transport.consume({ + producerId: producer.id, + rtpCapabilities: consumingPeer.getDeviceRTPCapabilities(), + paused: true, + appData: producer.appData, + }); + // Find out on this + if (producer.kind === 'audio' && producer.appData.source === 'mic') + await consumer.setPriority(255); + + consumingPeer.addConsumer(consumer); + + const options = { + peerId: consumingPeer.id, + peerType: consumingPeer.type, + meetingId: room.roomId, + consumerId: consumer.id, + producerPeerId, + producerSource: producer.appData.source, + fromProducer: true, + }; + + consumer.observer.on('close', () => { + consumingPeer.removeConsumer(consumer.id); + consumingPeer.sendMessage(Actions.CloseConsumer, options); + }); + + consumer.on('producerpause', () => { + consumingPeer.sendMessage(Actions.PauseConsumer, options); + }); + + consumer.on('producerresume', () => { + consumingPeer.sendMessage(Actions.ResumeConsumer, options); + }); + + consumingPeer.sendMessageForResponse(Actions.CreateConsumer, { + peerId: consumingPeer.id, + peerType: consumingPeer.type, + roomId: room.roomId, + producerPeerId: producerPeerId, + producerId: producer.id, + transportId: transport.id, + producerSource: producer.appData.source, + id: consumer.id, + kind: consumer.kind, + rtpParameters: consumer.rtpParameters, + type: consumer.type, + producerPaused: consumer.producerPaused, + + appData: { + ...producer.appData, + producerPeerId, + transportId: transport.id, + }, + }); + console.log('Create consumer ---', producer.appData.source); + } catch (error) { + // callback('createConsumersForExistingPeers fialed') + console.error('createConsumer fialed ', { error }); } + } - let sent = 0; - let queued = 0; - let failed = 0; + // Action handlers for different message types + private actionHandlers: { + [key in Actions]?: ( + args: { [key: string]: unknown }, + requestId?: string + ) => void; + } = { + [Actions.Connected]: args => { + console.log(`βœ… Connection confirmed from ${this.id}:`, args); + this.emit('connectionConfirmed', { nodeId: this.id, args }); + }, - nodes.forEach(node => { - if (node.isActive()) { - if (node.sendMessage(action, args)) { - sent++; - } else { - failed++; - } - } else if (opts.queueIfUnavailable) { - if (node.queueMessage(action, args)) { - queued++; - } else { - failed++; + [Actions.Ping]: (args, requestId) => { + console.log('Signal Server Pinged Mediaserver requestId', requestId); + this.call.write({ + action: Actions.Pong, + args: JSON.stringify(args), + requestId, + }); + }, + + [Actions.HeartbeatAck]: args => { + console.log(`πŸ’— Heartbeat acknowledged by ${this.id}`, args); + }, + + // Add more handlers as needed for your specific actions + + [Actions.CreatePeer]: async (args, requestId) => { + try { + if (!requestId) throw 'Request Id requested'; + + const data = ValidationSchema.createPeer.parse(args); + const { roomId, peerId, peerType, rtpCapabilities } = data; + const room = Room.getRoom(roomId) ?? (await Room.create(roomId)); + + const router = await room.assignRouterToPeer(); + if (!router) throw 'Router not assigned to peer'; + const peer = new Peer({ + id: peerId, + roomId, + router, + rtpCapabilities, + signalnode: this, + type: peerType, + }); + + room.addPeer(peer); + this.sendResponse(Actions.CreatePeer, requestId, { + routerId: router.id, + }); + } catch (error) { + console.log(error); + this.sendError(Actions.CreatePeer, requestId as string, error); + } + }, + + [Actions.ClosePeer]: async args => { + try { + const data = ValidationSchema.roomIdPeerId.parse(args); + const { roomId, peerId } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + peer?.close(); + + console.log('Close Peer'); + } catch (error) { + console.log(error); + } + }, + + [Actions.CreateWebrtcTransports]: async (args, requestId) => { + try { + if (!requestId) throw 'Request Id requested'; + const data = ValidationSchema.roomIdPeerId.parse(args); + const { roomId, peerId } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + + const producerTransport = await this.createTransport( + peer.getRouter(), + 'producer' + ); + const consumerTransport = await this.createTransport( + peer.getRouter(), + 'consumer' + ); + + peer.addTransport(producerTransport); + peer.addTransport(consumerTransport); + + this.sendResponse(Actions.CreateWebrtcTransports, requestId, { + producerTransportParams: { + id: producerTransport.id, + iceParameters: producerTransport.iceParameters, + iceCandidates: producerTransport.iceCandidates, + dtlsParameters: producerTransport.dtlsParameters, + sctpParameters: producerTransport.sctpParameters, + }, + consumerTransportParams: { + id: consumerTransport.id, + iceParameters: consumerTransport.iceParameters, + iceCandidates: consumerTransport.iceCandidates, + dtlsParameters: consumerTransport.dtlsParameters, + sctpParameters: consumerTransport.sctpParameters, + }, + }); + console.log('Close Peer'); + } catch (error) { + console.log(error); + this.sendError(Actions.CreatePeer, requestId as string, error); + } + }, + + [Actions.ConnectWebrtcTransports]: async args => { + try { + const data = ValidationSchema.connectWebRtcTransport.parse(args); + const { roomId, peerId, transportId, dtlsParameters } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + const transport = peer.getTransport(transportId); + if (!transport) throw 'Transport was not found'; + transport.connect({ dtlsParameters }); + } catch (error) { + console.log(error); + } + }, + + [Actions.CreateConsumersOfAllProducers]: async args => { + try { + const data = ValidationSchema.roomIdPeerId.parse(args); + const { roomId, peerId } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + const existingPeers = room.getPeers(); + + // create consumer from producers of existing peers + existingPeers.forEach(existingPeer => { + // ingore the peer that requested this + if (existingPeer.id === peerId) return; + const peerProducers = existingPeer.getProducers(); + peerProducers.forEach(producer => { + this.createConsumer({ + consumingPeer: peer, + producerPeerId: existingPeer.id, + producer: producer, + room, + }).catch(error => { + console.log(error); + }); + }); + }); + + // create consumer from producer in connected media + const medianodes = room.getMediaNodes(); + medianodes.forEach(medianode => { + const producers = medianode.getProducers(); + producers.forEach(producer => { + room + .createPipeConsumer({ + consumingMediaNode: medianode, + producer: producer, + producerPeerId: producer.appData.peerId as string, + }) + .catch(error => { + console.error('create pipeConsumer fialed', { error }); + }); + }); + }); + } catch (error) { + console.log(error); + } + }, + + [Actions.CreateProducer]: async (args, requestId) => { + try { + if (!requestId) throw 'Request Id requested'; + + const data = ValidationSchema.createProducer.parse(args); + const { roomId, peerId, rtpParameters, kind, transportId, appData } = + data; + + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + + const source = appData.source as ProducerSource; + peer.closeProducersBySource({ room, source }); + + const transport = peer.getTransport(transportId); + if (!transport) throw 'Transport not found'; + const producer = await transport.produce({ + kind, + rtpParameters, + appData: { ...appData, peerId }, + }); + peer.addProducer(producer); + + this.sendResponse(Actions.CreateProducer, requestId, { + producerId: producer.id, + }); + + // Pipe Producer From This Router to other routers + const routersToPipeTo = room.getRoutersToPipeTo(peer.getRouter()); + routersToPipeTo.forEach(router => { + peer + .pipeToRouter({ + router, + producerId: producer.id, + }) + .catch(error => console.log(error)); + }); + + // Create server-side consumer for each existing peers + const existingPeers = room.getPeers(); + existingPeers.forEach(consumingPeer => { + if (consumingPeer.id === peerId) return; + this.createConsumer({ + consumingPeer, + producer: producer, + room, + producerPeerId: producer.appData.peerId as string, + }).catch(error => { + console.error('create pipeConsumer fialed', { error }); + }); + }); + + // create PipeConsumer for all connected MediaNode + const medianodes = room.getMediaNodes(); + medianodes.forEach(medianode => { + room + .createPipeConsumer({ + producer, + producerPeerId: peer.id, + consumingMediaNode: medianode, + }) + .catch(error => { + console.error('newPipeconsumer createPipeConsumer failed', error); + }); + }); + + if (producer.kind === 'audio' && appData.source === 'mic') { + const audioLevelObserver = room.audioLevelObservers.get( + peer.getRouter().id + ); + if (audioLevelObserver) { + audioLevelObserver.addProducer({ producerId: producer.id }); + } } - } else { - failed++; + } catch (error) { + console.log(error); + this.sendError(Actions.CreateProducer, requestId as string, error); } - }); + }, - console.log( - `πŸ“’ Broadcast ${action}: ${sent} sent, ${queued} queued, ${failed} failed` - ); - return { sent, queued, failed }; - } + [Actions.CloseProducer]: args => { + try { + const data = ValidationSchema.manageProducer.parse(args); + const { peerId, roomId, source } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); - static getConnectionStats(): { - total: number; - active: number; - connecting: number; - disconnecting: number; - disconnected: number; - error: number; - } { - const nodes = Array.from(SignalNode.signalNodes.values()); + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; - return { - total: nodes.length, - active: nodes.filter(n => n.connectionState === ConnectionState.CONNECTED) - .length, - connecting: nodes.filter( - n => n.connectionState === ConnectionState.CONNECTING - ).length, - disconnecting: nodes.filter( - n => n.connectionState === ConnectionState.DISCONNECTING - ).length, - disconnected: nodes.filter( - n => n.connectionState === ConnectionState.DISCONNECTED - ).length, - error: nodes.filter(n => n.connectionState === ConnectionState.ERROR) - .length, - }; - } + peer.closeProducersBySource({ room, source }); + } catch (error) { + console.log(error); + } + }, - static getDetailedMetrics(): { - totalNodes: number; - activeNodes: number; - totalMessagesSent: number; - totalMessagesReceived: number; - totalErrors: number; - avgUptime: number; - nodes: Array<{ - id: string; - connectionId: string; - state: ConnectionState; - metrics: ConnectionMetrics; - uptime: number; - queueSize: number; - }>; - } { - const nodes = Array.from(SignalNode.signalNodes.values()); - const activeNodes = nodes.filter(n => n.isActive()); - - return { - totalNodes: nodes.length, - activeNodes: activeNodes.length, - totalMessagesSent: nodes.reduce( - (sum, node) => sum + node.metrics.messagesSent, - 0 - ), - totalMessagesReceived: nodes.reduce( - (sum, node) => sum + node.metrics.messagesReceived, - 0 - ), - totalErrors: nodes.reduce((sum, node) => sum + node.metrics.errors, 0), - avgUptime: - nodes.length > 0 - ? nodes.reduce((sum, node) => sum + node.getUptime(), 0) / - nodes.length - : 0, - nodes: nodes.map(node => ({ - id: node.id, - connectionId: node.connectionId, - state: node.connectionState, - metrics: node.getMetrics(), - uptime: node.getUptime(), - queueSize: node.getQueueSize(), - })), - }; - } + [Actions.PauseProducer]: args => { + try { + const data = ValidationSchema.manageProducer.parse(args); + const { peerId, roomId, source } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + + const producers = peer.getProducersBySource(source); + producers.forEach(producer => { + producer.pause().catch(error => { + console.log(error); + }); + }); + } catch (error) { + console.log(error); + } + }, + + [Actions.ResumeProducer]: args => { + try { + const data = ValidationSchema.manageProducer.parse(args); + const { peerId, roomId, producerId } = data; + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + + const producer = peer.getProducer(producerId); + producer?.resume(); + } catch (error) { + console.log(error); + } + }, + + [Actions.PauseConsumer]: args => { + console.log('PauseConsumer', args); + }, + + [Actions.ResumeConsumer]: args => { + console.log('ResumeConsumer', args); + }, + + [Actions.RestartIce]: async (args, requestId) => { + try { + if (!requestId) throw 'Require request id'; + const data = ValidationSchema.restartIce.parse(args); + const { roomId, peerId, transportId } = data; + + const room = Room.getRoom(roomId); + const peer = room?.getPeer(peerId); + + if (!room || !peer) + throw 'Failed to create webrtc transport: Peer/room not found'; + const transport = peer.getTransport(transportId); + if (!transport) return; + const iceParameters = await transport.restartIce(); + this.sendResponse(Actions.RestartIce, requestId, { + iceParameters, + }); + } catch (error) { + console.log(error); + this.sendError(Actions.RestartIce, requestId as string, error); + } + }, + }; } export default SignalNode; + export { ConnectionState, SignalNode }; diff --git a/src/types/actions.ts b/src/types/actions.ts index 343cea6..5b27ce2 100644 --- a/src/types/actions.ts +++ b/src/types/actions.ts @@ -1,35 +1,52 @@ -export enum SignalingClientActions { +export enum Actions { Message = 'message', - Heartbeat = 'heartbeat', Connected = 'connected', - JoinRoom = 'join-room', - JoinVisitors = 'join-visitors', - JoinWaiters = 'join-waiters', - GetRoomData = 'get-room-data', - GetRtpCapabilities = 'get-rtp-capabilities', -} -export enum PubSubActions { - Message = 'message', - EndMeeting = 'end-meeting', -} - -export enum ServiceActions { - Close = 'close', -} - -export enum MediaSignalingActions { - // Connection lifecycle - Connected = 'connected', - Disconnect = 'disconnect', - Reconnect = 'reconnect', + JoinRoom = 'join_room', + JoinVisitors = 'join_visitors', + JoinWaiters = 'join_waiters', + GetRoomData = 'get_room_data', + + GetRouterRtpCapabilities = 'get_router_rtp_capabilities', + CreateWebrtcTransports = 'create_webrtc_transports', + ConnectWebrtcTransports = 'connect_webrtc_transports', + CreateConsumersOfAllProducers = 'create_consumers_of_all_producers', + CreateProducer = 'create_producer', + CloseProducer = 'close_producer', + PauseProducer = 'pause_producer', + ResumeProducer = 'resume_producer', + CreateConsumer = 'create_consumer', + CloseConsumer = 'close_consumer', + ResumeConsumer = 'resume_consumer', + PauseConsumer = 'pause_consumer', + RestartIce = 'restart_ice', + + CreatePeer = 'create_peer', + ClosePeer = 'close_peer', + + Mute = 'mute', + OffCamera = 'off_camera', + StopScreen = 'stop_screen', + RaiseHand = 'raise_hand', + LowerHands = 'lower_hands', + SendChat = 'send_chat', + SendReaction = 'send_reaction', + GetWaiters = 'get_waiters', + AdmitWaiters = 'admit_waiters', + DeclineWaiters = 'decline_waiters', + Record = 'Record', + + RemovePeer = 'remove_peer', + AddRole = 'add_role', + RemoveRole = 'remove_role', + + EndMeeting = 'end_meeting', - // Health monitoring Heartbeat = 'heartbeat', HeartbeatAck = 'heartbeat_ack', Ping = 'ping', Pong = 'pong', - // Server management` + // Server management ServerShutdown = 'server_shutdown', ServerRestart = 'server_restart', @@ -37,20 +54,5 @@ export enum MediaSignalingActions { Error = 'error', ConnectionError = 'connection_error', - // Media specific actions (add your custom actions here) - MediaOffer = 'media_offer', - MediaAnswer = 'media_answer', - IceCandidate = 'ice_candidate', - MediaStreamStart = 'media_stream_start', - MediaStreamStop = 'media_stream_stop', - - // Room/channel management - JoinRoom = 'join_room', - LeaveRoom = 'leave_room', - RoomUpdate = 'room_update', - - // Custom actions placeholder - Custom = 'custom', - - RtpCapabilities = 'rtp_capabilities', + Close = 'close', } diff --git a/src/types/index.ts b/src/types/index.ts index e1effb1..bfa93e9 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,4 +1,5 @@ import { types as mediasoupTypes } from 'mediasoup'; +import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; export type ProducerSource = 'mic' | 'camera' | 'screen' | 'screenAudio'; @@ -155,6 +156,19 @@ export interface TransportConnectionParams { srtpParameters?: mediasoupTypes.SrtpParameters; } +export enum ConnectionState { + Connecting = 'connecting', + Connected = 'connected', + Disconnecting = 'disconnecting', + Disconnected = 'disconnected', + Error = 'error', +} + export type AppDataWithRouterId = mediasoupTypes.AppData & { routerId: string }; +export interface PendingRequest { + resolve: (response: MessageResponse) => void; + reject: (error: Error) => void; +} + // WorkerData, RouterData, TransportData, ConsumerData, Producer