make kestrel a tak server, so that it can send and receive pois as cots data
Some checks failed
ci/woodpecker/pr/pr Pipeline failed

This commit is contained in:
Madison Grubb
2026-02-17 10:42:53 -05:00
parent b18283d3b3
commit b0e8dd7ad9
96 changed files with 5767 additions and 500 deletions

39
server/utils/asyncLock.js Normal file
View File

@@ -0,0 +1,39 @@
/**
* Async lock utility - Promise-based mutex per key.
* Ensures only one async operation executes per key at a time.
*/
const locks = new Map()
/**
* Acquire a lock for a key and execute callback.
* Only one callback per key executes at a time.
* @param {string} key - Lock key
* @param {Function} callback - Async function to execute
* @returns {Promise<any>} Result of callback
*/
export async function acquire(key, callback) {
const lockKey = String(key)
let queue = locks.get(lockKey)
if (!queue) {
queue = Promise.resolve()
locks.set(lockKey, queue)
}
const next = queue.then(() => callback()).finally(() => {
if (locks.get(lockKey) === next) {
locks.delete(lockKey)
}
})
locks.set(lockKey, next)
return next
}
/**
* Clear all locks (for testing).
*/
export function clearLocks() {
locks.clear()
}

23
server/utils/constants.js Normal file
View File

@@ -0,0 +1,23 @@
/**
* Application constants with environment variable support.
*/
// Timeouts (milliseconds)
export const COT_AUTH_TIMEOUT_MS = Number(process.env.COT_AUTH_TIMEOUT_MS) || 15_000
export const LIVE_SESSION_TTL_MS = Number(process.env.LIVE_SESSION_TTL_MS) || 60_000
export const COT_ENTITY_TTL_MS = Number(process.env.COT_ENTITY_TTL_MS) || 90_000
export const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL_MS) || 1500
export const SHUTDOWN_TIMEOUT_MS = Number(process.env.SHUTDOWN_TIMEOUT_MS) || 30_000
// Ports
export const COT_PORT = Number(process.env.COT_PORT) || 8089
export const WEBSOCKET_PATH = process.env.WEBSOCKET_PATH || '/ws'
// Limits
export const MAX_PAYLOAD_BYTES = Number(process.env.MAX_PAYLOAD_BYTES) || 64 * 1024
export const MAX_STRING_LENGTH = Number(process.env.MAX_STRING_LENGTH) || 1000
export const MAX_IDENTIFIER_LENGTH = Number(process.env.MAX_IDENTIFIER_LENGTH) || 255
// Mediasoup
export const MEDIASOUP_RTC_MIN_PORT = Number(process.env.MEDIASOUP_RTC_MIN_PORT) || 40000
export const MEDIASOUP_RTC_MAX_PORT = Number(process.env.MEDIASOUP_RTC_MAX_PORT) || 49999

25
server/utils/cotAuth.js Normal file
View File

@@ -0,0 +1,25 @@
import { getDb } from './db.js'
import { verifyPassword } from './password.js'
/**
* Validate CoT auth: local users use password_hash; OIDC users use cot_password_hash (ATAK password).
* @param {string} identifier - KestrelOS identifier (username)
* @param {string} password - Plain password from CoT auth
* @returns {Promise<boolean>} True if valid
*/
export async function validateCotAuth(identifier, password) {
const id = typeof identifier === 'string' ? identifier.trim() : ''
if (!id || typeof password !== 'string') return false
const { get } = await getDb()
const user = await get(
'SELECT auth_provider, password_hash, cot_password_hash FROM users WHERE identifier = ?',
[id],
)
if (!user) return false
const hash = user.auth_provider === 'local' ? user.password_hash : user.cot_password_hash
if (!hash) return false
return verifyPassword(password, hash)
}

142
server/utils/cotParser.js Normal file
View File

