/**
 * WebRTC composable for Mediasoup client operations.
 * Handles device initialization, transport creation, and WebSocket signaling.
 */
import { logError, logWarn } from '../utils/logger.js'

/** Connection states after which waitForConnectionState stops waiting. */
const TERMINAL_CONNECTION_STATES = ['connected', 'failed', 'disconnected', 'closed']

/**
 * Best-effort message for a caught value; tolerates null/undefined rejections.
 * @param {unknown} err
 * @returns {string}
 */
function describeError(err) {
  return err?.message || String(err)
}

/**
 * POST a JSON body to one of the `/api/live/webrtc/*` endpoints with credentials included.
 * @param {string} endpoint - Last path segment, e.g. 'create-transport'
 * @param {object} body
 * @returns {Promise} Parsed response from $fetch
 */
function postWebRTC(endpoint, body) {
  return $fetch(`/api/live/webrtc/${endpoint}`, {
    method: 'POST',
    body,
    credentials: 'include',
  })
}

/**
 * Register the transport 'connect' handler that forwards DTLS parameters to the server.
 * @param {object} transport
 * @param {string} label - 'Send' or 'Recv', used only in log messages
 * @param {string} sessionId
 * @param {string} transportId
 * @param {{ onConnectSuccess?: () => void, onConnectFailure?: (err: Error) => void }} [callbacks]
 */
function registerConnectHandler(transport, label, sessionId, transportId, callbacks = {}) {
  const { onConnectSuccess, onConnectFailure } = callbacks
  transport.on('connect', async ({ dtlsParameters }, callback, errback) => {
    try {
      await postWebRTC('connect-transport', { sessionId, transportId, dtlsParameters })
      // Notify the caller before acknowledging; a throwing callback is reported as a connect failure.
      onConnectSuccess?.()
      callback()
    } catch (err) {
      logError(`useWebRTC: ${label} transport connect failed`, {
        err: describeError(err),
        transportId,
        connectionState: transport.connectionState,
        sessionId,
      })
      onConnectFailure?.(err)
      errback(err)
    }
  })
}

/**
 * Log a warning whenever the transport enters the failed, disconnected, or closed state.
 * @param {object} transport
 * @param {string} label - 'Send' or 'Recv', used only in log messages
 * @param {string} sessionId
 * @param {string} transportId
 */
function registerStateWarning(transport, label, sessionId, transportId) {
  transport.on('connectionstatechange', () => {
    const state = transport.connectionState
    if (state === 'failed' || state === 'disconnected' || state === 'closed') {
      logWarn(`useWebRTC: ${label} transport connection state changed`, {
        state,
        transportId,
        sessionId,
      })
    }
  })
}

/**
 * Decide whether a parsed WebSocket message answers the request with the given id.
 * A message that carries a messageId must match it exactly; a message without one is
 * accepted when it has a truthy `type` (fallback for servers that do not echo the id).
 * @param {unknown} response
 * @param {string} messageId
 * @returns {boolean}
 */
function isResponseFor(response, messageId) {
  if (response === null || typeof response !== 'object') {
    return false
  }
  if (response.messageId != null) {
    return response.messageId === messageId
  }
  return Boolean(response.type)
}

/**
 * Initialize Mediasoup device from router RTP capabilities.
 * @param {object} rtpCapabilities
 * @returns {Promise} Mediasoup device
 * @throws {TypeError} When called outside a browser (no `window`)
 */
export async function createMediasoupDevice(rtpCapabilities) {
  if (typeof window === 'undefined') {
    throw new TypeError('Mediasoup device can only be created in browser')
  }

  // Dynamic import so mediasoup-client is only loaded in the browser.
  const { Device } = await import('mediasoup-client')
  const device = new Device()
  await device.load({ routerRtpCapabilities: rtpCapabilities })
  return device
}

/**
 * Create WebSocket connection for signaling.
 * A url starting with 'ws' is used as-is; anything else falls back to `/ws` on the
 * current host, using wss: when the page is served over https:.
 * @param {string} url - WebSocket URL (e.g., 'ws://localhost:3000/ws')
 * @returns {Promise} Resolves with the open WebSocket; rejects on connection error
 */
export function createWebSocketConnection(url) {
  return new Promise((resolve, reject) => {
    let wsUrl = url
    if (!url.startsWith('ws')) {
      const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
      wsUrl = `${protocol}//${window.location.host}/ws`
    }

    const ws = new WebSocket(wsUrl)
    ws.onopen = () => {
      resolve(ws)
    }
    ws.onerror = () => {
      reject(new Error('WebSocket connection failed'))
    }
  })
}

/**
 * Send WebSocket message and wait for response.
 * The request carries a generated messageId. A reply that carries a messageId must echo
 * that id; a reply without one is accepted if it has a `type`.
 * @param {WebSocket} ws
 * @param {string} sessionId
 * @param {string} type
 * @param {object} data
 * @param {number} [timeoutMs=10000] - How long to wait for a matching reply
 * @returns {Promise} Response message; rejects on `response.error`, timeout, send failure,
 *   or when the socket is not open
 */
