Files
kestrelos/app/composables/useWebRTC.js
Keli Grubb 17f28401ba
All checks were successful
ci/woodpecker/push/push Pipeline was successful
minor: heavily simplify server and app content. unify styling (#4)
Co-authored-by: Madison Grubb <madison@elastiflow.com>
Reviewed-on: #4
2026-02-14 04:52:18 +00:00

206 lines
6.5 KiB
JavaScript

/** WebRTC/Mediasoup client utilities. */
import { logError, logWarn } from '../utils/logger.js'
// Options spread into the `$fetch` calls below.
// NOTE(review): `credentials: 'include'` presumably makes session cookies travel with each request — confirm against the server's auth setup.
const FETCH_OPTS = { credentials: 'include' }
/**
 * Create a mediasoup-client `Device` and load the given router RTP capabilities into it.
 *
 * @param {object} rtpCapabilities - Forwarded to `device.load` as `routerRtpCapabilities`.
 * @returns {Promise<object>} The device, after `load` has completed.
 * @throws {TypeError} When `window` is undefined (not running in a browser).
 */
export async function createMediasoupDevice(rtpCapabilities) {
  const runningInBrowser = typeof window !== 'undefined'
  if (!runningInBrowser) {
    throw new TypeError('Mediasoup device can only be created in browser')
  }
  // The library is imported lazily, only once the browser check has passed.
  const mediasoupClient = await import('mediasoup-client')
  const device = new mediasoupClient.Device()
  await device.load({ routerRtpCapabilities: rtpCapabilities })
  return device
}
/**
 * Open a WebSocket and resolve once it is connected.
 *
 * An absolute `ws://` / `wss://` URL is used as given. Anything else (including a
 * missing `url`) falls back to `/ws` on the current page's host, using `wss:` when
 * the page was served over `https:` and `ws:` otherwise.
 *
 * @param {string} [url] - Absolute WebSocket URL; other values select the same-host default.
 * @returns {Promise<WebSocket>} Resolves with the socket on `open`.
 *   Rejects with `Error('WebSocket connection failed')` on `error`, or with whatever
 *   the `WebSocket` constructor throws.
 */
export function createWebSocketConnection(url) {
  return new Promise((resolve, reject) => {
    // Normalise first so a missing or non-string argument cannot throw on a string method.
    const candidate = url == null ? '' : String(url)
    // Require a real ws/wss scheme; a bare "ws" prefix would also match e.g. "wsfoo".
    const isAbsoluteWsUrl = /^wss?:\/\//i.test(candidate)
    let wsUrl = candidate
    if (!isAbsoluteWsUrl) {
      const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
      wsUrl = `${protocol}//${window.location.host}/ws`
    }
    const ws = new WebSocket(wsUrl)
    ws.onopen = () => resolve(ws)
    ws.onerror = () => reject(new Error('WebSocket connection failed'))
  })
}
/**
 * Send a JSON message over an open WebSocket and wait for a reply.
 *
 * The payload sent is `{ sessionId, type, data, messageId }`, where `messageId` is
 * generated here. The promise settles on the first incoming message that parses as
 * JSON and either echoes that `messageId` or carries a truthy `type`.
 *
 * @param {WebSocket} ws - Socket to send on; must be in the OPEN state.
 * @param {string} sessionId - Included verbatim in the outgoing message.
 * @param {string} type - Included verbatim in the outgoing message.
 * @param {object} [data={}] - Included verbatim in the outgoing message.
 * @returns {Promise<object>} Resolves with the parsed reply. Rejects when the socket
 *   is not open, when the reply has an `error` field, when sending throws, or after
 *   10 seconds without a matching reply.
 */
export function sendWebSocketMessage(ws, sessionId, type, data = {}) {
  return new Promise((resolve, reject) => {
    if (ws.readyState !== WebSocket.OPEN) {
      reject(new Error('WebSocket not open'))
      return
    }
    const messageId = `${Date.now()}-${Math.random()}`
    const message = { sessionId, type, data, messageId }
    // Releases both the timer and the listener; used by every path that settles the promise.
    const cleanup = () => {
      clearTimeout(timeout)
      ws.removeEventListener('message', handler)
    }
    const timeout = setTimeout(() => {
      cleanup()
      reject(new Error('WebSocket message timeout'))
    }, 10000)
    const handler = (event) => {
      try {
        const response = JSON.parse(event.data)
        // NOTE(review): `|| response.type` accepts ANY typed message as the reply, so an
        // unrelated server push can settle this request. Kept as-is because the server
        // may not echo `messageId` — confirm before tightening to a strict id match.
        if (response.messageId === messageId || response.type) {
          cleanup()
          if (response.error) {
            reject(new Error(response.error))
          }
          else {
            resolve(response)
          }
        }
      }
      catch {
        // Unparseable or non-object payload: not our message, continue waiting
      }
    }
    ws.addEventListener('message', handler)
    try {
      ws.send(JSON.stringify(message))
    }
    catch (err) {
      // Without this the listener and the 10s timer would outlive the failed send.
      cleanup()
      reject(err)
    }
  })
}
/**
 * Register the `connect` and `connectionstatechange` handlers on a transport.
 *
 * On `connect`, the DTLS parameters are POSTed to `/api/live/webrtc/connect-transport`.
 * When that succeeds, `onConnectSuccess` runs and then the event's `callback`. Because
 * both run inside the same `try`, an exception from either is handled as a failure.
 * On failure the error is logged, `onConnectFailure` is invoked, and the event's
 * `errback` is always called with the original error — even if `onConnectFailure` throws.
 *
 * On `connectionstatechange`, a warning is logged when the state is `failed`,
 * `disconnected` or `closed`.
 *
 * @param {object} transport - Object exposing `on(event, handler)` and `connectionState`.
 * @param {object} transportParams - Server transport description; only `id` is read here.
 * @param {string} sessionId - Sent with the connect request and included in log context.
 * @param {string} label - Prefix used in log messages (e.g. 'Send' or 'Recv').
 * @param {object} [observers] - Optional callbacks.
 * @param {Function} [observers.onConnectSuccess] - Called with no arguments after the server accepted the connect.
 * @param {Function} [observers.onConnectFailure] - Called with the error when connecting failed.
 */
function attachTransportHandlers(transport, transportParams, sessionId, label, { onConnectSuccess, onConnectFailure } = {}) {
  transport.on('connect', async ({ dtlsParameters }, callback, errback) => {
    try {
      await $fetch('/api/live/webrtc/connect-transport', {
        method: 'POST',
        body: { sessionId, transportId: transportParams.id, dtlsParameters },
        ...FETCH_OPTS,
      })
      onConnectSuccess?.()
      callback()
    }
    catch (err) {
      logError(`useWebRTC: ${label} transport connect failed`, {
        err: err?.message ?? String(err),
        transportId: transportParams.id,
        connectionState: transport.connectionState,
        sessionId,
      })
      try {
        onConnectFailure?.(err)
      }
      finally {
        // Must run even if the observer throws, otherwise the connect attempt is never settled.
        errback(err)
      }
    }
  })
  transport.on('connectionstatechange', () => {
    const state = transport.connectionState
    if (['failed', 'disconnected', 'closed'].includes(state)) {
      logWarn(`useWebRTC: ${label} transport connection state changed`, { state, transportId: transportParams.id, sessionId })
    }
  })
}
/**
 * Ask the server for a producer-side transport, create the matching send transport
 * on the device, and wire up its `connect`, `connectionstatechange` and `produce` handlers.
 *
 * @param {object} device - Object exposing `createSendTransport(params)`.
 * @param {string} sessionId - Sent with every request made for this transport.
 * @param {object} [options={}] - Forwarded unchanged to `attachTransportHandlers`.
 * @returns {Promise<object>} The transport returned by `device.createSendTransport`.
 */
export async function createSendTransport(device, sessionId, options = {}) {
  const transportParams = await $fetch('/api/live/webrtc/create-transport', {
    method: 'POST',
    body: { sessionId, isProducer: true },
    ...FETCH_OPTS,
  })

  // Only these four fields of the server response are handed to the device.
  const localTransportInit = {
    id: transportParams.id,
    iceParameters: transportParams.iceParameters,
    iceCandidates: transportParams.iceCandidates,
    dtlsParameters: transportParams.dtlsParameters,
  }
  const transport = device.createSendTransport(localTransportInit)
  attachTransportHandlers(transport, transportParams, sessionId, 'Send', options)

  // Registers the producer server-side and reports its id (or the failure) back to the caller.
  const handleProduce = async ({ kind, rtpParameters }, callback, errback) => {
    try {
      const producerRequest = {
        method: 'POST',
        body: { sessionId, transportId: transportParams.id, kind, rtpParameters },
        ...FETCH_OPTS,
      }
      const { id } = await $fetch('/api/live/webrtc/create-producer', producerRequest)
      callback({ id })
    }
    catch (err) {
      logError('useWebRTC: Producer creation failed', { err: err?.message ?? String(err) })
      errback(err)
    }
  }
  transport.on('produce', handleProduce)

  return transport
}
/**
 * Ask the server for a consumer-side transport, create the matching receive transport
 * on the device, and wire up its `connect` and `connectionstatechange` handlers.
 *
 * @param {object} device - Object exposing `createRecvTransport(params)`.
 * @param {string} sessionId - Sent with the create-transport request and passed to the handlers.
 * @param {object} [options={}] - Forwarded unchanged to `attachTransportHandlers`
 *   (`onConnectSuccess` / `onConnectFailure`), mirroring `createSendTransport`.
 *   Optional, so existing two-argument callers behave as before.
 * @returns {Promise<object>} The transport returned by `device.createRecvTransport`.
 */
export async function createRecvTransport(device, sessionId, options = {}) {
  const transportParams = await $fetch('/api/live/webrtc/create-transport', {
    method: 'POST',
    body: { sessionId, isProducer: false },
    ...FETCH_OPTS,
  })
  // Only these four fields of the server response are handed to the device.
  const transport = device.createRecvTransport({
    id: transportParams.id,
    iceParameters: transportParams.iceParameters,
    iceCandidates: transportParams.iceCandidates,
    dtlsParameters: transportParams.dtlsParameters,
  })
  attachTransportHandlers(transport, transportParams, sessionId, 'Recv', options)
  return transport
}
/**
 * Request consumer parameters from the server and create the consumer on the transport.
 *
 * If the new consumer has no `track` yet, a warning is logged and the function polls
 * for up to 3 seconds (every 100 ms); if the track is still missing afterwards an error
 * is logged. The consumer is returned in every case.
 *
 * @param {object} transport - Object exposing `id` and `consume(params)`.
 * @param {object} device - Its `rtpCapabilities` are sent to the server.
 * @param {string} sessionId - Sent with the create-consumer request.
 * @returns {Promise<object>} The consumer returned by `transport.consume`.
 */
export async function consumeProducer(transport, device, sessionId) {
  const requestBody = { sessionId, transportId: transport.id, rtpCapabilities: device.rtpCapabilities }
  const consumerParams = await $fetch('/api/live/webrtc/create-consumer', {
    method: 'POST',
    body: requestBody,
    ...FETCH_OPTS,
  })

  // Only these four fields of the server response are handed to the transport.
  const consumer = await transport.consume({
    id: consumerParams.id,
    producerId: consumerParams.producerId,
    kind: consumerParams.kind,
    rtpParameters: consumerParams.rtpParameters,
  })

  // Common case: the track is already there, nothing more to do.
  if (consumer.track) {
    return consumer
  }

  logWarn('useWebRTC: Consumer created but no track immediately', { consumerId: consumer.id })
  await waitForCondition(() => consumer.track, 3000, 100)
  const trackStillMissing = !consumer.track
  if (trackStillMissing) {
    logError('useWebRTC: Track did not become available after 3s', { consumerId: consumer.id })
  }
  return consumer
}
/**
 * Poll `condition` until it returns a truthy value or the timeout elapses.
 *
 * The condition is checked once immediately and then every `intervalMs`. The promise
 * always resolves with `undefined` and does not tell the caller which of the two
 * happened, so callers re-check the condition themselves.
 *
 * @param {Function} condition - Called with no arguments; a truthy result ends the wait.
 * @param {number} [timeoutMs=3000] - Maximum time to wait.
 * @param {number} [intervalMs=100] - Delay between polls.
 * @returns {Promise<void>}
 */
function waitForCondition(condition, timeoutMs = 3000, intervalMs = 100) {
  return new Promise((resolve) => {
    let deadlineTimer
    let pollTimer
    // Single exit point: stop both timers, then resolve.
    const finish = () => {
      clearTimeout(deadlineTimer)
      clearInterval(pollTimer)
      resolve()
    }
    const poll = () => {
      if (condition()) {
        finish()
      }
    }
    deadlineTimer = setTimeout(finish, timeoutMs)
    pollTimer = setInterval(poll, intervalMs)
    // Immediate first check so an already-true condition does not wait a full interval.
    poll()
  })
}
/**
 * Wait until a transport reaches one of the states `connected`, `failed`,
 * `disconnected` or `closed`, or until the timeout elapses.
 *
 * The promise never rejects: it resolves with the state that ended the wait, or with
 * whatever `transport.connectionState` is when the timeout fires.
 *
 * @param {object} transport - Object exposing `connectionState`, `on(event, fn)` and `off(event, fn)`.
 * @param {number} [timeoutMs=10000] - Maximum time to wait.
 * @returns {Promise<string>} The transport's connection state.
 */
export function waitForConnectionState(transport, timeoutMs = 10000) {
  const terminal = ['connected', 'failed', 'disconnected', 'closed']
  return new Promise((resolve) => {
    // Already settled: resolve before creating a timer or listener, so nothing is
    // left behind. (Previously the timeout was scheduled after this case had
    // resolved and was never cleared.)
    const initialState = transport.connectionState
    if (terminal.includes(initialState)) {
      resolve(initialState)
      return
    }
    const handler = () => {
      const state = transport.connectionState
      if (terminal.includes(state)) {
        transport.off('connectionstatechange', handler)
        clearTimeout(tid)
        resolve(state)
      }
    }
    const tid = setTimeout(() => {
      transport.off('connectionstatechange', handler)
      resolve(transport.connectionState)
    }, timeoutMs)
    transport.on('connectionstatechange', handler)
  })
}