@@ -0,0 +1,142 @@
import { XMLParser } from 'fast-xml-parser'
import { MAX_PAYLOAD_BYTES } from './constants.js'
const TAK_MAGIC = 0xBF
const TRADITIONAL_DELIMITER = Buffer.from('</event>', 'utf8')
/**
* @param {Buffer} buf
* @param {number} offset
* @returns {{ value: number, bytesRead: number }} Decoded varint and bytes consumed.
*/
function readVarint(buf, offset) {
let value = 0
let shift = 0
let bytesRead = 0
while (offset + bytesRead < buf.length) {
const b = buf[offset + bytesRead]
bytesRead += 1
value += (b & 0x7F) << shift
if ((b & 0x80) === 0) return { value, bytesRead }
shift += 7
if (shift > 28) return { value: 0, bytesRead: 0 }
}
return { value, bytesRead }
}
/**
* TAK stream frame: 0xBF, varint length, payload.
* @param {Buffer} buf
* @returns {{ payload: Buffer, bytesConsumed: number } | null} Frame or null if incomplete/invalid.
*/
export function parseTakStreamFrame(buf) {
if (!buf || buf.length < 2 || buf[0] !== TAK_MAGIC) return null
const { value: length, bytesRead } = readVarint(buf, 1)
if (length < 0 || length > MAX_PAYLOAD_BYTES) return null
const bytesConsumed = 1 + bytesRead + length
if (buf.length < bytesConsumed) return null
return { payload: buf.subarray(1 + bytesRead, bytesConsumed), bytesConsumed }
}
/**
* Traditional CoT: one XML message delimited by </event>.
* @param {Buffer} buf
* @returns {{ payload: Buffer, bytesConsumed: number } | null} Frame or null if incomplete.
*/
export function parseTraditionalXmlFrame(buf) {
if (!buf || buf.length < 8 || buf[0] !== 0x3C) return null
const idx = buf.indexOf(TRADITIONAL_DELIMITER)
if (idx === -1) return null
const bytesConsumed = idx + TRADITIONAL_DELIMITER.length
if (bytesConsumed > MAX_PAYLOAD_BYTES) return null
return { payload: buf.subarray(0, bytesConsumed), bytesConsumed }
}
const xmlParser = new XMLParser({
ignoreAttributes: false,
attributeNamePrefix: '@_',
parseTagValue: false,
ignoreDeclaration: true,
ignorePiTags: true,
processEntities: false, // Disable entity expansion to prevent XML bomb attacks
maxAttributes: 100,
parseAttributeValue: false,
trimValues: true,
parseTrueNumberOnly: false,
arrayMode: false,
stopNodes: [], // Could add depth limit here if needed
})
/**
* Case-insensitive key lookup in nested object.
* @returns {unknown} Found value or undefined.
*/
function findInObject(obj, key) {
if (!obj || typeof obj !== 'object') return undefined
const k = key.toLowerCase()
for (const [name, val] of Object.entries(obj)) {
if (name.toLowerCase() === k) return val
if (typeof val === 'object' && val !== null) {
const found = findInObject(val, key)
if (found !== undefined) return found
}
}
return undefined
}
/**
* Extract { username, password } from detail.auth (or __auth / credentials).
* @returns {{ username: string, password: string } | null} Credentials or null if missing/invalid.
*/
function extractAuth(parsed) {
const detail = findInObject(parsed, 'detail')
if (!detail || typeof detail !== 'object') return null
const auth = findInObject(detail, 'auth') ?? findInObject(detail, '__auth') ?? findInObject(detail, 'credentials')
if (!auth || typeof auth !== 'object') return null
const username = auth['@_username'] ?? auth['@_Username'] ?? auth.username
const password = auth['@_password'] ?? auth['@_Password'] ?? auth.password
if (typeof username !== 'string' || typeof password !== 'string' || !username.trim()) return null
return { username: username.trim(), password }
}
/**
* Parse CoT XML payload into auth or position. Does not mutate payload.
* @param {Buffer} payload - UTF-8 XML
* @returns {{ type: 'auth', username: string, password: string } | { type: 'cot', id: string, lat: number, lng: number, label: string, eventType: string } | null} Auth or position, or null.
*/
export function parseCotPayload(payload) {
if (!payload?.length) return null
const str = payload.toString('utf8').trim()
if (!str.startsWith('<')) return null
try {
const parsed = xmlParser.parse(str)
const event = findInObject(parsed, 'event')
if (!event || typeof event !== 'object') return null
const auth = extractAuth(parsed)
if (auth) return { type: 'auth', username: auth.username, password: auth.password }
const uid = String(event['@_uid'] ?? event.uid ?? '')
const eventType = String(event['@_type'] ?? event.type ?? '')
const point = findInObject(parsed, 'point') ?? findInObject(event, 'point')
let lat = Number.NaN
let lng = Number.NaN
if (point && typeof point === 'object') {
lat = Number(point['@_lat'] ?? point.lat)
lng = Number(point['@_lon'] ?? point.lon ?? point['@_lng'] ?? point.lng)
}
if (!Number.isFinite(lat) || !Number.isFinite(lng)) return null
const detail = findInObject(parsed, 'detail')
const contact = detail && typeof detail === 'object' ? (findInObject(detail, 'contact') ?? detail) : null
const callsign = contact && typeof contact === 'object'
? (contact['@_callsign'] ?? contact.callsign ?? contact['@_Callsign'])
: ''
const label = typeof callsign === 'string' ? callsign.trim() || uid : uid
return { type: 'cot', id: uid, lat, lng, label, eventType }
}
catch {
return null
}
}

12
server/utils/cotRouter.js Normal file
View File

@@ -0,0 +1,12 @@
/**
* CoT stream first-byte detection: TAK Protocol (0xBF) or traditional XML (0x3C '<').
* Used by tests and any code that must distinguish CoT from other protocols.
*/
export const COT_FIRST_BYTE_TAK = 0xBF
export const COT_FIRST_BYTE_XML = 0x3C
/** @param {number} byte - First byte of stream. @returns {boolean} */
export function isCotFirstByte(byte) {
return byte === COT_FIRST_BYTE_TAK || byte === COT_FIRST_BYTE_XML
}

73
server/utils/cotSsl.js Normal file
View File

