Co-authored-by: Madison Grubb <madison@elastiflow.com> Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
@@ -26,7 +26,7 @@ export default defineEventHandler(async (event) => {
|
||||
const url = getRequestURL(event)
|
||||
const requestHost = url.hostname
|
||||
const router = await getRouter(sessionId)
|
||||
const { transport, params } = await createTransport(router, Boolean(isProducer), requestHost)
|
||||
const { transport, params } = await createTransport(router, requestHost)
|
||||
|
||||
if (isProducer) {
|
||||
updateLiveSession(sessionId, {
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
import { getDb, closeDb } from '../utils/db.js'
|
||||
import { migrateFeedsToDevices } from '../utils/migrateFeedsToDevices.js'
|
||||
|
||||
/**
|
||||
* Initialize DB (and run bootstrap if no users) at server startup
|
||||
* so credentials are printed in the terminal before any request.
|
||||
* Initialize DB at server startup.
|
||||
* Close DB on server shutdown to avoid native sqlite3 crashes in worker teardown.
|
||||
*/
|
||||
// eslint-disable-next-line no-undef
|
||||
export default defineNitroPlugin((nitroApp) => {
|
||||
void getDb().then(() => migrateFeedsToDevices())
|
||||
void getDb()
|
||||
nitroApp.hooks.hook('close', () => {
|
||||
closeDb()
|
||||
})
|
||||
|
||||
@@ -86,7 +86,6 @@ export function broadcastToSession(sessionId, message) {
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line no-undef
|
||||
export default defineNitroPlugin((nitroApp) => {
|
||||
nitroApp.hooks.hook('ready', async () => {
|
||||
const server = nitroApp.h3App.server || nitroApp.h3App.nodeServer
|
||||
|
||||
29
server/utils/bootstrap.js
vendored
Normal file
29
server/utils/bootstrap.js
vendored
Normal file
@@ -0,0 +1,29 @@
|
||||
import { randomBytes } from 'node:crypto'
|
||||
import { hashPassword } from './password.js'
|
||||
|
||||
const DEFAULT_ADMIN_IDENTIFIER = 'admin'
|
||||
const PASSWORD_CHARS = 'abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789'
|
||||
|
||||
const generateRandomPassword = () => {
|
||||
const bytes = randomBytes(14)
|
||||
return Array.from(bytes, b => PASSWORD_CHARS[b % PASSWORD_CHARS.length]).join('')
|
||||
}
|
||||
|
||||
export async function bootstrapAdmin(run, get) {
|
||||
const row = await get('SELECT COUNT(*) as n FROM users')
|
||||
if (row?.n !== 0) return
|
||||
|
||||
const email = process.env.BOOTSTRAP_EMAIL?.trim()
|
||||
const password = process.env.BOOTSTRAP_PASSWORD
|
||||
const identifier = (email && password) ? email : DEFAULT_ADMIN_IDENTIFIER
|
||||
const plainPassword = (email && password) ? password : generateRandomPassword()
|
||||
|
||||
await run(
|
||||
'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
|
||||
[crypto.randomUUID(), identifier, hashPassword(plainPassword), 'admin', new Date().toISOString(), 'local', null, null],
|
||||
)
|
||||
|
||||
if (!email || !password) {
|
||||
console.log(`\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n\n Identifier: ${identifier}\n Password: ${plainPassword}\n\n Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n`)
|
||||
}
|
||||
}
|
||||
@@ -2,154 +2,172 @@ import { join } from 'node:path'
|
||||
import { mkdirSync, existsSync } from 'node:fs'
|
||||
import { createRequire } from 'node:module'
|
||||
import { promisify } from 'node:util'
|
||||
import { randomBytes } from 'node:crypto'
|
||||
import { hashPassword } from './password.js'
|
||||
|
||||
const DEFAULT_ADMIN_IDENTIFIER = 'admin'
|
||||
const DEFAULT_PASSWORD_LENGTH = 14
|
||||
const PASSWORD_CHARS = 'abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789'
|
||||
|
||||
function generateRandomPassword() {
|
||||
const bytes = randomBytes(DEFAULT_PASSWORD_LENGTH)
|
||||
let s = ''
|
||||
for (let i = 0; i < DEFAULT_PASSWORD_LENGTH; i++) {
|
||||
s += PASSWORD_CHARS[bytes[i] % PASSWORD_CHARS.length]
|
||||
}
|
||||
return s
|
||||
}
|
||||
import { bootstrapAdmin } from './bootstrap.js'
|
||||
|
||||
const require = createRequire(import.meta.url)
|
||||
const sqlite3 = require('sqlite3')
|
||||
|
||||
const SCHEMA_VERSION = 2
|
||||
const DB_BUSY_TIMEOUT_MS = 5000
|
||||
|
||||
let dbInstance = null
|
||||
/** Set by tests to use :memory: or a temp path */
|
||||
let testPath = null
|
||||
|
||||
const USERS_SQL = `CREATE TABLE IF NOT EXISTS users (
|
||||
id TEXT PRIMARY KEY,
|
||||
identifier TEXT UNIQUE NOT NULL,
|
||||
password_hash TEXT NOT NULL,
|
||||
role TEXT NOT NULL DEFAULT 'member',
|
||||
created_at TEXT NOT NULL
|
||||
)`
|
||||
const SCHEMA = {
|
||||
schema_version: 'CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)',
|
||||
users: `CREATE TABLE IF NOT EXISTS users (
|
||||
id TEXT PRIMARY KEY,
|
||||
identifier TEXT UNIQUE NOT NULL,
|
||||
password_hash TEXT NOT NULL,
|
||||
role TEXT NOT NULL DEFAULT 'member',
|
||||
created_at TEXT NOT NULL
|
||||
)`,
|
||||
users_v2: `CREATE TABLE users_new (
|
||||
id TEXT PRIMARY KEY,
|
||||
identifier TEXT UNIQUE NOT NULL,
|
||||
password_hash TEXT,
|
||||
role TEXT NOT NULL DEFAULT 'member',
|
||||
created_at TEXT NOT NULL,
|
||||
auth_provider TEXT NOT NULL DEFAULT 'local',
|
||||
oidc_issuer TEXT,
|
||||
oidc_sub TEXT
|
||||
)`,
|
||||
users_oidc_index: `CREATE UNIQUE INDEX IF NOT EXISTS users_oidc_unique ON users(oidc_issuer, oidc_sub) WHERE oidc_issuer IS NOT NULL AND oidc_sub IS NOT NULL`,
|
||||
sessions: `CREATE TABLE IF NOT EXISTS sessions (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
expires_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES users(id)
|
||||
)`,
|
||||
pois: `CREATE TABLE IF NOT EXISTS pois (
|
||||
id TEXT PRIMARY KEY,
|
||||
lat REAL NOT NULL,
|
||||
lng REAL NOT NULL,
|
||||
label TEXT NOT NULL DEFAULT '',
|
||||
icon_type TEXT NOT NULL DEFAULT 'pin'
|
||||
)`,
|
||||
devices: `CREATE TABLE IF NOT EXISTS devices (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL DEFAULT '',
|
||||
device_type TEXT NOT NULL,
|
||||
vendor TEXT,
|
||||
lat REAL NOT NULL,
|
||||
lng REAL NOT NULL,
|
||||
stream_url TEXT NOT NULL DEFAULT '',
|
||||
source_type TEXT NOT NULL DEFAULT 'mjpeg',
|
||||
config TEXT
|
||||
)`,
|
||||
}
|
||||
|
||||
const USERS_V2_SQL = `CREATE TABLE users_new (
|
||||
id TEXT PRIMARY KEY,
|
||||
identifier TEXT UNIQUE NOT NULL,
|
||||
password_hash TEXT,
|
||||
role TEXT NOT NULL DEFAULT 'member',
|
||||
created_at TEXT NOT NULL,
|
||||
auth_provider TEXT NOT NULL DEFAULT 'local',
|
||||
oidc_issuer TEXT,
|
||||
oidc_sub TEXT
|
||||
)`
|
||||
const USERS_OIDC_UNIQUE = `CREATE UNIQUE INDEX IF NOT EXISTS users_oidc_unique ON users(oidc_issuer, oidc_sub) WHERE oidc_issuer IS NOT NULL AND oidc_sub IS NOT NULL`
|
||||
const SESSIONS_SQL = `CREATE TABLE IF NOT EXISTS sessions (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
expires_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES users(id)
|
||||
)`
|
||||
const POIS_SQL = `CREATE TABLE IF NOT EXISTS pois (
|
||||
id TEXT PRIMARY KEY,
|
||||
lat REAL NOT NULL,
|
||||
lng REAL NOT NULL,
|
||||
label TEXT NOT NULL DEFAULT '',
|
||||
icon_type TEXT NOT NULL DEFAULT 'pin'
|
||||
)`
|
||||
const DEVICES_SQL = `CREATE TABLE IF NOT EXISTS devices (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL DEFAULT '',
|
||||
device_type TEXT NOT NULL,
|
||||
vendor TEXT,
|
||||
lat REAL NOT NULL,
|
||||
lng REAL NOT NULL,
|
||||
stream_url TEXT NOT NULL DEFAULT '',
|
||||
source_type TEXT NOT NULL DEFAULT 'mjpeg',
|
||||
config TEXT
|
||||
)`
|
||||
|
||||
function getDbPath() {
|
||||
const getDbPath = () => {
|
||||
if (testPath) return testPath
|
||||
if (process.env.DB_PATH) return process.env.DB_PATH
|
||||
const dir = join(process.cwd(), 'data')
|
||||
if (!existsSync(dir)) mkdirSync(dir, { recursive: true })
|
||||
return join(dir, 'kestrelos.db')
|
||||
}
|
||||
|
||||
async function bootstrap(db) {
|
||||
if (testPath) return
|
||||
const row = await db.get('SELECT COUNT(*) as n FROM users')
|
||||
if (row?.n !== 0) return
|
||||
const email = process.env.BOOTSTRAP_EMAIL?.trim()
|
||||
const password = process.env.BOOTSTRAP_PASSWORD
|
||||
const identifier = (email && password) ? email : DEFAULT_ADMIN_IDENTIFIER
|
||||
const plainPassword = (email && password) ? password : generateRandomPassword()
|
||||
const id = crypto.randomUUID()
|
||||
const now = new Date().toISOString()
|
||||
await db.run(
|
||||
'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
|
||||
[id, identifier, hashPassword(plainPassword), 'admin', now, 'local', null, null],
|
||||
)
|
||||
if (!email || !password) {
|
||||
console.log('\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n')
|
||||
|
||||
console.log(` Identifier: ${identifier}\n Password: ${plainPassword}\n`)
|
||||
|
||||
console.log(' Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n')
|
||||
const getSchemaVersion = async (get) => {
|
||||
try {
|
||||
const row = await get('SELECT version FROM schema_version ORDER BY version DESC LIMIT 1')
|
||||
return row?.version || 0
|
||||
}
|
||||
catch {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
async function migrateUsersIfNeeded(run, all) {
|
||||
const setSchemaVersion = (run, version) => run('INSERT OR REPLACE INTO schema_version (version) VALUES (?)', [version])
|
||||
|
||||
const migrateToV2 = async (run, all) => {
|
||||
const info = await all('PRAGMA table_info(users)')
|
||||
if (info.some(c => c.name === 'auth_provider')) return
|
||||
await run(USERS_V2_SQL)
|
||||
await run(
|
||||
`INSERT INTO users_new (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub)
|
||||
SELECT id, identifier, password_hash, role, created_at, 'local', NULL, NULL FROM users`,
|
||||
)
|
||||
await run('DROP TABLE users')
|
||||
await run('ALTER TABLE users_new RENAME TO users')
|
||||
await run(USERS_OIDC_UNIQUE)
|
||||
|
||||
await run('BEGIN TRANSACTION')
|
||||
try {
|
||||
await run(SCHEMA.users_v2)
|
||||
await run('INSERT INTO users_new (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) SELECT id, identifier, password_hash, role, created_at, ?, ?, ? FROM users', ['local', null, null])
|
||||
await run('DROP TABLE users')
|
||||
await run('ALTER TABLE users_new RENAME TO users')
|
||||
await run(SCHEMA.users_oidc_index)
|
||||
await run('COMMIT')
|
||||
}
|
||||
catch (error) {
|
||||
await run('ROLLBACK').catch(() => {})
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
const runMigrations = async (run, all, get) => {
|
||||
const version = await getSchemaVersion(get)
|
||||
if (version >= SCHEMA_VERSION) return
|
||||
if (version < 2) {
|
||||
await migrateToV2(run, all)
|
||||
await setSchemaVersion(run, 2)
|
||||
}
|
||||
}
|
||||
|
||||
const initDb = async (db, run, all, get) => {
|
||||
try {
|
||||
await run('PRAGMA journal_mode = WAL')
|
||||
}
|
||||
catch {
|
||||
// WAL not supported (e.g., network filesystem)
|
||||
}
|
||||
db.configure('busyTimeout', DB_BUSY_TIMEOUT_MS)
|
||||
|
||||
await run(SCHEMA.schema_version)
|
||||
await run(SCHEMA.users)
|
||||
await runMigrations(run, all, get)
|
||||
await run(SCHEMA.sessions)
|
||||
await run(SCHEMA.pois)
|
||||
await run(SCHEMA.devices)
|
||||
|
||||
if (!testPath) await bootstrapAdmin(run, get)
|
||||
}
|
||||
|
||||
export async function getDb() {
|
||||
if (dbInstance) return dbInstance
|
||||
const path = getDbPath()
|
||||
const db = new sqlite3.Database(path)
|
||||
|
||||
const db = new sqlite3.Database(getDbPath(), (err) => {
|
||||
if (err) {
|
||||
console.error('[db] Failed to open database:', err.message)
|
||||
throw err
|
||||
}
|
||||
})
|
||||
|
||||
const run = promisify(db.run.bind(db))
|
||||
const all = promisify(db.all.bind(db))
|
||||
const get = promisify(db.get.bind(db))
|
||||
await run(USERS_SQL)
|
||||
await migrateUsersIfNeeded(run, all)
|
||||
await run(SESSIONS_SQL)
|
||||
await run(POIS_SQL)
|
||||
await run(DEVICES_SQL)
|
||||
await bootstrap({ run, get })
|
||||
|
||||
try {
|
||||
await initDb(db, run, all, get)
|
||||
}
|
||||
catch (error) {
|
||||
db.close()
|
||||
console.error('[db] Database initialization failed:', error.message)
|
||||
throw error
|
||||
}
|
||||
|
||||
dbInstance = { db, run, all, get }
|
||||
return dbInstance
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the DB connection. Call on server shutdown to avoid native sqlite3 crashes in worker teardown.
|
||||
*/
|
||||
export function closeDb() {
|
||||
if (dbInstance) {
|
||||
try {
|
||||
dbInstance.db.close()
|
||||
}
|
||||
catch {
|
||||
// ignore if already closed
|
||||
}
|
||||
dbInstance = null
|
||||
if (!dbInstance) return
|
||||
try {
|
||||
dbInstance.db.close((err) => {
|
||||
if (err) console.error('[db] Error closing database:', err.message)
|
||||
})
|
||||
}
|
||||
catch (error) {
|
||||
console.error('[db] Error closing database:', error.message)
|
||||
}
|
||||
dbInstance = null
|
||||
}
|
||||
|
||||
/**
|
||||
* For tests: use in-memory DB and reset singleton.
|
||||
* @param {string} path - e.g. ':memory:'
|
||||
*/
|
||||
export function setDbPathForTest(path) {
|
||||
testPath = path
|
||||
testPath = path || null
|
||||
closeDb()
|
||||
}
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import { sanitizeStreamUrl } from './feedUtils.js'
|
||||
|
||||
const DEVICE_TYPES = Object.freeze(['alpr', 'nvr', 'doorbell', 'feed', 'traffic', 'ip', 'drone'])
|
||||
const SOURCE_TYPES = Object.freeze(['mjpeg', 'hls'])
|
||||
|
||||
const sanitizeStreamUrl = (url) => {
|
||||
if (typeof url !== 'string' || !url.trim()) return ''
|
||||
const u = url.trim()
|
||||
return (u.startsWith('https://') || u.startsWith('http://')) ? u : ''
|
||||
}
|
||||
|
||||
/** @typedef {{ id: string, name: string, device_type: string, vendor: string | null, lat: number, lng: number, stream_url: string, source_type: string, config: string | null }} DeviceRow */
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
/**
|
||||
* Validates a single feed object shape (pure function).
|
||||
* @param {unknown} item
|
||||
* @returns {boolean} True if item has id, name, lat, lng with correct types.
|
||||
*/
|
||||
export function isValidFeed(item) {
|
||||
if (!item || typeof item !== 'object') return false
|
||||
const o = /** @type {Record<string, unknown>} */ (item)
|
||||
return (
|
||||
typeof o.id === 'string'
|
||||
&& typeof o.name === 'string'
|
||||
&& typeof o.lat === 'number'
|
||||
&& typeof o.lng === 'number'
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a safe stream URL (http/https only) or empty string. Prevents javascript:, data:, etc.
|
||||
* @param {unknown} url
|
||||
* @returns {string} Safe http(s) URL or empty string.
|
||||
*/
|
||||
export function sanitizeStreamUrl(url) {
|
||||
if (typeof url !== 'string' || !url.trim()) return ''
|
||||
const u = url.trim()
|
||||
if (u.startsWith('https://') || u.startsWith('http://')) return u
|
||||
return ''
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanitizes a validated feed for API response: safe streamUrl and sourceType only.
|
||||
* @param {{ id: string, name: string, lat: number, lng: number, [key: string]: unknown }} feed
|
||||
* @returns {{ id: string, name: string, lat: number, lng: number, streamUrl: string, sourceType: string, description?: string }} Sanitized feed for API.
|
||||
*/
|
||||
export function sanitizeFeedForResponse(feed) {
|
||||
return {
|
||||
id: feed.id,
|
||||
name: feed.name,
|
||||
lat: feed.lat,
|
||||
lng: feed.lng,
|
||||
streamUrl: sanitizeStreamUrl(feed.streamUrl),
|
||||
sourceType: feed.sourceType === 'hls' ? 'hls' : 'mjpeg',
|
||||
...(typeof feed.description === 'string' ? { description: feed.description } : {}),
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters and returns only valid feeds from an array (pure function).
|
||||
* @param {unknown[]} list
|
||||
* @returns {Array<{ id: string, name: string, lat: number, lng: number }>} Array of valid feed objects.
|
||||
*/
|
||||
export function getValidFeeds(list) {
|
||||
if (!Array.isArray(list)) return []
|
||||
return list.filter(isValidFeed)
|
||||
}
|
||||
@@ -1,43 +1,17 @@
|
||||
/**
|
||||
* In-memory store for live sharing sessions (camera + location).
|
||||
* Sessions expire after TTL_MS without an update.
|
||||
*/
|
||||
|
||||
import { closeRouter, getProducer, getTransport } from './mediasoup.js'
|
||||
|
||||
const TTL_MS = 60_000 // 60 seconds without update = inactive
|
||||
|
||||
const TTL_MS = 60_000
|
||||
const sessions = new Map()
|
||||
|
||||
/**
|
||||
* @typedef {{
|
||||
* id: string
|
||||
* userId: string
|
||||
* label: string
|
||||
* lat: number
|
||||
* lng: number
|
||||
* updatedAt: number
|
||||
* routerId: string | null
|
||||
* producerId: string | null
|
||||
* transportId: string | null
|
||||
* }} LiveSession
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {string} userId
|
||||
* @param {string} [label]
|
||||
* @returns {LiveSession} The created live session.
|
||||
*/
|
||||
export function createSession(userId, label = '') {
|
||||
export const createSession = (userId, label = '') => {
|
||||
const id = crypto.randomUUID()
|
||||
const now = Date.now()
|
||||
const session = {
|
||||
id,
|
||||
userId,
|
||||
label: (label || 'Live').trim() || 'Live',
|
||||
lat: 0,
|
||||
lng: 0,
|
||||
updatedAt: now,
|
||||
updatedAt: Date.now(),
|
||||
routerId: null,
|
||||
producerId: null,
|
||||
transportId: null,
|
||||
@@ -46,34 +20,16 @@ export function createSession(userId, label = '') {
|
||||
return session
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @returns {LiveSession | undefined} The session or undefined.
|
||||
*/
|
||||
export function getLiveSession(id) {
|
||||
return sessions.get(id)
|
||||
}
|
||||
export const getLiveSession = id => sessions.get(id)
|
||||
|
||||
/**
|
||||
* Get an existing active session for a user (for replacing with a new one).
|
||||
* @param {string} userId
|
||||
* @returns {LiveSession | undefined} The first active session for the user, or undefined.
|
||||
*/
|
||||
export function getActiveSessionByUserId(userId) {
|
||||
export const getActiveSessionByUserId = (userId) => {
|
||||
const now = Date.now()
|
||||
for (const [, s] of sessions) {
|
||||
if (s.userId === userId && now - s.updatedAt <= TTL_MS) {
|
||||
return s
|
||||
}
|
||||
for (const s of sessions.values()) {
|
||||
if (s.userId === userId && now - s.updatedAt <= TTL_MS) return s
|
||||
}
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {{ lat?: number, lng?: number, routerId?: string | null, producerId?: string | null, transportId?: string | null }} updates
|
||||
*/
|
||||
export function updateLiveSession(id, updates) {
|
||||
export const updateLiveSession = (id, updates) => {
|
||||
const session = sessions.get(id)
|
||||
if (!session) return
|
||||
const now = Date.now()
|
||||
@@ -85,74 +41,52 @@ export function updateLiveSession(id, updates) {
|
||||
session.updatedAt = now
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
*/
|
||||
export function deleteLiveSession(id) {
|
||||
sessions.delete(id)
|
||||
export const deleteLiveSession = id => sessions.delete(id)
|
||||
|
||||
export const clearSessions = () => sessions.clear()
|
||||
|
||||
const cleanupSession = async (session) => {
|
||||
if (session.producerId) {
|
||||
const producer = getProducer(session.producerId)
|
||||
producer?.close()
|
||||
}
|
||||
if (session.transportId) {
|
||||
const transport = getTransport(session.transportId)
|
||||
transport?.close()
|
||||
}
|
||||
if (session.routerId) {
|
||||
await closeRouter(session.id).catch((err) => {
|
||||
console.error(`[liveSessions] Error closing router for expired session ${session.id}:`, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all sessions (for tests only).
|
||||
*/
|
||||
export function clearSessions() {
|
||||
sessions.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns sessions updated within TTL_MS (active only).
|
||||
* Also cleans up expired sessions.
|
||||
* @returns {Promise<Array<{ id: string, userId: string, label: string, lat: number, lng: number, updatedAt: number, hasStream: boolean }>>} Active sessions with hasStream flag.
|
||||
*/
|
||||
export async function getActiveSessions() {
|
||||
export const getActiveSessions = async () => {
|
||||
const now = Date.now()
|
||||
const result = []
|
||||
const expiredIds = []
|
||||
for (const [id, s] of sessions) {
|
||||
if (now - s.updatedAt <= TTL_MS) {
|
||||
result.push({
|
||||
id: s.id,
|
||||
userId: s.userId,
|
||||
label: s.label,
|
||||
lat: s.lat,
|
||||
lng: s.lng,
|
||||
updatedAt: s.updatedAt,
|
||||
hasStream: Boolean(s.producerId),
|
||||
const active = []
|
||||
const expired = []
|
||||
|
||||
for (const session of sessions.values()) {
|
||||
if (now - session.updatedAt <= TTL_MS) {
|
||||
active.push({
|
||||
id: session.id,
|
||||
userId: session.userId,
|
||||
label: session.label,
|
||||
lat: session.lat,
|
||||
lng: session.lng,
|
||||
updatedAt: session.updatedAt,
|
||||
hasStream: Boolean(session.producerId),
|
||||
})
|
||||
}
|
||||
else {
|
||||
expiredIds.push(id)
|
||||
expired.push(session)
|
||||
}
|
||||
}
|
||||
// Clean up expired sessions and their WebRTC resources
|
||||
for (const id of expiredIds) {
|
||||
const session = sessions.get(id)
|
||||
if (session) {
|
||||
// Clean up producer if it exists
|
||||
if (session.producerId) {
|
||||
const producer = getProducer(session.producerId)
|
||||
if (producer) {
|
||||
producer.close()
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up transport if it exists
|
||||
if (session.transportId) {
|
||||
const transport = getTransport(session.transportId)
|
||||
if (transport) {
|
||||
transport.close()
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up router
|
||||
if (session.routerId) {
|
||||
await closeRouter(id).catch((err) => {
|
||||
console.error(`[liveSessions] Error closing router for expired session ${id}:`, err)
|
||||
})
|
||||
}
|
||||
|
||||
sessions.delete(id)
|
||||
}
|
||||
for (const session of expired) {
|
||||
await cleanupSession(session)
|
||||
sessions.delete(session.id)
|
||||
}
|
||||
return result
|
||||
|
||||
return active
|
||||
}
|
||||
|
||||
@@ -1,21 +1,18 @@
|
||||
/**
|
||||
* Mediasoup SFU (Selective Forwarding Unit) setup and management.
|
||||
* Handles WebRTC router, transport, producer, and consumer creation.
|
||||
*/
|
||||
|
||||
import os from 'node:os'
|
||||
import mediasoup from 'mediasoup'
|
||||
|
||||
let worker = null
|
||||
const routers = new Map() // sessionId -> Router
|
||||
const transports = new Map() // transportId -> WebRtcTransport
|
||||
export const producers = new Map() // producerId -> Producer
|
||||
const routers = new Map()
|
||||
const transports = new Map()
|
||||
export const producers = new Map()
|
||||
|
||||
/**
|
||||
* Initialize Mediasoup worker (singleton).
|
||||
* @returns {Promise<mediasoup.types.Worker>} The Mediasoup worker.
|
||||
*/
|
||||
export async function getWorker() {
|
||||
const MEDIA_CODECS = [
|
||||
{ kind: 'video', mimeType: 'video/H264', clockRate: 90000, parameters: { 'packetization-mode': 1, 'profile-level-id': '42e01f' } },
|
||||
{ kind: 'video', mimeType: 'video/VP8', clockRate: 90000 },
|
||||
{ kind: 'video', mimeType: 'video/VP9', clockRate: 90000 },
|
||||
]
|
||||
|
||||
export const getWorker = async () => {
|
||||
if (worker) return worker
|
||||
worker = await mediasoup.createWorker({
|
||||
logLevel: process.env.NODE_ENV === 'development' ? 'debug' : 'warn',
|
||||
@@ -30,50 +27,15 @@ export async function getWorker() {
|
||||
return worker
|
||||
}
|
||||
|
||||
/**
|
||||
* Create or get a router for a live session.
|
||||
* @param {string} sessionId
|
||||
* @returns {Promise<mediasoup.types.Router>} Router for the session.
|
||||
*/
|
||||
export async function getRouter(sessionId) {
|
||||
if (routers.has(sessionId)) {
|
||||
return routers.get(sessionId)
|
||||
}
|
||||
const w = await getWorker()
|
||||
const router = await w.createRouter({
|
||||
mediaCodecs: [
|
||||
{
|
||||
kind: 'video',
|
||||
mimeType: 'video/H264',
|
||||
clockRate: 90000,
|
||||
parameters: {
|
||||
'packetization-mode': 1,
|
||||
'profile-level-id': '42e01f',
|
||||
},
|
||||
},
|
||||
{
|
||||
kind: 'video',
|
||||
mimeType: 'video/VP8',
|
||||
clockRate: 90000,
|
||||
},
|
||||
{
|
||||
kind: 'video',
|
||||
mimeType: 'video/VP9',
|
||||
clockRate: 90000,
|
||||
},
|
||||
],
|
||||
})
|
||||
export const getRouter = async (sessionId) => {
|
||||
const existing = routers.get(sessionId)
|
||||
if (existing) return existing
|
||||
const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS })
|
||||
routers.set(sessionId, router)
|
||||
return router
|
||||
}
|
||||
|
||||
/**
|
||||
* True if the string is a valid IPv4 address (numeric a.b.c.d, each octet 0-255).
|
||||
* Used to accept request Host as announced IP only when it's safe (no hostnames/DNS rebinding).
|
||||
* @param {string} host
|
||||
* @returns {boolean} True if host is a valid IPv4 address.
|
||||
*/
|
||||
function isIPv4(host) {
|
||||
const isIPv4 = (host) => {
|
||||
if (typeof host !== 'string' || !host) return false
|
||||
const parts = host.split('.')
|
||||
if (parts.length !== 4) return false
|
||||
@@ -84,45 +46,24 @@ function isIPv4(host) {
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* First non-internal IPv4 from network interfaces (no env read).
|
||||
* @returns {string | null} First non-internal IPv4 address or null.
|
||||
*/
|
||||
function getAnnouncedIpFromInterfaces() {
|
||||
const ifaces = os.networkInterfaces()
|
||||
for (const addrs of Object.values(ifaces)) {
|
||||
const getAnnouncedIpFromInterfaces = () => {
|
||||
for (const addrs of Object.values(os.networkInterfaces())) {
|
||||
if (!addrs) continue
|
||||
for (const addr of addrs) {
|
||||
if (addr.family === 'IPv4' && !addr.internal) {
|
||||
return addr.address
|
||||
}
|
||||
if (addr.family === 'IPv4' && !addr.internal) return addr.address
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve announced IP: env override, then request host if IPv4, then auto-detect. Pure and deterministic.
|
||||
* @param {string | undefined} requestHost - Host header from the client.
|
||||
* @returns {string | null} The IP to announce in ICE, or null for localhost-only.
|
||||
*/
|
||||
function resolveAnnouncedIp(requestHost) {
|
||||
const resolveAnnouncedIp = (requestHost) => {
|
||||
const envIp = process.env.MEDIASOUP_ANNOUNCED_IP?.trim()
|
||||
if (envIp) return envIp
|
||||
if (requestHost && isIPv4(requestHost)) return requestHost
|
||||
return getAnnouncedIpFromInterfaces()
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a WebRTC transport for a router.
|
||||
* @param {mediasoup.types.Router} router
|
||||
* @param {boolean} _isProducer - true for publisher, false for consumer (reserved for future use)
|
||||
* @param {string} [requestHost] - Hostname from the request (e.g. getRequestURL(event).hostname). If a valid IPv4, used as announced IP so the client can reach the server.
|
||||
* @returns {Promise<{ transport: mediasoup.types.WebRtcTransport, params: object }>} Transport and connection params.
|
||||
*/
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
export async function createTransport(router, _isProducer = false, requestHost = undefined) {
|
||||
// LAN first so the phone (and remote viewers) try the reachable IP before 127.0.0.1 (loopback on the client).
|
||||
export const createTransport = async (router, requestHost = undefined) => {
|
||||
const announcedIp = resolveAnnouncedIp(requestHost)
|
||||
const listenIps = announcedIp
|
||||
? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }]
|
||||
@@ -138,10 +79,10 @@ export async function createTransport(router, _isProducer = false, requestHost =
|
||||
console.error('[mediasoup] Transport creation failed:', err)
|
||||
throw new Error(`Failed to create transport: ${err.message || String(err)}`)
|
||||
})
|
||||
|
||||
transports.set(transport.id, transport)
|
||||
transport.on('close', () => {
|
||||
transports.delete(transport.id)
|
||||
})
|
||||
transport.on('close', () => transports.delete(transport.id))
|
||||
|
||||
return {
|
||||
transport,
|
||||
params: {
|
||||
@@ -153,61 +94,22 @@ export async function createTransport(router, _isProducer = false, requestHost =
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get transport by ID.
|
||||
* @param {string} transportId
|
||||
* @returns {mediasoup.types.WebRtcTransport | undefined} Transport or undefined.
|
||||
*/
|
||||
export function getTransport(transportId) {
|
||||
return transports.get(transportId)
|
||||
}
|
||||
export const getTransport = transportId => transports.get(transportId)
|
||||
|
||||
/**
|
||||
* Create a producer (publisher's video track).
|
||||
* @param {mediasoup.types.WebRtcTransport} transport
|
||||
* @param {MediaStreamTrack} track
|
||||
* @returns {Promise<mediasoup.types.Producer>} The producer.
|
||||
*/
|
||||
export async function createProducer(transport, track) {
|
||||
export const createProducer = async (transport, track) => {
|
||||
const producer = await transport.produce({ track })
|
||||
producers.set(producer.id, producer)
|
||||
producer.on('close', () => {
|
||||
producers.delete(producer.id)
|
||||
})
|
||||
producer.on('close', () => producers.delete(producer.id))
|
||||
return producer
|
||||
}
|
||||
|
||||
/**
|
||||
* Get producer by ID.
|
||||
* @param {string} producerId
|
||||
* @returns {mediasoup.types.Producer | undefined} Producer or undefined.
|
||||
*/
|
||||
export function getProducer(producerId) {
|
||||
return producers.get(producerId)
|
||||
}
|
||||
export const getProducer = producerId => producers.get(producerId)
|
||||
|
||||
/**
|
||||
* Get transports Map (for cleanup).
|
||||
* @returns {Map<string, mediasoup.types.WebRtcTransport>} Map of transport ID to transport.
|
||||
*/
|
||||
export function getTransports() {
|
||||
return transports
|
||||
}
|
||||
export const getTransports = () => transports
|
||||
|
||||
/**
|
||||
* Create a consumer (viewer subscribes to producer's stream).
|
||||
* @param {mediasoup.types.WebRtcTransport} transport
|
||||
* @param {mediasoup.types.Producer} producer
|
||||
* @param {boolean} rtpCapabilities
|
||||
* @returns {Promise<{ consumer: mediasoup.types.Consumer, params: object }>} Consumer and connection params.
|
||||
*/
|
||||
export async function createConsumer(transport, producer, rtpCapabilities) {
|
||||
if (producer.closed) {
|
||||
throw new Error('Producer is closed')
|
||||
}
|
||||
if (producer.paused) {
|
||||
await producer.resume()
|
||||
}
|
||||
export const createConsumer = async (transport, producer, rtpCapabilities) => {
|
||||
if (producer.closed) throw new Error('Producer is closed')
|
||||
if (producer.paused) await producer.resume()
|
||||
|
||||
const consumer = await transport.consume({
|
||||
producerId: producer.id,
|
||||
@@ -229,11 +131,7 @@ export async function createConsumer(transport, producer, rtpCapabilities) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up router for a session.
|
||||
* @param {string} sessionId
|
||||
*/
|
||||
export async function closeRouter(sessionId) {
|
||||
export const closeRouter = async (sessionId) => {
|
||||
const router = routers.get(sessionId)
|
||||
if (router) {
|
||||
router.close()
|
||||
@@ -241,10 +139,4 @@ export async function closeRouter(sessionId) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all active routers (for debugging/monitoring).
|
||||
* @returns {Array<string>} Session IDs with active routers
|
||||
*/
|
||||
export function getActiveRouters() {
|
||||
return Array.from(routers.keys())
|
||||
}
|
||||
export const getActiveRouters = () => Array.from(routers.keys())
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
import { join } from 'node:path'
|
||||
import { readFileSync, existsSync } from 'node:fs'
|
||||
import { getDb } from './db.js'
|
||||
import { sanitizeStreamUrl } from './feedUtils.js'
|
||||
|
||||
/**
|
||||
* One-time migration: insert entries from server/data/feeds.json into devices (device_type = 'feed').
|
||||
* No-op if devices table already has rows or feeds file is missing.
|
||||
*/
|
||||
export async function migrateFeedsToDevices() {
|
||||
const db = await getDb()
|
||||
const row = await db.get('SELECT COUNT(*) as n FROM devices')
|
||||
if (row?.n > 0) return
|
||||
const path = join(process.cwd(), 'server/data/feeds.json')
|
||||
if (!existsSync(path)) return
|
||||
const data = JSON.parse(readFileSync(path, 'utf8'))
|
||||
const list = Array.isArray(data) ? data : []
|
||||
for (const feed of list) {
|
||||
if (!feed?.id || typeof feed.name !== 'string' || typeof feed.lat !== 'number' || typeof feed.lng !== 'number') continue
|
||||
const streamUrl = sanitizeStreamUrl(feed.streamUrl) ?? ''
|
||||
const sourceType = feed.sourceType === 'hls' ? 'hls' : 'mjpeg'
|
||||
await db.run(
|
||||
'INSERT OR IGNORE INTO devices (id, name, device_type, vendor, lat, lng, stream_url, source_type, config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
|
||||
[feed.id, feed.name, 'feed', null, feed.lat, feed.lng, streamUrl, sourceType, null],
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ export async function handleWebSocketMessage(userId, sessionId, type, data) {
|
||||
}
|
||||
case 'create-transport': {
|
||||
const router = await getRouter(sessionId)
|
||||
const { transport, params } = await createTransport(router, true)
|
||||
const { transport, params } = await createTransport(router)
|
||||
updateLiveSession(sessionId, { transportId: transport.id, routerId: router.id })
|
||||
return { type: 'transport-created', data: params }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user