From fbb38c5dd7b05f9dae6357eb1f38706964ee5214 Mon Sep 17 00:00:00 2001 From: Madison Grubb Date: Thu, 12 Feb 2026 13:28:36 -0500 Subject: [PATCH] improve db --- README.md | 6 +- app/components/KestrelMap.vue | 9 +- app/pages/index.vue | 1 - server/plugins/db.init.js | 7 +- server/utils/bootstrap.js | 29 +++ server/utils/db.js | 246 +++++++++++++----------- server/utils/deviceUtils.js | 8 +- server/utils/feedUtils.js | 54 ------ server/utils/liveSessions.js | 158 +++++---------- server/utils/mediasoup.js | 174 ++++------------- server/utils/migrateFeedsToDevices.js | 27 --- test/e2e/global-setup.js | 47 +++-- test/nuxt/KestrelMap.spec.js | 14 +- test/nuxt/api.spec.js | 14 -- test/unit/feedUtils.spec.js | 119 ------------ test/unit/migrateFeedsToDevices.spec.js | 32 --- vitest.config.js | 1 - 17 files changed, 292 insertions(+), 654 deletions(-) create mode 100644 server/utils/bootstrap.js delete mode 100644 server/utils/feedUtils.js delete mode 100644 server/utils/migrateFeedsToDevices.js delete mode 100644 test/nuxt/api.spec.js delete mode 100644 test/unit/feedUtils.spec.js delete mode 100644 test/unit/migrateFeedsToDevices.spec.js diff --git a/README.md b/README.md index 72f1514..900b4d6 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ See [docs/live-streaming.md](docs/live-streaming.md) for architecture details. ## Configuration -- **Feeds**: Edit `server/data/feeds.json` to add cameras/feeds. Each feed needs `id`, `name`, `lat`, `lng`, `streamUrl`, and `sourceType` (`mjpeg` or `hls`). Home Assistant and other sources use the same shape; use proxy URLs for HA. +- **Devices**: Manage cameras/devices via the API (`/api/devices`) or the Members/Cameras UI. Each device needs `name`, `device_type`, `lat`, `lng`, `stream_url`, and `source_type` (`mjpeg` or `hls`). - **Environment**: No required env vars for basic run. For production, set `HOST=0.0.0.0` and `PORT` as needed (e.g. in Docker/Helm). - **Authentication**: The login page always offers password sign-in (local). Optionally set `BOOTSTRAP_EMAIL` and `BOOTSTRAP_PASSWORD` before the first run to create the first admin; otherwise a default admin is created and its credentials are printed in the terminal. To also show an OIDC sign-in button, configure `OIDC_ISSUER`, `OIDC_CLIENT_ID`, `OIDC_CLIENT_SECRET`, and optionally `OIDC_LABEL`, `OIDC_REDIRECT_URI`. See [docs/auth.md](docs/auth.md) for provider-specific examples. - **Bootstrap admin** (when using local auth): The server initializes the database and runs bootstrap at startup. On first run (no users in the database), it creates the first admin. If you set `BOOTSTRAP_EMAIL` and `BOOTSTRAP_PASSWORD` before starting, that account is created. If you don’t set them, a default admin is created (identifier: `admin`) with a random password and the credentials are printed in the terminal—copy them and sign in at `/login`, then change the password or add users via Members. Use **Members** to change roles (admin, leader, member). Only admins can change roles; admins and leaders can edit POIs. @@ -85,8 +85,8 @@ Health: `GET /health` (overview), `GET /health/live` (liveness), `GET /health/re ## Security -- Feed list is validated server-side (`getValidFeeds`); only valid entries are returned. -- Stream URLs are treated as untrusted; the UI only uses `http://` or `https://` URLs for display. +- Device data is validated server-side; only valid entries are returned. +- Stream URLs are sanitized to `http://` or `https://` only; other protocols are rejected. ## License diff --git a/app/components/KestrelMap.vue b/app/components/KestrelMap.vue index a74be34..0bb5d58 100644 --- a/app/components/KestrelMap.vue +++ b/app/components/KestrelMap.vue @@ -214,10 +214,6 @@ import 'leaflet/dist/leaflet.css' const props = defineProps({ - feeds: { - type: Array, - default: () => [], - }, devices: { type: Array, default: () => [], @@ -382,8 +378,7 @@ function updateMarkers() { if (m) m.remove() }) - const feedSources = [...(props.feeds || []), ...(props.devices || [])] - const validSources = feedSources.filter(f => typeof f?.lat === 'number' && typeof f?.lng === 'number') + const validSources = (props.devices || []).filter(f => typeof f?.lat === 'number' && typeof f?.lng === 'number') markersRef.value = validSources.map(item => L.marker([item.lat, item.lng]).addTo(ctx.map).on('click', () => emit('select', item)), ) @@ -622,7 +617,7 @@ onBeforeUnmount(() => { destroyMap() }) -watch(() => [props.feeds, props.devices], () => updateMarkers(), { deep: true }) +watch(() => props.devices, () => updateMarkers(), { deep: true }) watch([() => props.pois, () => props.canEditPois], () => updatePoiMarkers(), { deep: true }) watch(() => props.liveSessions, () => updateLiveMarkers(), { deep: true }) diff --git a/app/pages/index.vue b/app/pages/index.vue index 0e050ed..8b038f5 100644 --- a/app/pages/index.vue +++ b/app/pages/index.vue @@ -3,7 +3,6 @@
{ - void getDb().then(() => migrateFeedsToDevices()) + void getDb() nitroApp.hooks.hook('close', () => { closeDb() }) diff --git a/server/utils/bootstrap.js b/server/utils/bootstrap.js new file mode 100644 index 0000000..5cbc23e --- /dev/null +++ b/server/utils/bootstrap.js @@ -0,0 +1,29 @@ +import { randomBytes } from 'node:crypto' +import { hashPassword } from './password.js' + +const DEFAULT_ADMIN_IDENTIFIER = 'admin' +const PASSWORD_CHARS = 'abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789' + +const generateRandomPassword = () => { + const bytes = randomBytes(14) + return Array.from(bytes, b => PASSWORD_CHARS[b % PASSWORD_CHARS.length]).join('') +} + +export async function bootstrapAdmin(run, get) { + const row = await get('SELECT COUNT(*) as n FROM users') + if (row?.n !== 0) return + + const email = process.env.BOOTSTRAP_EMAIL?.trim() + const password = process.env.BOOTSTRAP_PASSWORD + const identifier = (email && password) ? email : DEFAULT_ADMIN_IDENTIFIER + const plainPassword = (email && password) ? password : generateRandomPassword() + + await run( + 'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', + [crypto.randomUUID(), identifier, hashPassword(plainPassword), 'admin', new Date().toISOString(), 'local', null, null], + ) + + if (!email || !password) { + console.log(`\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n\n Identifier: ${identifier}\n Password: ${plainPassword}\n\n Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n`) + } +} diff --git a/server/utils/db.js b/server/utils/db.js index e2bc926..6ae9616 100644 --- a/server/utils/db.js +++ b/server/utils/db.js @@ -2,154 +2,172 @@ import { join } from 'node:path' import { mkdirSync, existsSync } from 'node:fs' import { createRequire } from 'node:module' import { promisify } from 'node:util' -import { randomBytes } from 'node:crypto' -import { hashPassword } from './password.js' - -const DEFAULT_ADMIN_IDENTIFIER = 'admin' -const DEFAULT_PASSWORD_LENGTH = 14 -const PASSWORD_CHARS = 'abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789' - -function generateRandomPassword() { - const bytes = randomBytes(DEFAULT_PASSWORD_LENGTH) - let s = '' - for (let i = 0; i < DEFAULT_PASSWORD_LENGTH; i++) { - s += PASSWORD_CHARS[bytes[i] % PASSWORD_CHARS.length] - } - return s -} +import { bootstrapAdmin } from './bootstrap.js' const require = createRequire(import.meta.url) const sqlite3 = require('sqlite3') +const SCHEMA_VERSION = 2 +const DB_BUSY_TIMEOUT_MS = 5000 + let dbInstance = null -/** Set by tests to use :memory: or a temp path */ let testPath = null -const USERS_SQL = `CREATE TABLE IF NOT EXISTS users ( - id TEXT PRIMARY KEY, - identifier TEXT UNIQUE NOT NULL, - password_hash TEXT NOT NULL, - role TEXT NOT NULL DEFAULT 'member', - created_at TEXT NOT NULL -)` +const SCHEMA = { + schema_version: 'CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)', + users: `CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + identifier TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'member', + created_at TEXT NOT NULL + )`, + users_v2: `CREATE TABLE users_new ( + id TEXT PRIMARY KEY, + identifier TEXT UNIQUE NOT NULL, + password_hash TEXT, + role TEXT NOT NULL DEFAULT 'member', + created_at TEXT NOT NULL, + auth_provider TEXT NOT NULL DEFAULT 'local', + oidc_issuer TEXT, + oidc_sub TEXT + )`, + users_oidc_index: `CREATE UNIQUE INDEX IF NOT EXISTS users_oidc_unique ON users(oidc_issuer, oidc_sub) WHERE oidc_issuer IS NOT NULL AND oidc_sub IS NOT NULL`, + sessions: `CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + created_at TEXT NOT NULL, + expires_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) + )`, + pois: `CREATE TABLE IF NOT EXISTS pois ( + id TEXT PRIMARY KEY, + lat REAL NOT NULL, + lng REAL NOT NULL, + label TEXT NOT NULL DEFAULT '', + icon_type TEXT NOT NULL DEFAULT 'pin' + )`, + devices: `CREATE TABLE IF NOT EXISTS devices ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL DEFAULT '', + device_type TEXT NOT NULL, + vendor TEXT, + lat REAL NOT NULL, + lng REAL NOT NULL, + stream_url TEXT NOT NULL DEFAULT '', + source_type TEXT NOT NULL DEFAULT 'mjpeg', + config TEXT + )`, +} -const USERS_V2_SQL = `CREATE TABLE users_new ( - id TEXT PRIMARY KEY, - identifier TEXT UNIQUE NOT NULL, - password_hash TEXT, - role TEXT NOT NULL DEFAULT 'member', - created_at TEXT NOT NULL, - auth_provider TEXT NOT NULL DEFAULT 'local', - oidc_issuer TEXT, - oidc_sub TEXT -)` -const USERS_OIDC_UNIQUE = `CREATE UNIQUE INDEX IF NOT EXISTS users_oidc_unique ON users(oidc_issuer, oidc_sub) WHERE oidc_issuer IS NOT NULL AND oidc_sub IS NOT NULL` -const SESSIONS_SQL = `CREATE TABLE IF NOT EXISTS sessions ( - id TEXT PRIMARY KEY, - user_id TEXT NOT NULL, - created_at TEXT NOT NULL, - expires_at TEXT NOT NULL, - FOREIGN KEY(user_id) REFERENCES users(id) -)` -const POIS_SQL = `CREATE TABLE IF NOT EXISTS pois ( - id TEXT PRIMARY KEY, - lat REAL NOT NULL, - lng REAL NOT NULL, - label TEXT NOT NULL DEFAULT '', - icon_type TEXT NOT NULL DEFAULT 'pin' -)` -const DEVICES_SQL = `CREATE TABLE IF NOT EXISTS devices ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL DEFAULT '', - device_type TEXT NOT NULL, - vendor TEXT, - lat REAL NOT NULL, - lng REAL NOT NULL, - stream_url TEXT NOT NULL DEFAULT '', - source_type TEXT NOT NULL DEFAULT 'mjpeg', - config TEXT -)` - -function getDbPath() { +const getDbPath = () => { if (testPath) return testPath + if (process.env.DB_PATH) return process.env.DB_PATH const dir = join(process.cwd(), 'data') if (!existsSync(dir)) mkdirSync(dir, { recursive: true }) return join(dir, 'kestrelos.db') } -async function bootstrap(db) { - if (testPath) return - const row = await db.get('SELECT COUNT(*) as n FROM users') - if (row?.n !== 0) return - const email = process.env.BOOTSTRAP_EMAIL?.trim() - const password = process.env.BOOTSTRAP_PASSWORD - const identifier = (email && password) ? email : DEFAULT_ADMIN_IDENTIFIER - const plainPassword = (email && password) ? password : generateRandomPassword() - const id = crypto.randomUUID() - const now = new Date().toISOString() - await db.run( - 'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', - [id, identifier, hashPassword(plainPassword), 'admin', now, 'local', null, null], - ) - if (!email || !password) { - console.log('\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n') - - console.log(` Identifier: ${identifier}\n Password: ${plainPassword}\n`) - - console.log(' Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n') +const getSchemaVersion = async (get) => { + try { + const row = await get('SELECT version FROM schema_version ORDER BY version DESC LIMIT 1') + return row?.version || 0 + } + catch { + return 0 } } -async function migrateUsersIfNeeded(run, all) { +const setSchemaVersion = (run, version) => run('INSERT OR REPLACE INTO schema_version (version) VALUES (?)', [version]) + +const migrateToV2 = async (run, all) => { const info = await all('PRAGMA table_info(users)') if (info.some(c => c.name === 'auth_provider')) return - await run(USERS_V2_SQL) - await run( - `INSERT INTO users_new (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) - SELECT id, identifier, password_hash, role, created_at, 'local', NULL, NULL FROM users`, - ) - await run('DROP TABLE users') - await run('ALTER TABLE users_new RENAME TO users') - await run(USERS_OIDC_UNIQUE) + + await run('BEGIN TRANSACTION') + try { + await run(SCHEMA.users_v2) + await run('INSERT INTO users_new (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) SELECT id, identifier, password_hash, role, created_at, ?, ?, ? FROM users', ['local', null, null]) + await run('DROP TABLE users') + await run('ALTER TABLE users_new RENAME TO users') + await run(SCHEMA.users_oidc_index) + await run('COMMIT') + } + catch (error) { + await run('ROLLBACK').catch(() => {}) + throw error + } +} + +const runMigrations = async (run, all, get) => { + const version = await getSchemaVersion(get) + if (version >= SCHEMA_VERSION) return + if (version < 2) { + await migrateToV2(run, all) + await setSchemaVersion(run, 2) + } +} + +const initDb = async (db, run, all, get) => { + try { + await run('PRAGMA journal_mode = WAL') + } + catch { + // WAL not supported (e.g., network filesystem) + } + db.configure('busyTimeout', DB_BUSY_TIMEOUT_MS) + + await run(SCHEMA.schema_version) + await run(SCHEMA.users) + await runMigrations(run, all, get) + await run(SCHEMA.sessions) + await run(SCHEMA.pois) + await run(SCHEMA.devices) + + if (!testPath) await bootstrapAdmin(run, get) } export async function getDb() { if (dbInstance) return dbInstance - const path = getDbPath() - const db = new sqlite3.Database(path) + + const db = new sqlite3.Database(getDbPath(), (err) => { + if (err) { + console.error('[db] Failed to open database:', err.message) + throw err + } + }) + const run = promisify(db.run.bind(db)) const all = promisify(db.all.bind(db)) const get = promisify(db.get.bind(db)) - await run(USERS_SQL) - await migrateUsersIfNeeded(run, all) - await run(SESSIONS_SQL) - await run(POIS_SQL) - await run(DEVICES_SQL) - await bootstrap({ run, get }) + + try { + await initDb(db, run, all, get) + } + catch (error) { + db.close() + console.error('[db] Database initialization failed:', error.message) + throw error + } + dbInstance = { db, run, all, get } return dbInstance } -/** - * Close the DB connection. Call on server shutdown to avoid native sqlite3 crashes in worker teardown. - */ export function closeDb() { - if (dbInstance) { - try { - dbInstance.db.close() - } - catch { - // ignore if already closed - } - dbInstance = null + if (!dbInstance) return + try { + dbInstance.db.close((err) => { + if (err) console.error('[db] Error closing database:', err.message) + }) } + catch (error) { + console.error('[db] Error closing database:', error.message) + } + dbInstance = null } -/** - * For tests: use in-memory DB and reset singleton. - * @param {string} path - e.g. ':memory:' - */ export function setDbPathForTest(path) { - testPath = path + testPath = path || null closeDb() } diff --git a/server/utils/deviceUtils.js b/server/utils/deviceUtils.js index 3307ea6..eb62e67 100644 --- a/server/utils/deviceUtils.js +++ b/server/utils/deviceUtils.js @@ -1,8 +1,12 @@ -import { sanitizeStreamUrl } from './feedUtils.js' - const DEVICE_TYPES = Object.freeze(['alpr', 'nvr', 'doorbell', 'feed', 'traffic', 'ip', 'drone']) const SOURCE_TYPES = Object.freeze(['mjpeg', 'hls']) +const sanitizeStreamUrl = (url) => { + if (typeof url !== 'string' || !url.trim()) return '' + const u = url.trim() + return (u.startsWith('https://') || u.startsWith('http://')) ? u : '' +} + /** @typedef {{ id: string, name: string, device_type: string, vendor: string | null, lat: number, lng: number, stream_url: string, source_type: string, config: string | null }} DeviceRow */ /** diff --git a/server/utils/feedUtils.js b/server/utils/feedUtils.js deleted file mode 100644 index 7be00de..0000000 --- a/server/utils/feedUtils.js +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Validates a single feed object shape (pure function). - * @param {unknown} item - * @returns {boolean} True if item has id, name, lat, lng with correct types. - */ -export function isValidFeed(item) { - if (!item || typeof item !== 'object') return false - const o = /** @type {Record} */ (item) - return ( - typeof o.id === 'string' - && typeof o.name === 'string' - && typeof o.lat === 'number' - && typeof o.lng === 'number' - ) -} - -/** - * Returns a safe stream URL (http/https only) or empty string. Prevents javascript:, data:, etc. - * @param {unknown} url - * @returns {string} Safe http(s) URL or empty string. - */ -export function sanitizeStreamUrl(url) { - if (typeof url !== 'string' || !url.trim()) return '' - const u = url.trim() - if (u.startsWith('https://') || u.startsWith('http://')) return u - return '' -} - -/** - * Sanitizes a validated feed for API response: safe streamUrl and sourceType only. - * @param {{ id: string, name: string, lat: number, lng: number, [key: string]: unknown }} feed - * @returns {{ id: string, name: string, lat: number, lng: number, streamUrl: string, sourceType: string, description?: string }} Sanitized feed for API. - */ -export function sanitizeFeedForResponse(feed) { - return { - id: feed.id, - name: feed.name, - lat: feed.lat, - lng: feed.lng, - streamUrl: sanitizeStreamUrl(feed.streamUrl), - sourceType: feed.sourceType === 'hls' ? 'hls' : 'mjpeg', - ...(typeof feed.description === 'string' ? { description: feed.description } : {}), - } -} - -/** - * Filters and returns only valid feeds from an array (pure function). - * @param {unknown[]} list - * @returns {Array<{ id: string, name: string, lat: number, lng: number }>} Array of valid feed objects. - */ -export function getValidFeeds(list) { - if (!Array.isArray(list)) return [] - return list.filter(isValidFeed) -} diff --git a/server/utils/liveSessions.js b/server/utils/liveSessions.js index 0432261..0e8dcca 100644 --- a/server/utils/liveSessions.js +++ b/server/utils/liveSessions.js @@ -1,43 +1,17 @@ -/** - * In-memory store for live sharing sessions (camera + location). - * Sessions expire after TTL_MS without an update. - */ - import { closeRouter, getProducer, getTransport } from './mediasoup.js' -const TTL_MS = 60_000 // 60 seconds without update = inactive - +const TTL_MS = 60_000 const sessions = new Map() -/** - * @typedef {{ - * id: string - * userId: string - * label: string - * lat: number - * lng: number - * updatedAt: number - * routerId: string | null - * producerId: string | null - * transportId: string | null - * }} LiveSession - */ - -/** - * @param {string} userId - * @param {string} [label] - * @returns {LiveSession} The created live session. - */ -export function createSession(userId, label = '') { +export const createSession = (userId, label = '') => { const id = crypto.randomUUID() - const now = Date.now() const session = { id, userId, label: (label || 'Live').trim() || 'Live', lat: 0, lng: 0, - updatedAt: now, + updatedAt: Date.now(), routerId: null, producerId: null, transportId: null, @@ -46,34 +20,16 @@ export function createSession(userId, label = '') { return session } -/** - * @param {string} id - * @returns {LiveSession | undefined} The session or undefined. - */ -export function getLiveSession(id) { - return sessions.get(id) -} +export const getLiveSession = (id) => sessions.get(id) -/** - * Get an existing active session for a user (for replacing with a new one). - * @param {string} userId - * @returns {LiveSession | undefined} The first active session for the user, or undefined. - */ -export function getActiveSessionByUserId(userId) { +export const getActiveSessionByUserId = (userId) => { const now = Date.now() - for (const [, s] of sessions) { - if (s.userId === userId && now - s.updatedAt <= TTL_MS) { - return s - } + for (const s of sessions.values()) { + if (s.userId === userId && now - s.updatedAt <= TTL_MS) return s } - return undefined } -/** - * @param {string} id - * @param {{ lat?: number, lng?: number, routerId?: string | null, producerId?: string | null, transportId?: string | null }} updates - */ -export function updateLiveSession(id, updates) { +export const updateLiveSession = (id, updates) => { const session = sessions.get(id) if (!session) return const now = Date.now() @@ -85,74 +41,52 @@ export function updateLiveSession(id, updates) { session.updatedAt = now } -/** - * @param {string} id - */ -export function deleteLiveSession(id) { - sessions.delete(id) +export const deleteLiveSession = (id) => sessions.delete(id) + +export const clearSessions = () => sessions.clear() + +const cleanupSession = async (session) => { + if (session.producerId) { + const producer = getProducer(session.producerId) + producer?.close() + } + if (session.transportId) { + const transport = getTransport(session.transportId) + transport?.close() + } + if (session.routerId) { + await closeRouter(session.id).catch((err) => { + console.error(`[liveSessions] Error closing router for expired session ${session.id}:`, err) + }) + } } -/** - * Clear all sessions (for tests only). - */ -export function clearSessions() { - sessions.clear() -} - -/** - * Returns sessions updated within TTL_MS (active only). - * Also cleans up expired sessions. - * @returns {Promise>} Active sessions with hasStream flag. - */ -export async function getActiveSessions() { +export const getActiveSessions = async () => { const now = Date.now() - const result = [] - const expiredIds = [] - for (const [id, s] of sessions) { - if (now - s.updatedAt <= TTL_MS) { - result.push({ - id: s.id, - userId: s.userId, - label: s.label, - lat: s.lat, - lng: s.lng, - updatedAt: s.updatedAt, - hasStream: Boolean(s.producerId), + const active = [] + const expired = [] + + for (const session of sessions.values()) { + if (now - session.updatedAt <= TTL_MS) { + active.push({ + id: session.id, + userId: session.userId, + label: session.label, + lat: session.lat, + lng: session.lng, + updatedAt: session.updatedAt, + hasStream: Boolean(session.producerId), }) } else { - expiredIds.push(id) + expired.push(session) } } - // Clean up expired sessions and their WebRTC resources - for (const id of expiredIds) { - const session = sessions.get(id) - if (session) { - // Clean up producer if it exists - if (session.producerId) { - const producer = getProducer(session.producerId) - if (producer) { - producer.close() - } - } - // Clean up transport if it exists - if (session.transportId) { - const transport = getTransport(session.transportId) - if (transport) { - transport.close() - } - } - - // Clean up router - if (session.routerId) { - await closeRouter(id).catch((err) => { - console.error(`[liveSessions] Error closing router for expired session ${id}:`, err) - }) - } - - sessions.delete(id) - } + for (const session of expired) { + await cleanupSession(session) + sessions.delete(session.id) } - return result + + return active } diff --git a/server/utils/mediasoup.js b/server/utils/mediasoup.js index 189fa1f..79166f6 100644 --- a/server/utils/mediasoup.js +++ b/server/utils/mediasoup.js @@ -1,21 +1,18 @@ -/** - * Mediasoup SFU (Selective Forwarding Unit) setup and management. - * Handles WebRTC router, transport, producer, and consumer creation. - */ - import os from 'node:os' import mediasoup from 'mediasoup' let worker = null -const routers = new Map() // sessionId -> Router -const transports = new Map() // transportId -> WebRtcTransport -export const producers = new Map() // producerId -> Producer +const routers = new Map() +const transports = new Map() +export const producers = new Map() -/** - * Initialize Mediasoup worker (singleton). - * @returns {Promise} The Mediasoup worker. - */ -export async function getWorker() { +const MEDIA_CODECS = [ + { kind: 'video', mimeType: 'video/H264', clockRate: 90000, parameters: { 'packetization-mode': 1, 'profile-level-id': '42e01f' } }, + { kind: 'video', mimeType: 'video/VP8', clockRate: 90000 }, + { kind: 'video', mimeType: 'video/VP9', clockRate: 90000 }, +] + +export const getWorker = async () => { if (worker) return worker worker = await mediasoup.createWorker({ logLevel: process.env.NODE_ENV === 'development' ? 'debug' : 'warn', @@ -30,50 +27,15 @@ export async function getWorker() { return worker } -/** - * Create or get a router for a live session. - * @param {string} sessionId - * @returns {Promise} Router for the session. - */ -export async function getRouter(sessionId) { - if (routers.has(sessionId)) { - return routers.get(sessionId) - } - const w = await getWorker() - const router = await w.createRouter({ - mediaCodecs: [ - { - kind: 'video', - mimeType: 'video/H264', - clockRate: 90000, - parameters: { - 'packetization-mode': 1, - 'profile-level-id': '42e01f', - }, - }, - { - kind: 'video', - mimeType: 'video/VP8', - clockRate: 90000, - }, - { - kind: 'video', - mimeType: 'video/VP9', - clockRate: 90000, - }, - ], - }) +export const getRouter = async (sessionId) => { + const existing = routers.get(sessionId) + if (existing) return existing + const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS }) routers.set(sessionId, router) return router } -/** - * True if the string is a valid IPv4 address (numeric a.b.c.d, each octet 0-255). - * Used to accept request Host as announced IP only when it's safe (no hostnames/DNS rebinding). - * @param {string} host - * @returns {boolean} True if host is a valid IPv4 address. - */ -function isIPv4(host) { +const isIPv4 = (host) => { if (typeof host !== 'string' || !host) return false const parts = host.split('.') if (parts.length !== 4) return false @@ -84,45 +46,24 @@ function isIPv4(host) { return true } -/** - * First non-internal IPv4 from network interfaces (no env read). - * @returns {string | null} First non-internal IPv4 address or null. - */ -function getAnnouncedIpFromInterfaces() { - const ifaces = os.networkInterfaces() - for (const addrs of Object.values(ifaces)) { +const getAnnouncedIpFromInterfaces = () => { + for (const addrs of Object.values(os.networkInterfaces())) { if (!addrs) continue for (const addr of addrs) { - if (addr.family === 'IPv4' && !addr.internal) { - return addr.address - } + if (addr.family === 'IPv4' && !addr.internal) return addr.address } } return null } -/** - * Resolve announced IP: env override, then request host if IPv4, then auto-detect. Pure and deterministic. - * @param {string | undefined} requestHost - Host header from the client. - * @returns {string | null} The IP to announce in ICE, or null for localhost-only. - */ -function resolveAnnouncedIp(requestHost) { +const resolveAnnouncedIp = (requestHost) => { const envIp = process.env.MEDIASOUP_ANNOUNCED_IP?.trim() if (envIp) return envIp if (requestHost && isIPv4(requestHost)) return requestHost return getAnnouncedIpFromInterfaces() } -/** - * Create a WebRTC transport for a router. - * @param {mediasoup.types.Router} router - * @param {boolean} _isProducer - true for publisher, false for consumer (reserved for future use) - * @param {string} [requestHost] - Hostname from the request (e.g. getRequestURL(event).hostname). If a valid IPv4, used as announced IP so the client can reach the server. - * @returns {Promise<{ transport: mediasoup.types.WebRtcTransport, params: object }>} Transport and connection params. - */ -// eslint-disable-next-line no-unused-vars -export async function createTransport(router, _isProducer = false, requestHost = undefined) { - // LAN first so the phone (and remote viewers) try the reachable IP before 127.0.0.1 (loopback on the client). +export const createTransport = async (router, _isProducer = false, requestHost = undefined) => { const announcedIp = resolveAnnouncedIp(requestHost) const listenIps = announcedIp ? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }] @@ -138,10 +79,10 @@ export async function createTransport(router, _isProducer = false, requestHost = console.error('[mediasoup] Transport creation failed:', err) throw new Error(`Failed to create transport: ${err.message || String(err)}`) }) + transports.set(transport.id, transport) - transport.on('close', () => { - transports.delete(transport.id) - }) + transport.on('close', () => transports.delete(transport.id)) + return { transport, params: { @@ -153,61 +94,22 @@ export async function createTransport(router, _isProducer = false, requestHost = } } -/** - * Get transport by ID. - * @param {string} transportId - * @returns {mediasoup.types.WebRtcTransport | undefined} Transport or undefined. - */ -export function getTransport(transportId) { - return transports.get(transportId) -} +export const getTransport = (transportId) => transports.get(transportId) -/** - * Create a producer (publisher's video track). - * @param {mediasoup.types.WebRtcTransport} transport - * @param {MediaStreamTrack} track - * @returns {Promise} The producer. - */ -export async function createProducer(transport, track) { +export const createProducer = async (transport, track) => { const producer = await transport.produce({ track }) producers.set(producer.id, producer) - producer.on('close', () => { - producers.delete(producer.id) - }) + producer.on('close', () => producers.delete(producer.id)) return producer } -/** - * Get producer by ID. - * @param {string} producerId - * @returns {mediasoup.types.Producer | undefined} Producer or undefined. - */ -export function getProducer(producerId) { - return producers.get(producerId) -} +export const getProducer = (producerId) => producers.get(producerId) -/** - * Get transports Map (for cleanup). - * @returns {Map} Map of transport ID to transport. - */ -export function getTransports() { - return transports -} +export const getTransports = () => transports -/** - * Create a consumer (viewer subscribes to producer's stream). - * @param {mediasoup.types.WebRtcTransport} transport - * @param {mediasoup.types.Producer} producer - * @param {boolean} rtpCapabilities - * @returns {Promise<{ consumer: mediasoup.types.Consumer, params: object }>} Consumer and connection params. - */ -export async function createConsumer(transport, producer, rtpCapabilities) { - if (producer.closed) { - throw new Error('Producer is closed') - } - if (producer.paused) { - await producer.resume() - } +export const createConsumer = async (transport, producer, rtpCapabilities) => { + if (producer.closed) throw new Error('Producer is closed') + if (producer.paused) await producer.resume() const consumer = await transport.consume({ producerId: producer.id, @@ -229,11 +131,7 @@ export async function createConsumer(transport, producer, rtpCapabilities) { } } -/** - * Clean up router for a session. - * @param {string} sessionId - */ -export async function closeRouter(sessionId) { +export const closeRouter = async (sessionId) => { const router = routers.get(sessionId) if (router) { router.close() @@ -241,10 +139,4 @@ export async function closeRouter(sessionId) { } } -/** - * Get all active routers (for debugging/monitoring). - * @returns {Array} Session IDs with active routers - */ -export function getActiveRouters() { - return Array.from(routers.keys()) -} +export const getActiveRouters = () => Array.from(routers.keys()) diff --git a/server/utils/migrateFeedsToDevices.js b/server/utils/migrateFeedsToDevices.js deleted file mode 100644 index 5b8ba8f..0000000 --- a/server/utils/migrateFeedsToDevices.js +++ /dev/null @@ -1,27 +0,0 @@ -import { join } from 'node:path' -import { readFileSync, existsSync } from 'node:fs' -import { getDb } from './db.js' -import { sanitizeStreamUrl } from './feedUtils.js' - -/** - * One-time migration: insert entries from server/data/feeds.json into devices (device_type = 'feed'). - * No-op if devices table already has rows or feeds file is missing. - */ -export async function migrateFeedsToDevices() { - const db = await getDb() - const row = await db.get('SELECT COUNT(*) as n FROM devices') - if (row?.n > 0) return - const path = join(process.cwd(), 'server/data/feeds.json') - if (!existsSync(path)) return - const data = JSON.parse(readFileSync(path, 'utf8')) - const list = Array.isArray(data) ? data : [] - for (const feed of list) { - if (!feed?.id || typeof feed.name !== 'string' || typeof feed.lat !== 'number' || typeof feed.lng !== 'number') continue - const streamUrl = sanitizeStreamUrl(feed.streamUrl) ?? '' - const sourceType = feed.sourceType === 'hls' ? 'hls' : 'mjpeg' - await db.run( - 'INSERT OR IGNORE INTO devices (id, name, device_type, vendor, lat, lng, stream_url, source_type, config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', - [feed.id, feed.name, 'feed', null, feed.lat, feed.lng, streamUrl, sourceType, null], - ) - } -} diff --git a/test/e2e/global-setup.js b/test/e2e/global-setup.js index e5c7c6d..e5323f9 100644 --- a/test/e2e/global-setup.js +++ b/test/e2e/global-setup.js @@ -42,25 +42,42 @@ function ensureDevCerts() { } async function globalSetup() { - // Ensure dev certificates exist ensureDevCerts() - // Create test admin user if it doesn't exist - const { get, run } = await getDb() - const existingUser = await get('SELECT id FROM users WHERE identifier = ?', [TEST_ADMIN.identifier]) + let retries = 3 + let lastError = null + while (retries > 0) { + try { + const { get, run } = await getDb() + const existingUser = await get('SELECT id FROM users WHERE identifier = ?', [TEST_ADMIN.identifier]) - if (!existingUser) { - const id = crypto.randomUUID() - const now = new Date().toISOString() - await run( - 'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', - [id, TEST_ADMIN.identifier, hashPassword(TEST_ADMIN.password), TEST_ADMIN.role, now, 'local', null, null], - ) - console.log(`[test] Created test admin user: ${TEST_ADMIN.identifier}`) - } - else { - console.log(`[test] Test admin user already exists: ${TEST_ADMIN.identifier}`) + if (!existingUser) { + const id = crypto.randomUUID() + const now = new Date().toISOString() + await run( + 'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', + [id, TEST_ADMIN.identifier, hashPassword(TEST_ADMIN.password), TEST_ADMIN.role, now, 'local', null, null], + ) + console.log(`[test] Created test admin user: ${TEST_ADMIN.identifier}`) + } + else { + console.log(`[test] Test admin user already exists: ${TEST_ADMIN.identifier}`) + } + return + } + catch (error) { + lastError = error + if (error.message?.includes('SQLITE_BUSY') || error.message?.includes('database is locked')) { + retries-- + if (retries > 0) { + await new Promise(resolve => setTimeout(resolve, 100 * (4 - retries))) + continue + } + } + throw error + } } + throw lastError } export default globalSetup diff --git a/test/nuxt/KestrelMap.spec.js b/test/nuxt/KestrelMap.spec.js index d42cd16..2ee479e 100644 --- a/test/nuxt/KestrelMap.spec.js +++ b/test/nuxt/KestrelMap.spec.js @@ -10,24 +10,24 @@ vi.mock('leaflet.offline', () => ({ tileLayerOffline: null, savetiles: null })) describe('KestrelMap', () => { it('renders map container', async () => { const wrapper = await mountSuspended(KestrelMap, { - props: { feeds: [] }, + props: { devices: [] }, }) expect(wrapper.find('[data-testid="kestrel-map"]').exists()).toBe(true) }) - it('accepts feeds prop', async () => { - const feeds = [ + it('accepts devices prop', async () => { + const devices = [ { id: '1', name: 'A', lat: 37.7, lng: -122.4, streamUrl: '', sourceType: 'mjpeg' }, ] const wrapper = await mountSuspended(KestrelMap, { - props: { feeds }, + props: { devices }, }) - expect(wrapper.props('feeds')).toEqual(feeds) + expect(wrapper.props('devices')).toEqual(devices) }) it('has select emit', async () => { const wrapper = await mountSuspended(KestrelMap, { - props: { feeds: [] }, + props: { devices: [] }, }) wrapper.vm.$emit('select', { id: 'x', name: 'X', lat: 0, lng: 0 }) expect(wrapper.emitted('select')).toHaveLength(1) @@ -67,7 +67,7 @@ describe('KestrelMap', () => { it('accepts pois and canEditPois props', async () => { const wrapper = await mountSuspended(KestrelMap, { props: { - feeds: [], + devices: [], pois: [{ id: 'p1', lat: 37.7, lng: -122.4, label: 'P', icon_type: 'pin' }], canEditPois: false, }, diff --git a/test/nuxt/api.spec.js b/test/nuxt/api.spec.js deleted file mode 100644 index a42600b..0000000 --- a/test/nuxt/api.spec.js +++ /dev/null @@ -1,14 +0,0 @@ -import { describe, it, expect } from 'vitest' -import { getValidFeeds } from '../../server/utils/feedUtils.js' - -describe('API contract', () => { - it('getValidFeeds returns array suitable for API response', () => { - const raw = [ - { id: '1', name: 'A', lat: 1, lng: 2 }, - { id: '2', name: 'B', lat: 3, lng: 4 }, - ] - const out = getValidFeeds(raw) - expect(Array.isArray(out)).toBe(true) - expect(out).toHaveLength(2) - }) -}) diff --git a/test/unit/feedUtils.spec.js b/test/unit/feedUtils.spec.js deleted file mode 100644 index 2a15310..0000000 --- a/test/unit/feedUtils.spec.js +++ /dev/null @@ -1,119 +0,0 @@ -import { describe, it, expect } from 'vitest' -import { isValidFeed, getValidFeeds, sanitizeStreamUrl, sanitizeFeedForResponse } from '../../server/utils/feedUtils.js' - -describe('feedUtils', () => { - describe('isValidFeed', () => { - it('returns true for valid feed', () => { - expect(isValidFeed({ - id: '1', - name: 'Cam', - lat: 37.7, - lng: -122.4, - })).toBe(true) - }) - - it('returns false for null', () => { - expect(isValidFeed(null)).toBe(false) - }) - - it('returns false for missing id', () => { - expect(isValidFeed({ name: 'x', lat: 0, lng: 0 })).toBe(false) - }) - - it('returns false for wrong lat type', () => { - expect(isValidFeed({ id: '1', name: 'x', lat: '37', lng: -122 })).toBe(false) - }) - }) - - describe('getValidFeeds', () => { - it('returns only valid feeds', () => { - const list = [ - { id: 'a', name: 'A', lat: 1, lng: 2 }, - null, - { id: 'b', name: 'B', lat: 3, lng: 4 }, - ] - expect(getValidFeeds(list)).toHaveLength(2) - }) - - it('returns empty array for non-array', () => { - expect(getValidFeeds(null)).toEqual([]) - expect(getValidFeeds({})).toEqual([]) - }) - }) - - describe('sanitizeStreamUrl', () => { - it('allows http and https', () => { - expect(sanitizeStreamUrl('https://example.com/stream')).toBe('https://example.com/stream') - expect(sanitizeStreamUrl('http://example.com/stream')).toBe('http://example.com/stream') - }) - - it('returns empty for javascript:, data:, and other schemes', () => { - expect(sanitizeStreamUrl('javascript:alert(1)')).toBe('') - expect(sanitizeStreamUrl('data:text/html,