@@ -0,0 +1,73 @@
import { existsSync, readFileSync, unlinkSync } from 'node:fs'
import { join, dirname } from 'node:path'
import { tmpdir } from 'node:os'
import { execSync } from 'node:child_process'
import { fileURLToPath } from 'node:url'
const __dirname = dirname(fileURLToPath(import.meta.url))
/** Default password for the CoT trust store (document in atak-itak.md). */
export const TRUSTSTORE_PASSWORD = 'kestrelos'
/** Default CoT server port. */
export const DEFAULT_COT_PORT = 8089
/**
* CoT port from env or default.
* @returns {number} Port number (COT_PORT env or DEFAULT_COT_PORT).
*/
export function getCotPort() {
return Number(process.env.COT_PORT ?? DEFAULT_COT_PORT)
}
/** Message when an endpoint requires TLS but server is not using it. */
export const COT_TLS_REQUIRED_MESSAGE = 'Only available when the server runs with SSL (e.g. .dev-certs or COT_SSL_*).'
/**
* Resolve CoT server TLS cert and key paths (for plugin and API).
* @param {{ cotSslCert?: string, cotSslKey?: string }} [config] - Runtime config (optional)
* @returns {{ certPath: string, keyPath: string } | null} Paths when TLS is configured, else null.
*/
export function getCotSslPaths(config = {}) {
if (process.env.COT_SSL_CERT && process.env.COT_SSL_KEY) {
return { certPath: process.env.COT_SSL_CERT, keyPath: process.env.COT_SSL_KEY }
}
if (config.cotSslCert && config.cotSslKey) {
return { certPath: config.cotSslCert, keyPath: config.cotSslKey }
}
const candidates = [
join(process.cwd(), '.dev-certs', 'cert.pem'),
join(__dirname, '../../.dev-certs', 'cert.pem'),
]
for (const certPath of candidates) {
const keyPath = certPath.replace('cert.pem', 'key.pem')
if (existsSync(certPath) && existsSync(keyPath)) {
return { certPath, keyPath }
}
}
return null
}
/**
* Build a P12 trust store from a PEM cert path (for truststore download and server package).
* @param {string} certPath - Path to cert.pem
* @param {string} password - P12 password
* @returns {Buffer} P12 buffer
* @throws {Error} If openssl fails
*/
export function buildP12FromCertPath(certPath, password) {
const outPath = join(tmpdir(), `kestrelos-cot-p12-${Date.now()}.p12`)
try {
execSync(
`openssl pkcs12 -export -nokeys -in "${certPath}" -out "${outPath}" -passout pass:${password}`,
{ stdio: 'pipe' },
)
const p12 = readFileSync(outPath)
unlinkSync(outPath)
return p12
}
catch (err) {
if (existsSync(outPath)) unlinkSync(outPath)
throw err
}
}

71
server/utils/cotStore.js Normal file
View File

@@ -0,0 +1,71 @@
/**
* In-memory CoT entity store: upsert by id, prune on read by TTL.
* Single source of truth; getActiveEntities returns new objects (no mutation of returned refs).
*/
import { acquire } from './asyncLock.js'
import { COT_ENTITY_TTL_MS } from './constants.js'
const entities = new Map()
/**
* Upsert entity by id. Input is not mutated; stored value is a new object.
* @param {{ id: string, lat: number, lng: number, label?: string, eventType?: string, type?: string }} parsed
*/
export async function updateFromCot(parsed) {
if (!parsed || typeof parsed.id !== 'string') return
const lat = Number(parsed.lat)
const lng = Number(parsed.lng)
if (!Number.isFinite(lat) || !Number.isFinite(lng)) return
await acquire(`cot-${parsed.id}`, async () => {
const now = Date.now()
const existing = entities.get(parsed.id)
const label = typeof parsed.label === 'string' ? parsed.label : (existing?.label ?? parsed.id)
const type = typeof parsed.eventType === 'string' ? parsed.eventType : (typeof parsed.type === 'string' ? parsed.type : (existing?.type ?? ''))
entities.set(parsed.id, {
id: parsed.id,
lat,
lng,
label,
type,
updatedAt: now,
})
})
}
/**
* Active entities (updated within ttlMs). Prunes expired. Returns new array of new objects.
* @param {number} [ttlMs]
* @returns {Promise<Array<{ id: string, lat: number, lng: number, label: string, type: string, updatedAt: number }>>} Snapshot of active entities.
*/
export async function getActiveEntities(ttlMs = COT_ENTITY_TTL_MS) {
return acquire('cot-prune', async () => {
const now = Date.now()
const active = []
const expired = []
for (const entity of entities.values()) {
if (now - entity.updatedAt <= ttlMs) {
active.push({
id: entity.id,
lat: entity.lat,
lng: entity.lng,
label: entity.label ?? entity.id,
type: entity.type ?? '',
updatedAt: entity.updatedAt,
})
}
else {
expired.push(entity.id)
}
}
for (const id of expired) entities.delete(id)
return active
})
}
/** Clear store (tests only). */
export function clearCotStore() {
entities.clear()
}

View File