export function sendWebSocketMessage(ws, sessionId, type, data = {}, timeoutMs = 10000) {
  return new Promise((resolve, reject) => {
    if (ws.readyState !== WebSocket.OPEN) {
      reject(new Error('WebSocket not open'))
      return
    }

    const messageId = `${Date.now()}-${Math.random()}`
    const message = { sessionId, type, data, messageId }

    // Single teardown path so the timer and listener never outlive the request.
    const cleanup = () => {
      clearTimeout(timeout)
      ws.removeEventListener('message', handler)
    }

    const handler = (event) => {
      let response
      try {
        response = JSON.parse(event.data)
      } catch {
        // Not JSON, so not our message; continue waiting.
        return
      }
      if (!isResponseFor(response, messageId)) {
        return
      }
      cleanup()
      if (response.error) {
        reject(new Error(response.error))
      } else {
        resolve(response)
      }
    }

    const timeout = setTimeout(() => {
      cleanup()
      reject(new Error('WebSocket message timeout'))
    }, timeoutMs)

    ws.addEventListener('message', handler)
    try {
      ws.send(JSON.stringify(message))
    } catch (err) {
      // Serialization or send failed: release the listener/timer instead of waiting for the timeout.
      cleanup()
      reject(err)
    }
  })
}

/**
 * Create send transport (for publisher).
 * @param {object} device
 * @param {string} sessionId
 * @param {{ onConnectSuccess?: () => void, onConnectFailure?: (err: Error) => void }} [options] - Optional callbacks when transport connect succeeds or fails.
 * @returns {Promise} Transport with send method
 */
export async function createSendTransport(device, sessionId, options = {}) {
  const { onConnectSuccess, onConnectFailure } = options

  // Create transport via HTTP API
  const transportParams = await postWebRTC('create-transport', { sessionId, isProducer: true })
  const transportId = transportParams.id

  const transport = device.createSendTransport({
    id: transportId,
    iceParameters: transportParams.iceParameters,
    iceCandidates: transportParams.iceCandidates,
    dtlsParameters: transportParams.dtlsParameters,
  })

  registerConnectHandler(transport, 'Send', sessionId, transportId, {
    onConnectSuccess,
    onConnectFailure,
  })
  registerStateWarning(transport, 'Send', sessionId, transportId)

  transport.on('produce', async ({ kind, rtpParameters }, callback, errback) => {
    try {
      const { id } = await postWebRTC('create-producer', {
        sessionId,
        transportId,
        kind,
        rtpParameters,
      })
      callback({ id })
    } catch (err) {
      logError('useWebRTC: Producer creation failed', { err: describeError(err) })
      errback(err)
    }
  })

  return transport
}

/**
 * Create receive transport (for viewer).
 * @param {object} device
 * @param {string} sessionId
 * @returns {Promise} Transport with consume method
 */
export async function createRecvTransport(device, sessionId) {
  // Create transport via HTTP API
  const transportParams = await postWebRTC('create-transport', { sessionId, isProducer: false })
  const transportId = transportParams.id

  const transport = device.createRecvTransport({
    id: transportId,
    iceParameters: transportParams.iceParameters,
    iceCandidates: transportParams.iceCandidates,
    dtlsParameters: transportParams.dtlsParameters,
  })

  // Connect handler is invoked by mediasoup-client when needed.
  registerConnectHandler(transport, 'Recv', sessionId, transportId)
  registerStateWarning(transport, 'Recv', sessionId, transportId)

  return transport
}

/**
 * Consume producer's stream (for viewer).
 * If the consumer has no track right away, polls for up to 3s and logs an error when the
 * track still has not appeared; the consumer is returned either way.
 * @param {object} transport
 * @param {object} device
 * @param {string} sessionId
 * @returns {Promise} Consumer with track
 */
export async function consumeProducer(transport, device, sessionId) {
  const rtpCapabilities = device.rtpCapabilities
  const consumerParams = await postWebRTC('create-consumer', {
    sessionId,
    transportId: transport.id,
    rtpCapabilities,
  })

  const consumer = await transport.consume({
    id: consumerParams.id,
    producerId: consumerParams.producerId,
    kind: consumerParams.kind,
    rtpParameters: consumerParams.rtpParameters,
  })

  if (!consumer.track) {
    logWarn('useWebRTC: Consumer created but no track immediately', { consumerId: consumer.id })
    await waitForCondition(() => consumer.track, 3000, 100)
    if (!consumer.track) {
      logError('useWebRTC: Track did not become available after 3s', { consumerId: consumer.id })
    }
  }

  return consumer
}

/**
 * Resolve when condition() returns truthy, or after timeoutMs (then resolve anyway).
 * The condition is checked once up front, before any timer is armed.
 * @param {() => unknown} condition
 * @param {number} timeoutMs
 * @param {number} intervalMs
 * @returns {Promise}
 */
function waitForCondition(condition, timeoutMs = 3000, intervalMs = 100) {
  return new Promise((resolve) => {
    if (condition()) {
      resolve()
      return
    }

    const finish = () => {
      clearTimeout(timeoutId)
      clearInterval(intervalId)
      resolve()
    }
    const timeoutId = setTimeout(finish, timeoutMs)
    const intervalId = setInterval(() => {
      if (condition()) {
        finish()
      }
    }, intervalMs)
  })
}

/**
 * Wait for transport connection state to reach a terminal state or timeout.
 * Resolves immediately, without arming a timer or listener, when the transport is
 * already in a terminal state.
 * @param {object} transport - Mediasoup transport with connectionState and on/off
 * @param {number} timeoutMs
 * @returns {Promise} Final connection state (the current state if the timeout elapses)
 */
export function waitForConnectionState(transport, timeoutMs = 10000) {
  return new Promise((resolve) => {
    if (TERMINAL_CONNECTION_STATES.includes(transport.connectionState)) {
      resolve(transport.connectionState)
      return
    }

    const finish = () => {
      clearTimeout(timeoutId)
      transport.off('connectionstatechange', onStateChange)
      resolve(transport.connectionState)
    }
    const onStateChange = () => {
      if (TERMINAL_CONNECTION_STATES.includes(transport.connectionState)) {
        finish()
      }
    }

    const timeoutId = setTimeout(finish, timeoutMs)
    transport.on('connectionstatechange', onStateChange)
  })
}