@@ -3,11 +3,13 @@ import { mkdirSync, existsSync } from 'node:fs'
import { createRequire } from 'node:module'
import { promisify } from 'node:util'
import { bootstrapAdmin } from './bootstrap.js'
import { registerCleanup } from './shutdown.js'
const require = createRequire(import.meta.url)
const sqlite3 = require('sqlite3')
// Resolve from project root so bundled server (e.g. .output) finds node_modules/sqlite3
const requireFromRoot = createRequire(join(process.cwd(), 'package.json'))
const sqlite3 = requireFromRoot('sqlite3')
const SCHEMA_VERSION = 3
const SCHEMA_VERSION = 4
const DB_BUSY_TIMEOUT_MS = 5000
let dbInstance = null
@@ -111,6 +113,12 @@ const migrateToV3 = async (run, all) => {
await run('ALTER TABLE users ADD COLUMN avatar_path TEXT')
}
const migrateToV4 = async (run, all) => {
const info = await all('PRAGMA table_info(users)')
if (info.some(c => c.name === 'cot_password_hash')) return
await run('ALTER TABLE users ADD COLUMN cot_password_hash TEXT')
}
const runMigrations = async (run, all, get) => {
const version = await getSchemaVersion(get)
if (version >= SCHEMA_VERSION) return
@@ -122,6 +130,10 @@ const runMigrations = async (run, all, get) => {
await migrateToV3(run, all)
await setSchemaVersion(run, 3)
}
if (version < 4) {
await migrateToV4(run, all)
await setSchemaVersion(run, 4)
}
}
const initDb = async (db, run, all, get) => {
@@ -167,9 +179,91 @@ export async function getDb() {
}
dbInstance = { db, run, all, get }
registerCleanup(async () => {
if (dbInstance) {
try {
await new Promise((resolve, reject) => {
dbInstance.db.close((err) => {
if (err) reject(err)
else resolve()
})
})
}
catch (error) {
console.error('[db] Error closing database during shutdown:', error?.message)
}
dbInstance = null
}
})
return dbInstance
}
/**
* Health check for database connection.
* @returns {Promise<{ healthy: boolean, error?: string }>} Health status
*/
export async function healthCheck() {
try {
const db = await getDb()
await db.get('SELECT 1')
return { healthy: true }
}
catch (error) {
return {
healthy: false,
error: error?.message || String(error),
}
}
}
/**
* Database connection model documentation:
*
* KestrelOS uses SQLite with WAL (Write-Ahead Logging) mode for concurrent access.
* - Single connection instance shared across all requests (singleton pattern)
* - WAL mode allows multiple readers and one writer concurrently
* - Connection is initialized on first getDb() call and reused thereafter
* - Busy timeout is set to 5000ms to handle concurrent access gracefully
* - Transactions are supported via withTransaction() helper
*
* Concurrency considerations:
* - SQLite with WAL handles concurrent reads efficiently
* - Writes are serialized (one at a time)
* - For high write loads, consider migrating to PostgreSQL
* - Current model is suitable for moderate traffic (< 100 req/sec)
*
* Connection lifecycle:
* - Created on first getDb() call
* - Persists for application lifetime
* - Closed during graceful shutdown
* - Test path can be set via setDbPathForTest() for testing
*/
/**
* Execute a callback within a database transaction.
* Automatically commits on success or rolls back on error.
* @param {object} db - Database instance from getDb()
* @param {Function} callback - Async function receiving { run, all, get }
* @returns {Promise<any>} Result of callback
*/
export async function withTransaction(db, callback) {
const { run } = db
await run('BEGIN TRANSACTION')
try {
const result = await callback(db)
await run('COMMIT')
return result
}
catch (error) {
await run('ROLLBACK').catch(() => {
// Ignore rollback errors
})
throw error
}
}
export function closeDb() {
if (!dbInstance) return
try {

86
server/utils/errors.js Normal file
View File

@@ -0,0 +1,86 @@
/**
* Custom error classes and error handling utilities.
*/
/**
* Base application error.
*/
export class AppError extends Error {
constructor(message, statusCode = 500, code = 'INTERNAL_ERROR') {
super(message)
this.name = this.constructor.name
this.statusCode = statusCode
this.code = code
Error.captureStackTrace(this, this.constructor)
}
}
/**
* Validation error (400).
*/
export class ValidationError extends AppError {
constructor(message, details = null) {
super(message, 400, 'VALIDATION_ERROR')
this.details = details
}
}
/**
* Not found error (404).
*/
export class NotFoundError extends AppError {
constructor(resource = 'Resource') {
super(`${resource} not found`, 404, 'NOT_FOUND')
}
}
/**
* Unauthorized error (401).
*/
export class UnauthorizedError extends AppError {
constructor(message = 'Unauthorized') {
super(message, 401, 'UNAUTHORIZED')
}
}
/**
* Forbidden error (403).
*/
export class ForbiddenError extends AppError {
constructor(message = 'Forbidden') {
super(message, 403, 'FORBIDDEN')
}
}
/**
* Conflict error (409).
*/
export class ConflictError extends AppError {
constructor(message = 'Conflict') {
super(message, 409, 'CONFLICT')
}
}
/**
* Format error response for API.
* @param {Error} error - Error object
* @returns {object} Formatted error response
*/
export function formatErrorResponse(error) {
if (error instanceof AppError) {
return {
error: {
code: error.code,
message: error.message,
...(error.details && { details: error.details }),
},
}
}
return {
error: {
code: 'INTERNAL_ERROR',
message: error?.message || 'Internal server error',
},
}
}

View File

@@ -1,47 +1,79 @@
import { closeRouter, getProducer, getTransport } from './mediasoup.js'
import { acquire } from './asyncLock.js'
import { LIVE_SESSION_TTL_MS } from './constants.js'
const TTL_MS = 60_000
const sessions = new Map()
export const createSession = (userId, label = '') => {
const id = crypto.randomUUID()
const session = {
id,
userId,
label: (label || 'Live').trim() || 'Live',
lat: 0,
lng: 0,
updatedAt: Date.now(),
routerId: null,
producerId: null,
transportId: null,
}
sessions.set(id, session)
return session
export const createSession = async (userId, label = '') => {
return acquire(`session-create-${userId}`, async () => {
const id = crypto.randomUUID()
const session = {
id,
userId,
label: (label || 'Live').trim() || 'Live',
lat: 0,
lng: 0,
updatedAt: Date.now(),
routerId: null,
producerId: null,
transportId: null,
}
sessions.set(id, session)
return session
})
}
/**
* Atomically get existing active session or create new one for user.
* @param {string} userId - User ID
* @param {string} label - Session label
* @returns {Promise<object>} Session object
*/
export const getOrCreateSession = async (userId, label = '') => {
return acquire(`session-get-or-create-${userId}`, async () => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= LIVE_SESSION_TTL_MS) {
return s
}
}
return await createSession(userId, label)
})
}
export const getLiveSession = id => sessions.get(id)
export const getActiveSessionByUserId = (userId) => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= TTL_MS) return s
}
export const getActiveSessionByUserId = async (userId) => {
return acquire(`session-get-${userId}`, async () => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= LIVE_SESSION_TTL_MS) return s
}
})
}
export const updateLiveSession = (id, updates) => {
const session = sessions.get(id)
if (!session) return
const now = Date.now()
if (Number.isFinite(updates.lat)) session.lat = updates.lat
if (Number.isFinite(updates.lng)) session.lng = updates.lng
if (updates.routerId !== undefined) session.routerId = updates.routerId
if (updates.producerId !== undefined) session.producerId = updates.producerId
if (updates.transportId !== undefined) session.transportId = updates.transportId
session.updatedAt = now
export const updateLiveSession = async (id, updates) => {
return acquire(`session-update-${id}`, async () => {
const session = sessions.get(id)
if (!session) {
throw new Error('Session not found')
}
const now = Date.now()
if (Number.isFinite(updates.lat)) session.lat = updates.lat
if (Number.isFinite(updates.lng)) session.lng = updates.lng
if (updates.routerId !== undefined) session.routerId = updates.routerId
if (updates.producerId !== undefined) session.producerId = updates.producerId
if (updates.transportId !== undefined) session.transportId = updates.transportId
session.updatedAt = now
return session
})
}
export const deleteLiveSession = id => sessions.delete(id)
export const deleteLiveSession = async (id) => {
await acquire(`session-delete-${id}`, async () => {
sessions.delete(id)
})
}
export const clearSessions = () => sessions.clear()
@@ -62,31 +94,33 @@ const cleanupSession = async (session) => {
}
export const getActiveSessions = async () => {
const now = Date.now()
const active = []
const expired = []
return acquire('get-active-sessions', async () => {
const now = Date.now()
const active = []
const expired = []
for (const session of sessions.values()) {
if (now - session.updatedAt <= TTL_MS) {
active.push({
id: session.id,
userId: session.userId,
label: session.label,
lat: session.lat,
lng: session.lng,
updatedAt: session.updatedAt,
hasStream: Boolean(session.producerId),
})
for (const session of sessions.values()) {
if (now - session.updatedAt <= LIVE_SESSION_TTL_MS) {
active.push({
id: session.id,
userId: session.userId,
label: session.label,
lat: session.lat,
lng: session.lng,
updatedAt: session.updatedAt,
hasStream: Boolean(session.producerId),
})
}
else {
expired.push(session)
}
}
else {
expired.push(session)
for (const session of expired) {
await cleanupSession(session)
sessions.delete(session.id)
}
}
for (const session of expired) {
await cleanupSession(session)
sessions.delete(session.id)
}
return active
return active
})
}

84
server/utils/logger.js Normal file
View File

@@ -0,0 +1,84 @@
/**
* Structured logger with request context support.
* Uses AsyncLocalStorage to provide request-scoped context that's automatically isolated per async context.
*/
import { AsyncLocalStorage } from 'node:async_hooks'
const asyncLocalStorage = new AsyncLocalStorage()
/**
* Run a function with logger context. Context is automatically isolated per async execution.
* @param {string} reqId - Request ID
* @param {string|null} uId - User ID (optional)
* @param {Function} fn - Function to run with context
* @returns {Promise<any>} Result of the function
*/
export function runWithContext(reqId, uId, fn) {
return asyncLocalStorage.run({ requestId: reqId, userId: uId }, fn)
}
/**
* Set context for the current async context. Use runWithContext() instead for proper isolation.
* @deprecated Use runWithContext() instead for proper async context isolation
* @param {string} reqId - Request ID
* @param {string|null} uId - User ID (optional)
*/
export function setContext(reqId, uId = null) {
const store = asyncLocalStorage.getStore()
if (store) {
store.requestId = reqId
store.userId = uId
}
}
/**
* Clear context for the current async context.
* @deprecated Context is automatically cleared when async context ends. Use runWithContext() instead.
*/
export function clearContext() {
const store = asyncLocalStorage.getStore()
if (store) {
store.requestId = null
store.userId = null
}
}
function getContext() {
return asyncLocalStorage.getStore() || { requestId: null, userId: null }
}
function formatMessage(level, message, context = {}) {
const { requestId, userId } = getContext()
const timestamp = new Date().toISOString()
const ctx = {
timestamp,
level,
requestId,
...(userId && { userId }),
...context,
}
return `[${level.toUpperCase()}] ${JSON.stringify({ message, ...ctx })}`
}
export function info(message, context = {}) {
console.log(formatMessage('info', message, context))
}
export function error(message, context = {}) {
const ctx = { ...context }
if (context.error && context.error.stack) {
ctx.stack = context.error.stack
}
console.error(formatMessage('error', message, ctx))
}
export function warn(message, context = {}) {
console.warn(formatMessage('warn', message, context))
}
export function debug(message, context = {}) {
if (process.env.NODE_ENV === 'development') {
console.debug(formatMessage('debug', message, context))
}
}

View File

@@ -1,5 +1,7 @@
import os from 'node:os'
import mediasoup from 'mediasoup'
import { acquire } from './asyncLock.js'
import { MEDIASOUP_RTC_MIN_PORT, MEDIASOUP_RTC_MAX_PORT } from './constants.js'
let worker = null
const routers = new Map()
@@ -17,22 +19,25 @@ export const getWorker = async () => {
worker = await mediasoup.createWorker({
logLevel: process.env.NODE_ENV === 'development' ? 'debug' : 'warn',
logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp'],
rtcMinPort: 40000,
rtcMaxPort: 49999,
rtcMinPort: MEDIASOUP_RTC_MIN_PORT,
rtcMaxPort: MEDIASOUP_RTC_MAX_PORT,
})
worker.on('died', () => {
console.error('[mediasoup] Worker died, exiting')
process.exit(1)
worker.on('died', async (error) => {
console.error('[mediasoup] Worker died:', error?.message || String(error))
const { graceful } = await import('./shutdown.js')
await graceful(error || new Error('Mediasoup worker died'))
})
return worker
}
export const getRouter = async (sessionId) => {
const existing = routers.get(sessionId)
if (existing) return existing
const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS })
routers.set(sessionId, router)
return router
return acquire(`router-${sessionId}`, async () => {
const existing = routers.get(sessionId)
if (existing) return existing
const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS })
routers.set(sessionId, router)
return router
})
}
const isIPv4 = (host) => {
@@ -64,34 +69,36 @@ const resolveAnnouncedIp = (requestHost) => {
}
export const createTransport = async (router, requestHost = undefined) => {
const announcedIp = resolveAnnouncedIp(requestHost)
const listenIps = announcedIp
? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }]
: [{ ip: '127.0.0.1' }]
return acquire(`transport-${router.id}`, async () => {
const announcedIp = resolveAnnouncedIp(requestHost)
const listenIps = announcedIp
? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }]
: [{ ip: '127.0.0.1' }]
const transport = await router.createWebRtcTransport({
listenIps,
enableUdp: true,
enableTcp: true,
preferUdp: true,
initialAvailableOutgoingBitrate: 1_000_000,
}).catch((err) => {
console.error('[mediasoup] Transport creation failed:', err)
throw new Error(`Failed to create transport: ${err.message || String(err)}`)
const transport = await router.createWebRtcTransport({
listenIps,
enableUdp: true,
enableTcp: true,
preferUdp: true,
initialAvailableOutgoingBitrate: 1_000_000,
}).catch((err) => {
console.error('[mediasoup] Transport creation failed:', err)
throw new Error(`Failed to create transport: ${err.message || String(err)}`)
})
transports.set(transport.id, transport)
transport.on('close', () => transports.delete(transport.id))
return {
transport,
params: {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
},
}
})
transports.set(transport.id, transport)
transport.on('close', () => transports.delete(transport.id))
return {
transport,
params: {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
},
}
}
export const getTransport = transportId => transports.get(transportId)

100
server/utils/queries.js Normal file
View File

@@ -0,0 +1,100 @@
/**
* Reusable query functions - eliminates SQL duplication across routes.
*/
const updateEntity = async (db, table, id, updates, getById) => {
if (Object.keys(updates).length === 0) return getById(db, id)
const { buildUpdateQuery } = await import('./queryBuilder.js')
const { query, params } = buildUpdateQuery(table, null, updates)
if (!query) return getById(db, id)
await db.run(query, [...params, id])
return getById(db, id)
}
export async function getDeviceById(db, id) {
const result = await db.get('SELECT id, name, device_type, vendor, lat, lng, stream_url, source_type, config FROM devices WHERE id = ?', [id])
return result || null
}
export async function getAllDevices(db) {
return db.all('SELECT id, name, device_type, vendor, lat, lng, stream_url, source_type, config FROM devices ORDER BY id')
}
export async function createDevice(db, data) {
const id = crypto.randomUUID()
await db.run(
'INSERT INTO devices (id, name, device_type, vendor, lat, lng, stream_url, source_type, config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
[id, data.name, data.device_type, data.vendor, data.lat, data.lng, data.stream_url, data.source_type, data.config],
)
return getDeviceById(db, id)
}
export async function updateDevice(db, id, updates) {
return updateEntity(db, 'devices', id, updates, getDeviceById)
}
export async function getUserById(db, id) {
const result = await db.get('SELECT id, identifier, role, auth_provider, password_hash FROM users WHERE id = ?', [id])
return result || null
}
export async function getUserByIdentifier(db, identifier) {
const result = await db.get('SELECT id, identifier, role, password_hash FROM users WHERE identifier = ?', [identifier])
return result || null
}
export async function createUser(db, data) {
const id = crypto.randomUUID()
await db.run(
'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
[id, data.identifier, data.password_hash, data.role, data.created_at, data.auth_provider || 'local', data.oidc_issuer || null, data.oidc_sub || null],
)
return db.get('SELECT id, identifier, role, auth_provider FROM users WHERE id = ?', [id])
}
export async function updateUser(db, id, updates) {
if (Object.keys(updates).length === 0) return getUserById(db, id)
const { buildUpdateQuery } = await import('./queryBuilder.js')
const { query, params } = buildUpdateQuery('users', null, updates)
if (!query) return getUserById(db, id)
await db.run(query, [...params, id])
return db.get('SELECT id, identifier, role, auth_provider FROM users WHERE id = ?', [id])
}
export async function getPoiById(db, id) {
const result = await db.get('SELECT id, lat, lng, label, icon_type FROM pois WHERE id = ?', [id])
return result || null
}
export async function getAllPois(db) {
return db.all('SELECT id, lat, lng, label, icon_type FROM pois ORDER BY id')
}
export async function createPoi(db, data) {
const id = crypto.randomUUID()
await db.run(
'INSERT INTO pois (id, lat, lng, label, icon_type) VALUES (?, ?, ?, ?, ?)',
[id, data.lat, data.lng, data.label || '', data.icon_type || 'pin'],
)
return getPoiById(db, id)
}
export async function updatePoi(db, id, updates) {
return updateEntity(db, 'pois', id, updates, getPoiById)
}
export async function getSessionById(db, id) {
const result = await db.get('SELECT id, user_id, expires_at FROM sessions WHERE id = ?', [id])
return result || null
}
export async function createDbSession(db, data) {
await db.run(
'INSERT INTO sessions (id, user_id, created_at, expires_at) VALUES (?, ?, ?, ?)',
[data.id, data.user_id, data.created_at, data.expires_at],
)
}
export async function deleteSession(db, id) {
await db.run('DELETE FROM sessions WHERE id = ?', [id])
}

View File

@@ -0,0 +1,28 @@
/**
* Query builder for safe dynamic UPDATE queries with column whitelist validation.
* Prevents SQL injection by validating column names against allowed sets.
*/
const ALLOWED_COLUMNS = {
devices: new Set(['name', 'device_type', 'vendor', 'lat', 'lng', 'stream_url', 'source_type', 'config']),
users: new Set(['role', 'identifier', 'password_hash']),
pois: new Set(['label', 'icon_type', 'lat', 'lng']),
}
export function buildUpdateQuery(table, allowedColumns, updates) {
if (!ALLOWED_COLUMNS[table]) throw new Error(`Unknown table: ${table}`)
const columns = allowedColumns || ALLOWED_COLUMNS[table]
const clauses = []
const params = []
for (const [column, value] of Object.entries(updates)) {
if (!columns.has(column)) throw new Error(`Invalid column: ${column} for table: ${table}`)
clauses.push(`${column} = ?`)
params.push(value)
}
if (clauses.length === 0) return { query: '', params: [] }
return { query: `UPDATE ${table} SET ${clauses.join(', ')} WHERE id = ?`, params }
}
export function getAllowedColumns(table) {
return ALLOWED_COLUMNS[table] || new Set()
}

24
server/utils/sanitize.js Normal file
View File

@@ -0,0 +1,24 @@
/**
* Input sanitization utilities - pure functions for cleaning user input.
*/
import { MAX_IDENTIFIER_LENGTH, MAX_STRING_LENGTH } from './constants.js'
const IDENTIFIER_REGEX = /^\w+$/
export function sanitizeString(str, maxLength = MAX_STRING_LENGTH) {
if (typeof str !== 'string') return ''
const trimmed = str.trim()
return trimmed.length > maxLength ? trimmed.slice(0, maxLength) : trimmed
}
export function sanitizeIdentifier(str) {
if (typeof str !== 'string') return ''
const trimmed = str.trim()
if (trimmed.length === 0 || trimmed.length > MAX_IDENTIFIER_LENGTH) return ''
return IDENTIFIER_REGEX.test(trimmed) ? trimmed : ''
}
export function sanitizeLabel(str, maxLength = MAX_STRING_LENGTH) {
return sanitizeString(str, maxLength)
}

68
server/utils/shutdown.js Normal file
View File

@@ -0,0 +1,68 @@
/**
* Graceful shutdown handler - registers cleanup functions and handles shutdown signals.
*/
import { SHUTDOWN_TIMEOUT_MS } from './constants.js'
const cleanupFunctions = []
let isShuttingDown = false
export function clearCleanup() {
cleanupFunctions.length = 0
isShuttingDown = false
}
export function registerCleanup(fn) {
if (typeof fn !== 'function') throw new TypeError('Cleanup function must be a function')
cleanupFunctions.push(fn)
}
async function executeCleanup() {
if (isShuttingDown) return
isShuttingDown = true
for (let i = cleanupFunctions.length - 1; i >= 0; i--) {
try {
await cleanupFunctions[i]()
}
catch (error) {
console.error(`[shutdown] Cleanup function ${i} failed:`, error?.message || String(error))
}
}
}
export async function graceful(error) {
if (error) {
console.error('[shutdown] Shutting down due to error:', error?.message || String(error))
if (error.stack) console.error('[shutdown] Stack trace:', error.stack)
}
else {
console.log('[shutdown] Initiating graceful shutdown')
}
const timeout = setTimeout(() => {
console.error('[shutdown] Shutdown timeout exceeded, forcing exit')
process.exit(1)
}, SHUTDOWN_TIMEOUT_MS)
try {
await executeCleanup()
clearTimeout(timeout)
console.log('[shutdown] Cleanup complete')
process.exit(error ? 1 : 0)
}
catch (err) {
clearTimeout(timeout)
console.error('[shutdown] Error during cleanup:', err?.message || String(err))
process.exit(1)
}
}
export function initShutdownHandlers() {
for (const signal of ['SIGTERM', 'SIGINT']) {
process.on(signal, () => {
console.log(`[shutdown] Received ${signal}`)
graceful().catch((err) => {
console.error('[shutdown] Error in graceful shutdown:', err)
process.exit(1)
})
})
}
}

128
server/utils/validation.js Normal file
View File

@@ -0,0 +1,128 @@
/**
* Validation schemas - pure functions for consistent input validation.
*/
import { sanitizeString, sanitizeIdentifier, sanitizeLabel } from './sanitize.js'
import { DEVICE_TYPES, SOURCE_TYPES } from './deviceUtils.js'
import { POI_ICON_TYPES } from './poiConstants.js'
const ROLES = ['admin', 'leader', 'member']
const validateNumber = (value, field) => {
const num = Number(value)
return Number.isFinite(num) ? { valid: true, value: num } : { valid: false, error: `${field} must be a finite number` }
}
const validateEnum = (value, allowed, field) => allowed.includes(value) ? { valid: true, value } : { valid: false, error: `Invalid ${field}` }
const handleField = (d, field, handler, updates, errors, outputField = null) => {
if (d[field] !== undefined) {
const result = handler(d[field])
if (result.valid) updates[outputField || field] = result.value
else errors.push(result.error)
}
}
export function validateDevice(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const latCheck = validateNumber(d.lat, 'lat')
const lngCheck = validateNumber(d.lng, 'lng')
if (!latCheck.valid || !lngCheck.valid) errors.push('lat and lng required as finite numbers')
if (errors.length > 0) return { valid: false, errors }
return {
valid: true,
errors: [],
data: {
name: sanitizeString(d.name, 1000),
device_type: validateEnum(d.device_type, DEVICE_TYPES, 'device_type').value || 'feed',
vendor: d.vendor !== undefined ? sanitizeString(d.vendor, 255) : null,
lat: latCheck.value,
lng: lngCheck.value,
stream_url: typeof d.stream_url === 'string' ? sanitizeString(d.stream_url, 2000) : '',
source_type: validateEnum(d.source_type, SOURCE_TYPES, 'source_type').value || 'mjpeg',
config: d.config == null ? null : (typeof d.config === 'string' ? d.config : JSON.stringify(d.config)),
},
}
}
export function validateUpdateDevice(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.name !== undefined) updates.name = sanitizeString(d.name, 1000)
handleField(d, 'device_type', v => validateEnum(v, DEVICE_TYPES, 'device_type'), updates, errors)
if (d.vendor !== undefined) updates.vendor = d.vendor === null || d.vendor === '' ? null : sanitizeString(d.vendor, 255)
handleField(d, 'lat', v => validateNumber(v, 'lat'), updates, errors)
handleField(d, 'lng', v => validateNumber(v, 'lng'), updates, errors)
if (d.stream_url !== undefined) updates.stream_url = sanitizeString(d.stream_url, 2000)
handleField(d, 'source_type', v => validateEnum(v, SOURCE_TYPES, 'source_type'), updates, errors)
if (d.config !== undefined) updates.config = d.config === null ? null : (typeof d.config === 'string' ? d.config : JSON.stringify(d.config))
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}
export function validateUser(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const identifier = sanitizeIdentifier(d.identifier)
const password = typeof d.password === 'string' ? d.password : ''
const role = typeof d.role === 'string' ? d.role : ''
if (!identifier) errors.push('identifier required')
if (!password) errors.push('password required')
if (!role || !ROLES.includes(role)) errors.push('role must be admin, leader, or member')
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: { identifier, password, role: role || 'member' } }
}
export function validateUpdateUser(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.role !== undefined) {
if (ROLES.includes(d.role)) updates.role = d.role
else errors.push('role must be admin, leader, or member')
}
if (d.identifier !== undefined) {
const identifier = sanitizeIdentifier(d.identifier)
if (!identifier) errors.push('identifier cannot be empty')
else updates.identifier = identifier
}
if (d.password !== undefined && d.password !== '') {
if (typeof d.password !== 'string' || !d.password) errors.push('password cannot be empty')
else updates.password = d.password
}
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}
export function validatePoi(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const latCheck = validateNumber(d.lat, 'lat')
const lngCheck = validateNumber(d.lng, 'lng')
if (!latCheck.valid || !lngCheck.valid) return { valid: false, errors: ['lat and lng required as finite numbers'] }
return {
valid: true,
errors: [],
data: {
lat: latCheck.value,
lng: lngCheck.value,
label: sanitizeLabel(d.label, 500),
icon_type: validateEnum(d.iconType, POI_ICON_TYPES, 'iconType').value || 'pin',
},
}
}
export function validateUpdatePoi(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.label !== undefined) updates.label = sanitizeLabel(d.label, 500)
handleField(d, 'iconType', v => validateEnum(v, POI_ICON_TYPES, 'iconType'), updates, errors, 'icon_type')
handleField(d, 'lat', v => validateNumber(v, 'lat'), updates, errors)
handleField(d, 'lng', v => validateNumber(v, 'lng'), updates, errors)
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}

View File

@@ -20,7 +20,15 @@ export async function handleWebSocketMessage(userId, sessionId, type, data) {
case 'create-transport': {
const router = await getRouter(sessionId)
const { transport, params } = await createTransport(router)
updateLiveSession(sessionId, { transportId: transport.id, routerId: router.id })
try {
await updateLiveSession(sessionId, { transportId: transport.id, routerId: router.id })
}
catch (err) {
if (err.message === 'Session not found') {
return { error: 'Session not found' }
}
throw err
}
return { type: 'transport-created', data: params }
}
case 'connect-transport': {