major: kestrel is now a tak server (#6)
All checks were successful
ci/woodpecker/push/push Pipeline was successful

## Added

- CoT (Cursor on Target) server on port 8089 enabling ATAK/iTAK device connectivity
- Support for TAK stream protocol and traditional XML CoT messages
- TLS/SSL support with automatic fallback to plain TCP
- Username/password authentication for CoT connections
- Real-time device position tracking with TTL-based expiration (90s default)
- API endpoints: `/api/cot/config`, `/api/cot/server-package`, `/api/cot/truststore`, `/api/me/cot-password`
- TAK Server section in Settings with QR code for iTAK setup
- ATAK password management in Account page for OIDC users
- CoT device markers on map showing real-time positions
- Comprehensive documentation in `docs/` directory
- Environment variables: `COT_PORT`, `COT_TTL_MS`, `COT_REQUIRE_AUTH`, `COT_SSL_CERT`, `COT_SSL_KEY`, `COT_DEBUG`
- Dependencies: `fast-xml-parser`, `jszip`, `qrcode`

## Changed

- Authentication system supports CoT password management for OIDC users
- Database schema includes `cot_password_hash` field
- Test suite refactored to follow functional design principles

## Removed

- Consolidated utility modules: `authConfig.js`, `authSkipPaths.js`, `bootstrap.js`, `poiConstants.js`, `session.js`

## Security

- XML entity expansion protection in CoT parser
- Enhanced input validation and SQL injection prevention
- Authentication timeout to prevent hanging connections

## Breaking Changes

- Port 8089 must be exposed for CoT server. Update firewall rules and Docker/Kubernetes configurations.

## Migration Notes

- OIDC users must set ATAK password via Account settings before connecting
- Docker: expose port 8089 (`-p 8089:8089`)
- Kubernetes: update Helm values to expose port 8089

Co-authored-by: Madison Grubb <madison@elastiflow.com>
Reviewed-on: #6
This commit was merged in pull request #6.
This commit is contained in:
2026-02-17 16:41:41 +00:00
parent b18283d3b3
commit e61e6bc7e3
117 changed files with 5329 additions and 1040 deletions

47
server/utils/asyncLock.js Normal file
View File

@@ -0,0 +1,47 @@
/**
* Async lock utility - Promise-based mutex per key.
* Ensures only one async operation executes per key at a time.
*/
const locks = new Map()
/**
* Get or create a queue for a lock key.
* @param {string} lockKey - Lock key
* @returns {Promise<any>} Existing or new queue promise
*/
const getOrCreateQueue = (lockKey) => {
const existingQueue = locks.get(lockKey)
if (existingQueue) return existingQueue
const newQueue = Promise.resolve()
locks.set(lockKey, newQueue)
return newQueue
}
/**
* Acquire a lock for a key and execute callback.
* Only one callback per key executes at a time.
* @param {string} key - Lock key
* @param {Function} callback - Async function to execute
* @returns {Promise<any>} Result of callback
*/
export async function acquire(key, callback) {
const lockKey = String(key)
const queue = getOrCreateQueue(lockKey)
const next = queue.then(() => callback()).finally(() => {
if (locks.get(lockKey) === next) {
locks.delete(lockKey)
}
})
locks.set(lockKey, next)
return next
}
/**
* Clear all locks (for testing).
*/
export function clearLocks() {
locks.clear()
}

View File

@@ -1,5 +0,0 @@
export function getAuthConfig() {
const hasOidc = !!(process.env.OIDC_ISSUER && process.env.OIDC_CLIENT_ID && process.env.OIDC_CLIENT_SECRET)
const label = process.env.OIDC_LABEL?.trim() || (hasOidc ? 'Sign in with OIDC' : '')
return Object.freeze({ oidc: { enabled: hasOidc, label } })
}

View File

@@ -8,3 +8,26 @@ export function requireAuth(event, opts = {}) {
if (role === 'adminOrLeader' && !ROLES_ADMIN_OR_LEADER.includes(user.role)) throw createError({ statusCode: 403, message: 'Forbidden' })
return user
}
// Auth path utilities
export const SKIP_PATHS = Object.freeze([
'/api/auth/login',
'/api/auth/logout',
'/api/auth/config',
'/api/auth/oidc/authorize',
'/api/auth/oidc/callback',
])
export const PROTECTED_PATH_PREFIXES = Object.freeze([
'/api/cameras',
'/api/devices',
'/api/live',
'/api/me',
'/api/pois',
'/api/users',
])
export function skipAuth(path) {
if (path.startsWith('/api/health') || path === '/health') return true
return SKIP_PATHS.some(p => path === p || path.startsWith(p + '/'))
}

View File

@@ -1,23 +0,0 @@
/** Paths that skip auth (no session required). Do not add if any handler uses requireAuth. */
export const SKIP_PATHS = Object.freeze([
'/api/auth/login',
'/api/auth/logout',
'/api/auth/config',
'/api/auth/oidc/authorize',
'/api/auth/oidc/callback',
])
/** Path prefixes for protected routes. Used by tests to ensure they're never in SKIP_PATHS. */
export const PROTECTED_PATH_PREFIXES = Object.freeze([
'/api/cameras',
'/api/devices',
'/api/live',
'/api/me',
'/api/pois',
'/api/users',
])
export function skipAuth(path) {
if (path.startsWith('/api/health') || path === '/health') return true
return SKIP_PATHS.some(p => path === p || path.startsWith(p + '/'))
}

View File

@@ -1,26 +0,0 @@
import { randomBytes } from 'node:crypto'
import { hashPassword } from './password.js'
const PASSWORD_CHARS = Object.freeze('abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789')
const generateRandomPassword = () =>
Array.from(randomBytes(14), b => PASSWORD_CHARS[b % PASSWORD_CHARS.length]).join('')
export async function bootstrapAdmin(run, get) {
const row = await get('SELECT COUNT(*) as n FROM users')
if (row?.n !== 0) return
const email = process.env.BOOTSTRAP_EMAIL?.trim()
const password = process.env.BOOTSTRAP_PASSWORD
const identifier = (email && password) ? email : 'admin'
const plainPassword = (email && password) ? password : generateRandomPassword()
await run(
'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
[crypto.randomUUID(), identifier, hashPassword(plainPassword), 'admin', new Date().toISOString(), 'local', null, null],
)
if (!email || !password) {
console.log(`\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n\n Identifier: ${identifier}\n Password: ${plainPassword}\n\n Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n`)
}
}

30
server/utils/constants.js Normal file
View File

@@ -0,0 +1,30 @@
/**
* Application constants with environment variable support.
*/
// Timeouts (milliseconds)
export const COT_AUTH_TIMEOUT_MS = Number(process.env.COT_AUTH_TIMEOUT_MS) || 15_000
export const LIVE_SESSION_TTL_MS = Number(process.env.LIVE_SESSION_TTL_MS) || 60_000
export const COT_ENTITY_TTL_MS = Number(process.env.COT_ENTITY_TTL_MS) || 90_000
export const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL_MS) || 1500
export const SHUTDOWN_TIMEOUT_MS = Number(process.env.SHUTDOWN_TIMEOUT_MS) || 30_000
// Ports
export const COT_PORT = Number(process.env.COT_PORT) || 8089
export const WEBSOCKET_PATH = process.env.WEBSOCKET_PATH || '/ws'
// Limits
export const MAX_PAYLOAD_BYTES = Number(process.env.MAX_PAYLOAD_BYTES) || 64 * 1024
export const MAX_STRING_LENGTH = Number(process.env.MAX_STRING_LENGTH) || 1000
export const MAX_IDENTIFIER_LENGTH = Number(process.env.MAX_IDENTIFIER_LENGTH) || 255
// Mediasoup
export const MEDIASOUP_RTC_MIN_PORT = Number(process.env.MEDIASOUP_RTC_MIN_PORT) || 40000
export const MEDIASOUP_RTC_MAX_PORT = Number(process.env.MEDIASOUP_RTC_MAX_PORT) || 49999
// Session
const [MIN_DAYS, MAX_DAYS, DEFAULT_DAYS] = [1, 365, 7]
export function getSessionMaxAgeDays() {
const raw = Number.parseInt(process.env.SESSION_MAX_AGE_DAYS ?? '', 10)
return Number.isFinite(raw) ? Math.max(MIN_DAYS, Math.min(MAX_DAYS, raw)) : DEFAULT_DAYS
}

25
server/utils/cotAuth.js Normal file
View File

@@ -0,0 +1,25 @@
import { getDb } from './db.js'
import { verifyPassword } from './password.js'
/**
* Validate CoT auth: local users use password_hash; OIDC users use cot_password_hash (ATAK password).
* @param {string} identifier - KestrelOS identifier (username)
* @param {string} password - Plain password from CoT auth
* @returns {Promise<boolean>} True if valid
*/
export async function validateCotAuth(identifier, password) {
const id = typeof identifier === 'string' ? identifier.trim() : ''
if (!id || typeof password !== 'string') return false
const { get } = await getDb()
const user = await get(
'SELECT auth_provider, password_hash, cot_password_hash FROM users WHERE identifier = ?',
[id],
)
if (!user) return false
const hash = user.auth_provider === 'local' ? user.password_hash : user.cot_password_hash
if (!hash) return false
return verifyPassword(password, hash)
}

151
server/utils/cotParser.js Normal file
View File

@@ -0,0 +1,151 @@
import { XMLParser } from 'fast-xml-parser'
import { MAX_PAYLOAD_BYTES } from './constants.js'
// CoT protocol detection constants
export const COT_FIRST_BYTE_TAK = 0xBF
export const COT_FIRST_BYTE_XML = 0x3C
/** @param {number} byte - First byte of stream. @returns {boolean} */
export function isCotFirstByte(byte) {
return byte === COT_FIRST_BYTE_TAK || byte === COT_FIRST_BYTE_XML
}
const TRADITIONAL_DELIMITER = Buffer.from('</event>', 'utf8')
/**
* @param {Buffer} buf
* @param {number} offset
* @param {number} value - Accumulated value
* @param {number} shift - Current bit shift
* @param {number} bytesRead - Bytes consumed so far
* @returns {{ value: number, bytesRead: number }} Decoded varint and bytes consumed.
*/
function readVarint(buf, offset, value = 0, shift = 0, bytesRead = 0) {
if (offset + bytesRead >= buf.length) return { value, bytesRead }
const b = buf[offset + bytesRead]
const newValue = value + ((b & 0x7F) << shift)
const newBytesRead = bytesRead + 1
if ((b & 0x80) === 0) return { value: newValue, bytesRead: newBytesRead }
const newShift = shift + 7
if (newShift > 28) return { value: 0, bytesRead: 0 }
return readVarint(buf, offset, newValue, newShift, newBytesRead)
}
/**
* TAK stream frame: 0xBF, varint length, payload.
* @param {Buffer} buf
* @returns {{ payload: Buffer, bytesConsumed: number } | null} Frame or null if incomplete/invalid.
*/
export function parseTakStreamFrame(buf) {
if (!buf || buf.length < 2 || buf[0] !== COT_FIRST_BYTE_TAK) return null
const { value: length, bytesRead } = readVarint(buf, 1)
if (length < 0 || length > MAX_PAYLOAD_BYTES) return null
const bytesConsumed = 1 + bytesRead + length
if (buf.length < bytesConsumed) return null
return { payload: buf.subarray(1 + bytesRead, bytesConsumed), bytesConsumed }
}
/**
* Traditional CoT: one XML message delimited by </event>.
* @param {Buffer} buf
* @returns {{ payload: Buffer, bytesConsumed: number } | null} Frame or null if incomplete.
*/
export function parseTraditionalXmlFrame(buf) {
if (!buf || buf.length < 8 || buf[0] !== COT_FIRST_BYTE_XML) return null
const idx = buf.indexOf(TRADITIONAL_DELIMITER)
if (idx === -1) return null
const bytesConsumed = idx + TRADITIONAL_DELIMITER.length
if (bytesConsumed > MAX_PAYLOAD_BYTES) return null
return { payload: buf.subarray(0, bytesConsumed), bytesConsumed }
}
const xmlParser = new XMLParser({
ignoreAttributes: false,
attributeNamePrefix: '@_',
parseTagValue: false,
ignoreDeclaration: true,
ignorePiTags: true,
processEntities: false, // Disable entity expansion to prevent XML bomb attacks
maxAttributes: 100,
parseAttributeValue: false,
trimValues: true,
parseTrueNumberOnly: false,
arrayMode: false,
stopNodes: [], // Could add depth limit here if needed
})
/**
* Case-insensitive key lookup in nested object.
* @returns {unknown} Found value or undefined.
*/
function findInObject(obj, key) {
if (!obj || typeof obj !== 'object') return undefined
const k = key.toLowerCase()
for (const [name, val] of Object.entries(obj)) {
if (name.toLowerCase() === k) return val
if (typeof val === 'object' && val !== null) {
const found = findInObject(val, key)
if (found !== undefined) return found
}
}
return undefined
}
/**
* Extract { username, password } from detail.auth (or __auth / credentials).
* @returns {{ username: string, password: string } | null} Credentials or null if missing/invalid.
*/
function extractAuth(parsed) {
const detail = findInObject(parsed, 'detail')
if (!detail || typeof detail !== 'object') return null
const auth = findInObject(detail, 'auth') ?? findInObject(detail, '__auth') ?? findInObject(detail, 'credentials')
if (!auth || typeof auth !== 'object') return null
const username = auth['@_username'] ?? auth['@_Username'] ?? auth.username
const password = auth['@_password'] ?? auth['@_Password'] ?? auth.password
if (typeof username !== 'string' || typeof password !== 'string' || !username.trim()) return null
return { username: username.trim(), password }
}
/**
* Parse CoT XML payload into auth or position. Does not mutate payload.
* @param {Buffer} payload - UTF-8 XML
* @returns {{ type: 'auth', username: string, password: string } | { type: 'cot', id: string, lat: number, lng: number, label: string, eventType: string } | null} Auth or position, or null.
*/
export function parseCotPayload(payload) {
if (!payload?.length) return null
const str = payload.toString('utf8').trim()
if (!str.startsWith('<')) return null
try {
const parsed = xmlParser.parse(str)
const event = findInObject(parsed, 'event')
if (!event || typeof event !== 'object') return null
const auth = extractAuth(parsed)
if (auth) return { type: 'auth', username: auth.username, password: auth.password }
const uid = String(event['@_uid'] ?? event.uid ?? '')
const eventType = String(event['@_type'] ?? event.type ?? '')
const point = findInObject(parsed, 'point') ?? findInObject(event, 'point')
const extractCoords = (pt) => {
if (!pt || typeof pt !== 'object') return { lat: Number.NaN, lng: Number.NaN }
return {
lat: Number(pt['@_lat'] ?? pt.lat),
lng: Number(pt['@_lon'] ?? pt.lon ?? pt['@_lng'] ?? pt.lng),
}
}
const { lat, lng } = extractCoords(point)
if (!Number.isFinite(lat) || !Number.isFinite(lng)) return null
const detail = findInObject(parsed, 'detail')
const contact = detail && typeof detail === 'object' ? (findInObject(detail, 'contact') ?? detail) : null
const callsign = contact && typeof contact === 'object'
? (contact['@_callsign'] ?? contact.callsign ?? contact['@_Callsign'])
: ''
const label = typeof callsign === 'string' ? callsign.trim() || uid : uid
return { type: 'cot', id: uid, lat, lng, label, eventType }
}
catch {
return null
}
}

73
server/utils/cotSsl.js Normal file
View File

@@ -0,0 +1,73 @@
import { existsSync, readFileSync, unlinkSync } from 'node:fs'
import { join, dirname } from 'node:path'
import { tmpdir } from 'node:os'
import { execSync } from 'node:child_process'
import { fileURLToPath } from 'node:url'
const __dirname = dirname(fileURLToPath(import.meta.url))
/** Default password for the CoT trust store (document in atak-itak.md). */
export const TRUSTSTORE_PASSWORD = 'kestrelos'
/** Default CoT server port. */
export const DEFAULT_COT_PORT = 8089
/**
* CoT port from env or default.
* @returns {number} Port number (COT_PORT env or DEFAULT_COT_PORT).
*/
export function getCotPort() {
return Number(process.env.COT_PORT ?? DEFAULT_COT_PORT)
}
/** Message when an endpoint requires TLS but server is not using it. */
export const COT_TLS_REQUIRED_MESSAGE = 'Only available when the server runs with SSL (e.g. .dev-certs or COT_SSL_*).'
/**
* Resolve CoT server TLS cert and key paths (for plugin and API).
* @param {{ cotSslCert?: string, cotSslKey?: string }} [config] - Runtime config (optional)
* @returns {{ certPath: string, keyPath: string } | null} Paths when TLS is configured, else null.
*/
export function getCotSslPaths(config = {}) {
if (process.env.COT_SSL_CERT && process.env.COT_SSL_KEY) {
return { certPath: process.env.COT_SSL_CERT, keyPath: process.env.COT_SSL_KEY }
}
if (config.cotSslCert && config.cotSslKey) {
return { certPath: config.cotSslCert, keyPath: config.cotSslKey }
}
const candidates = [
join(process.cwd(), '.dev-certs', 'cert.pem'),
join(__dirname, '../../.dev-certs', 'cert.pem'),
]
for (const certPath of candidates) {
const keyPath = certPath.replace('cert.pem', 'key.pem')
if (existsSync(certPath) && existsSync(keyPath)) {
return { certPath, keyPath }
}
}
return null
}
/**
* Build a P12 trust store from a PEM cert path (for truststore download and server package).
* @param {string} certPath - Path to cert.pem
* @param {string} password - P12 password
* @returns {Buffer} P12 buffer
* @throws {Error} If openssl fails
*/
export function buildP12FromCertPath(certPath, password) {
const outPath = join(tmpdir(), `kestrelos-cot-p12-${Date.now()}.p12`)
try {
execSync(
`openssl pkcs12 -export -nokeys -in "${certPath}" -out "${outPath}" -passout pass:${password}`,
{ stdio: 'pipe' },
)
const p12 = readFileSync(outPath)
unlinkSync(outPath)
return p12
}
catch (err) {
if (existsSync(outPath)) unlinkSync(outPath)
throw err
}
}

71
server/utils/cotStore.js Normal file
View File

@@ -0,0 +1,71 @@
/**
* In-memory CoT entity store: upsert by id, prune on read by TTL.
* Single source of truth; getActiveEntities returns new objects (no mutation of returned refs).
*/
import { acquire } from './asyncLock.js'
import { COT_ENTITY_TTL_MS } from './constants.js'
const entities = new Map()
/**
* Upsert entity by id. Input is not mutated; stored value is a new object.
* @param {{ id: string, lat: number, lng: number, label?: string, eventType?: string, type?: string }} parsed
*/
export async function updateFromCot(parsed) {
if (!parsed || typeof parsed.id !== 'string') return
const lat = Number(parsed.lat)
const lng = Number(parsed.lng)
if (!Number.isFinite(lat) || !Number.isFinite(lng)) return
await acquire(`cot-${parsed.id}`, async () => {
const now = Date.now()
const existing = entities.get(parsed.id)
const label = typeof parsed.label === 'string' ? parsed.label : (existing?.label ?? parsed.id)
const type = typeof parsed.eventType === 'string' ? parsed.eventType : (typeof parsed.type === 'string' ? parsed.type : (existing?.type ?? ''))
entities.set(parsed.id, {
id: parsed.id,
lat,
lng,
label,
type,
updatedAt: now,
})
})
}
/**
* Active entities (updated within ttlMs). Prunes expired. Returns new array of new objects.
* @param {number} [ttlMs]
* @returns {Promise<Array<{ id: string, lat: number, lng: number, label: string, type: string, updatedAt: number }>>} Snapshot of active entities.
*/
export async function getActiveEntities(ttlMs = COT_ENTITY_TTL_MS) {
return acquire('cot-prune', async () => {
const now = Date.now()
const active = []
const expired = []
for (const entity of entities.values()) {
if (now - entity.updatedAt <= ttlMs) {
active.push({
id: entity.id,
lat: entity.lat,
lng: entity.lng,
label: entity.label ?? entity.id,
type: entity.type ?? '',
updatedAt: entity.updatedAt,
})
}
else {
expired.push(entity.id)
}
}
for (const id of expired) entities.delete(id)
return active
})
}
/** Clear store (tests only). */
export function clearCotStore() {
entities.clear()
}

View File

@@ -2,12 +2,15 @@ import { join, dirname } from 'node:path'
import { mkdirSync, existsSync } from 'node:fs'
import { createRequire } from 'node:module'
import { promisify } from 'node:util'
import { bootstrapAdmin } from './bootstrap.js'
import { randomBytes } from 'node:crypto'
import { hashPassword } from './password.js'
import { registerCleanup } from './shutdown.js'
const require = createRequire(import.meta.url)
const sqlite3 = require('sqlite3')
// Resolve from project root so bundled server (e.g. .output) finds node_modules/sqlite3
const requireFromRoot = createRequire(join(process.cwd(), 'package.json'))
const sqlite3 = requireFromRoot('sqlite3')
const SCHEMA_VERSION = 3
const SCHEMA_VERSION = 4
const DB_BUSY_TIMEOUT_MS = 5000
let dbInstance = null
@@ -111,6 +114,12 @@ const migrateToV3 = async (run, all) => {
await run('ALTER TABLE users ADD COLUMN avatar_path TEXT')
}
const migrateToV4 = async (run, all) => {
const info = await all('PRAGMA table_info(users)')
if (info.some(c => c.name === 'cot_password_hash')) return
await run('ALTER TABLE users ADD COLUMN cot_password_hash TEXT')
}
const runMigrations = async (run, all, get) => {
const version = await getSchemaVersion(get)
if (version >= SCHEMA_VERSION) return
@@ -122,6 +131,10 @@ const runMigrations = async (run, all, get) => {
await migrateToV3(run, all)
await setSchemaVersion(run, 3)
}
if (version < 4) {
await migrateToV4(run, all)
await setSchemaVersion(run, 4)
}
}
const initDb = async (db, run, all, get) => {
@@ -140,7 +153,29 @@ const initDb = async (db, run, all, get) => {
await run(SCHEMA.pois)
await run(SCHEMA.devices)
if (!testPath) await bootstrapAdmin(run, get)
if (!testPath) {
// Bootstrap admin user on first run
const PASSWORD_CHARS = Object.freeze('abcdefghjkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789')
const generateRandomPassword = () =>
Array.from(randomBytes(14), b => PASSWORD_CHARS[b % PASSWORD_CHARS.length]).join('')
const row = await get('SELECT COUNT(*) as n FROM users')
if (row?.n === 0) {
const email = process.env.BOOTSTRAP_EMAIL?.trim()
const password = process.env.BOOTSTRAP_PASSWORD
const identifier = (email && password) ? email : 'admin'
const plainPassword = (email && password) ? password : generateRandomPassword()
await run(
'INSERT INTO users (id, identifier, password_hash, role, created_at, auth_provider, oidc_issuer, oidc_sub) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
[crypto.randomUUID(), identifier, hashPassword(plainPassword), 'admin', new Date().toISOString(), 'local', null, null],
)
if (!email || !password) {
console.log(`\n[KestrelOS] No bootstrap admin configured. Default admin created. Sign in at /login with:\n\n Identifier: ${identifier}\n Password: ${plainPassword}\n\n Set BOOTSTRAP_EMAIL and BOOTSTRAP_PASSWORD to use your own credentials on first run.\n`)
}
}
}
}
export async function getDb() {
@@ -167,9 +202,91 @@ export async function getDb() {
}
dbInstance = { db, run, all, get }
registerCleanup(async () => {
if (dbInstance) {
try {
await new Promise((resolve, reject) => {
dbInstance.db.close((err) => {
if (err) reject(err)
else resolve()
})
})
}
catch (error) {
console.error('[db] Error closing database during shutdown:', error?.message)
}
dbInstance = null
}
})
return dbInstance
}
/**
* Health check for database connection.
* @returns {Promise<{ healthy: boolean, error?: string }>} Health status
*/
export async function healthCheck() {
try {
const db = await getDb()
await db.get('SELECT 1')
return { healthy: true }
}
catch (error) {
return {
healthy: false,
error: error?.message || String(error),
}
}
}
/**
* Database connection model documentation:
*
* KestrelOS uses SQLite with WAL (Write-Ahead Logging) mode for concurrent access.
* - Single connection instance shared across all requests (singleton pattern)
* - WAL mode allows multiple readers and one writer concurrently
* - Connection is initialized on first getDb() call and reused thereafter
* - Busy timeout is set to 5000ms to handle concurrent access gracefully
* - Transactions are supported via withTransaction() helper
*
* Concurrency considerations:
* - SQLite with WAL handles concurrent reads efficiently
* - Writes are serialized (one at a time)
* - For high write loads, consider migrating to PostgreSQL
* - Current model is suitable for moderate traffic (< 100 req/sec)
*
* Connection lifecycle:
* - Created on first getDb() call
* - Persists for application lifetime
* - Closed during graceful shutdown
* - Test path can be set via setDbPathForTest() for testing
*/
/**
* Execute a callback within a database transaction.
* Automatically commits on success or rolls back on error.
* @param {object} db - Database instance from getDb()
* @param {Function} callback - Async function receiving { run, all, get }
* @returns {Promise<any>} Result of callback
*/
export async function withTransaction(db, callback) {
const { run } = db
await run('BEGIN TRANSACTION')
try {
const result = await callback(db)
await run('COMMIT')
return result
}
catch (error) {
await run('ROLLBACK').catch(() => {
// Ignore rollback errors
})
throw error
}
}
export function closeDb() {
if (!dbInstance) return
try {

View File

@@ -1,47 +1,79 @@
import { closeRouter, getProducer, getTransport } from './mediasoup.js'
import { acquire } from './asyncLock.js'
import { LIVE_SESSION_TTL_MS } from './constants.js'
const TTL_MS = 60_000
const sessions = new Map()
export const createSession = (userId, label = '') => {
const id = crypto.randomUUID()
const session = {
id,
userId,
label: (label || 'Live').trim() || 'Live',
lat: 0,
lng: 0,
updatedAt: Date.now(),
routerId: null,
producerId: null,
transportId: null,
}
sessions.set(id, session)
return session
export const createSession = async (userId, label = '') => {
return acquire(`session-create-${userId}`, async () => {
const id = crypto.randomUUID()
const session = {
id,
userId,
label: (label || 'Live').trim() || 'Live',
lat: 0,
lng: 0,
updatedAt: Date.now(),
routerId: null,
producerId: null,
transportId: null,
}
sessions.set(id, session)
return session
})
}
/**
* Atomically get existing active session or create new one for user.
* @param {string} userId - User ID
* @param {string} label - Session label
* @returns {Promise<object>} Session object
*/
export const getOrCreateSession = async (userId, label = '') => {
return acquire(`session-get-or-create-${userId}`, async () => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= LIVE_SESSION_TTL_MS) {
return s
}
}
return await createSession(userId, label)
})
}
export const getLiveSession = id => sessions.get(id)
export const getActiveSessionByUserId = (userId) => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= TTL_MS) return s
}
export const getActiveSessionByUserId = async (userId) => {
return acquire(`session-get-${userId}`, async () => {
const now = Date.now()
for (const s of sessions.values()) {
if (s.userId === userId && now - s.updatedAt <= LIVE_SESSION_TTL_MS) return s
}
})
}
export const updateLiveSession = (id, updates) => {
const session = sessions.get(id)
if (!session) return
const now = Date.now()
if (Number.isFinite(updates.lat)) session.lat = updates.lat
if (Number.isFinite(updates.lng)) session.lng = updates.lng
if (updates.routerId !== undefined) session.routerId = updates.routerId
if (updates.producerId !== undefined) session.producerId = updates.producerId
if (updates.transportId !== undefined) session.transportId = updates.transportId
session.updatedAt = now
export const updateLiveSession = async (id, updates) => {
return acquire(`session-update-${id}`, async () => {
const session = sessions.get(id)
if (!session) {
throw new Error('Session not found')
}
const now = Date.now()
if (Number.isFinite(updates.lat)) session.lat = updates.lat
if (Number.isFinite(updates.lng)) session.lng = updates.lng
if (updates.routerId !== undefined) session.routerId = updates.routerId
if (updates.producerId !== undefined) session.producerId = updates.producerId
if (updates.transportId !== undefined) session.transportId = updates.transportId
session.updatedAt = now
return session
})
}
export const deleteLiveSession = id => sessions.delete(id)
export const deleteLiveSession = async (id) => {
await acquire(`session-delete-${id}`, async () => {
sessions.delete(id)
})
}
export const clearSessions = () => sessions.clear()
@@ -62,31 +94,33 @@ const cleanupSession = async (session) => {
}
export const getActiveSessions = async () => {
const now = Date.now()
const active = []
const expired = []
return acquire('get-active-sessions', async () => {
const now = Date.now()
const active = []
const expired = []
for (const session of sessions.values()) {
if (now - session.updatedAt <= TTL_MS) {
active.push({
id: session.id,
userId: session.userId,
label: session.label,
lat: session.lat,
lng: session.lng,
updatedAt: session.updatedAt,
hasStream: Boolean(session.producerId),
})
for (const session of sessions.values()) {
if (now - session.updatedAt <= LIVE_SESSION_TTL_MS) {
active.push({
id: session.id,
userId: session.userId,
label: session.label,
lat: session.lat,
lng: session.lng,
updatedAt: session.updatedAt,
hasStream: Boolean(session.producerId),
})
}
else {
expired.push(session)
}
}
else {
expired.push(session)
for (const session of expired) {
await cleanupSession(session)
sessions.delete(session.id)
}
}
for (const session of expired) {
await cleanupSession(session)
sessions.delete(session.id)
}
return active
return active
})
}

84
server/utils/logger.js Normal file
View File

@@ -0,0 +1,84 @@
/**
* Structured logger with request context support.
* Uses AsyncLocalStorage to provide request-scoped context that's automatically isolated per async context.
*/
import { AsyncLocalStorage } from 'node:async_hooks'
const asyncLocalStorage = new AsyncLocalStorage()
/**
* Run a function with logger context. Context is automatically isolated per async execution.
* @param {string} reqId - Request ID
* @param {string|null} uId - User ID (optional)
* @param {Function} fn - Function to run with context
* @returns {Promise<any>} Result of the function
*/
export function runWithContext(reqId, uId, fn) {
return asyncLocalStorage.run({ requestId: reqId, userId: uId }, fn)
}
/**
* Set context for the current async context. Use runWithContext() instead for proper isolation.
* @deprecated Use runWithContext() instead for proper async context isolation
* @param {string} reqId - Request ID
* @param {string|null} uId - User ID (optional)
*/
export function setContext(reqId, uId = null) {
const store = asyncLocalStorage.getStore()
if (store) {
store.requestId = reqId
store.userId = uId
}
}
/**
* Clear context for the current async context.
* @deprecated Context is automatically cleared when async context ends. Use runWithContext() instead.
*/
export function clearContext() {
const store = asyncLocalStorage.getStore()
if (store) {
store.requestId = null
store.userId = null
}
}
function getContext() {
return asyncLocalStorage.getStore() || { requestId: null, userId: null }
}
function formatMessage(level, message, context = {}) {
const { requestId, userId } = getContext()
const timestamp = new Date().toISOString()
const ctx = {
timestamp,
level,
requestId,
...(userId && { userId }),
...context,
}
return `[${level.toUpperCase()}] ${JSON.stringify({ message, ...ctx })}`
}
export function info(message, context = {}) {
console.log(formatMessage('info', message, context))
}
export function error(message, context = {}) {
const ctx = { ...context }
if (context.error && context.error.stack) {
ctx.stack = context.error.stack
}
console.error(formatMessage('error', message, ctx))
}
export function warn(message, context = {}) {
console.warn(formatMessage('warn', message, context))
}
export function debug(message, context = {}) {
if (process.env.NODE_ENV === 'development') {
console.debug(formatMessage('debug', message, context))
}
}

View File

@@ -1,5 +1,7 @@
import os from 'node:os'
import mediasoup from 'mediasoup'
import { acquire } from './asyncLock.js'
import { MEDIASOUP_RTC_MIN_PORT, MEDIASOUP_RTC_MAX_PORT } from './constants.js'
let worker = null
const routers = new Map()
@@ -17,22 +19,25 @@ export const getWorker = async () => {
worker = await mediasoup.createWorker({
logLevel: process.env.NODE_ENV === 'development' ? 'debug' : 'warn',
logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp'],
rtcMinPort: 40000,
rtcMaxPort: 49999,
rtcMinPort: MEDIASOUP_RTC_MIN_PORT,
rtcMaxPort: MEDIASOUP_RTC_MAX_PORT,
})
worker.on('died', () => {
console.error('[mediasoup] Worker died, exiting')
process.exit(1)
worker.on('died', async (error) => {
console.error('[mediasoup] Worker died:', error?.message || String(error))
const { graceful } = await import('./shutdown.js')
await graceful(error || new Error('Mediasoup worker died'))
})
return worker
}
export const getRouter = async (sessionId) => {
const existing = routers.get(sessionId)
if (existing) return existing
const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS })
routers.set(sessionId, router)
return router
return acquire(`router-${sessionId}`, async () => {
const existing = routers.get(sessionId)
if (existing) return existing
const router = await (await getWorker()).createRouter({ mediaCodecs: MEDIA_CODECS })
routers.set(sessionId, router)
return router
})
}
const isIPv4 = (host) => {
@@ -64,34 +69,36 @@ const resolveAnnouncedIp = (requestHost) => {
}
export const createTransport = async (router, requestHost = undefined) => {
const announcedIp = resolveAnnouncedIp(requestHost)
const listenIps = announcedIp
? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }]
: [{ ip: '127.0.0.1' }]
return acquire(`transport-${router.id}`, async () => {
const announcedIp = resolveAnnouncedIp(requestHost)
const listenIps = announcedIp
? [{ ip: '0.0.0.0', announcedIp }, { ip: '127.0.0.1' }]
: [{ ip: '127.0.0.1' }]
const transport = await router.createWebRtcTransport({
listenIps,
enableUdp: true,
enableTcp: true,
preferUdp: true,
initialAvailableOutgoingBitrate: 1_000_000,
}).catch((err) => {
console.error('[mediasoup] Transport creation failed:', err)
throw new Error(`Failed to create transport: ${err.message || String(err)}`)
const transport = await router.createWebRtcTransport({
listenIps,
enableUdp: true,
enableTcp: true,
preferUdp: true,
initialAvailableOutgoingBitrate: 1_000_000,
}).catch((err) => {
console.error('[mediasoup] Transport creation failed:', err)
throw new Error(`Failed to create transport: ${err.message || String(err)}`)
})
transports.set(transport.id, transport)
transport.on('close', () => transports.delete(transport.id))
return {
transport,
params: {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
},
}
})
transports.set(transport.id, transport)
transport.on('close', () => transports.delete(transport.id))
return {
transport,
params: {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
},
}
}
export const getTransport = transportId => transports.get(transportId)

View File

@@ -3,6 +3,13 @@ import * as oidc from 'openid-client'
const CACHE_TTL_MS = 60 * 60 * 1000
const configCache = new Map()
// Auth configuration
export function getAuthConfig() {
const hasOidc = !!(process.env.OIDC_ISSUER && process.env.OIDC_CLIENT_ID && process.env.OIDC_CLIENT_SECRET)
const label = process.env.OIDC_LABEL?.trim() || (hasOidc ? 'Sign in with OIDC' : '')
return Object.freeze({ oidc: { enabled: hasOidc, label } })
}
function getRedirectUri() {
const explicit
= process.env.OIDC_REDIRECT_URI ?? process.env.OPENID_REDIRECT_URI ?? ''

View File

@@ -1 +0,0 @@
export const POI_ICON_TYPES = Object.freeze(['pin', 'flag', 'waypoint'])

View File

@@ -0,0 +1,28 @@
/**
* Query builder for safe dynamic UPDATE queries with column whitelist validation.
* Prevents SQL injection by validating column names against allowed sets.
*/
const ALLOWED_COLUMNS = {
devices: new Set(['name', 'device_type', 'vendor', 'lat', 'lng', 'stream_url', 'source_type', 'config']),
users: new Set(['role', 'identifier', 'password_hash']),
pois: new Set(['label', 'icon_type', 'lat', 'lng']),
}
export function buildUpdateQuery(table, allowedColumns, updates) {
if (!ALLOWED_COLUMNS[table]) throw new Error(`Unknown table: ${table}`)
const columns = allowedColumns || ALLOWED_COLUMNS[table]
const clauses = []
const params = []
for (const [column, value] of Object.entries(updates)) {
if (!columns.has(column)) throw new Error(`Invalid column: ${column} for table: ${table}`)
clauses.push(`${column} = ?`)
params.push(value)
}
if (clauses.length === 0) return { query: '', params: [] }
return { query: `UPDATE ${table} SET ${clauses.join(', ')} WHERE id = ?`, params }
}
export function getAllowedColumns(table) {
return ALLOWED_COLUMNS[table] || new Set()
}

View File

@@ -1,6 +0,0 @@
const [MIN_DAYS, MAX_DAYS, DEFAULT_DAYS] = [1, 365, 7]
export function getSessionMaxAgeDays() {
const raw = Number.parseInt(process.env.SESSION_MAX_AGE_DAYS ?? '', 10)
return Number.isFinite(raw) ? Math.max(MIN_DAYS, Math.min(MAX_DAYS, raw)) : DEFAULT_DAYS
}

78
server/utils/shutdown.js Normal file
View File

@@ -0,0 +1,78 @@
/**
* Graceful shutdown handler - registers cleanup functions and handles shutdown signals.
*/
import { SHUTDOWN_TIMEOUT_MS } from './constants.js'
const cleanupFunctions = []
const shutdownState = {
isShuttingDown: false,
}
export function clearCleanup() {
cleanupFunctions.length = 0
shutdownState.isShuttingDown = false
}
export function registerCleanup(fn) {
if (typeof fn !== 'function') throw new TypeError('Cleanup function must be a function')
cleanupFunctions.push(fn)
}
const executeCleanupFunction = async (fn, index) => {
try {
await fn()
}
catch (error) {
console.error(`[shutdown] Cleanup function ${index} failed:`, error?.message || String(error))
}
}
const executeCleanupReverse = async (functions, index = functions.length - 1) => {
if (index < 0) return
await executeCleanupFunction(functions[index], index)
return executeCleanupReverse(functions, index - 1)
}
async function executeCleanup() {
if (shutdownState.isShuttingDown) return
shutdownState.isShuttingDown = true
await executeCleanupReverse(cleanupFunctions)
}
export async function graceful(error) {
if (error) {
console.error('[shutdown] Shutting down due to error:', error?.message || String(error))
if (error.stack) console.error('[shutdown] Stack trace:', error.stack)
}
else {
console.log('[shutdown] Initiating graceful shutdown')
}
const timeout = setTimeout(() => {
console.error('[shutdown] Shutdown timeout exceeded, forcing exit')
process.exit(1)
}, SHUTDOWN_TIMEOUT_MS)
try {
await executeCleanup()
clearTimeout(timeout)
console.log('[shutdown] Cleanup complete')
process.exit(error ? 1 : 0)
}
catch (err) {
clearTimeout(timeout)
console.error('[shutdown] Error during cleanup:', err?.message || String(err))
process.exit(1)
}
}
export function initShutdownHandlers() {
for (const signal of ['SIGTERM', 'SIGINT']) {
process.on(signal, () => {
console.log(`[shutdown] Received ${signal}`)
graceful().catch((err) => {
console.error('[shutdown] Error in graceful shutdown:', err)
process.exit(1)
})
})
}
}

150
server/utils/validation.js Normal file
View File

@@ -0,0 +1,150 @@
/**
* Validation and sanitization utilities - pure functions for consistent input validation and cleaning.
*/
import { MAX_IDENTIFIER_LENGTH, MAX_STRING_LENGTH } from './constants.js'
import { DEVICE_TYPES, SOURCE_TYPES } from './deviceUtils.js'
// Constants
export const POI_ICON_TYPES = Object.freeze(['pin', 'flag', 'waypoint'])
// Sanitization functions
const IDENTIFIER_REGEX = /^\w+$/
export function sanitizeString(str, maxLength = MAX_STRING_LENGTH) {
if (typeof str !== 'string') return ''
const trimmed = str.trim()
return trimmed.length > maxLength ? trimmed.slice(0, maxLength) : trimmed
}
export function sanitizeIdentifier(str) {
if (typeof str !== 'string') return ''
const trimmed = str.trim()
if (trimmed.length === 0 || trimmed.length > MAX_IDENTIFIER_LENGTH) return ''
return IDENTIFIER_REGEX.test(trimmed) ? trimmed : ''
}
export function sanitizeLabel(str, maxLength = MAX_STRING_LENGTH) {
return sanitizeString(str, maxLength)
}
const ROLES = ['admin', 'leader', 'member']
const validateNumber = (value, field) => {
const num = Number(value)
return Number.isFinite(num) ? { valid: true, value: num } : { valid: false, error: `${field} must be a finite number` }
}
const validateEnum = (value, allowed, field) => allowed.includes(value) ? { valid: true, value } : { valid: false, error: `Invalid ${field}` }
const handleField = (d, field, handler, updates, errors, outputField = null) => {
if (d[field] !== undefined) {
const result = handler(d[field])
if (result.valid) updates[outputField || field] = result.value
else errors.push(result.error)
}
}
export function validateDevice(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const latCheck = validateNumber(d.lat, 'lat')
const lngCheck = validateNumber(d.lng, 'lng')
if (!latCheck.valid || !lngCheck.valid) errors.push('lat and lng required as finite numbers')
if (errors.length > 0) return { valid: false, errors }
return {
valid: true,
errors: [],
data: {
name: sanitizeString(d.name, 1000),
device_type: validateEnum(d.device_type, DEVICE_TYPES, 'device_type').value || 'feed',
vendor: d.vendor !== undefined ? sanitizeString(d.vendor, 255) : null,
lat: latCheck.value,
lng: lngCheck.value,
stream_url: typeof d.stream_url === 'string' ? sanitizeString(d.stream_url, 2000) : '',
source_type: validateEnum(d.source_type, SOURCE_TYPES, 'source_type').value || 'mjpeg',
config: d.config == null ? null : (typeof d.config === 'string' ? d.config : JSON.stringify(d.config)),
},
}
}
export function validateUpdateDevice(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.name !== undefined) updates.name = sanitizeString(d.name, 1000)
handleField(d, 'device_type', v => validateEnum(v, DEVICE_TYPES, 'device_type'), updates, errors)
if (d.vendor !== undefined) updates.vendor = d.vendor === null || d.vendor === '' ? null : sanitizeString(d.vendor, 255)
handleField(d, 'lat', v => validateNumber(v, 'lat'), updates, errors)
handleField(d, 'lng', v => validateNumber(v, 'lng'), updates, errors)
if (d.stream_url !== undefined) updates.stream_url = sanitizeString(d.stream_url, 2000)
handleField(d, 'source_type', v => validateEnum(v, SOURCE_TYPES, 'source_type'), updates, errors)
if (d.config !== undefined) updates.config = d.config === null ? null : (typeof d.config === 'string' ? d.config : JSON.stringify(d.config))
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}
export function validateUser(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const identifier = sanitizeIdentifier(d.identifier)
const password = typeof d.password === 'string' ? d.password : ''
const role = typeof d.role === 'string' ? d.role : ''
if (!identifier) errors.push('identifier required')
if (!password) errors.push('password required')
if (!role || !ROLES.includes(role)) errors.push('role must be admin, leader, or member')
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: { identifier, password, role: role || 'member' } }
}
export function validateUpdateUser(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.role !== undefined) {
if (ROLES.includes(d.role)) updates.role = d.role
else errors.push('role must be admin, leader, or member')
}
if (d.identifier !== undefined) {
const identifier = sanitizeIdentifier(d.identifier)
if (!identifier) errors.push('identifier cannot be empty')
else updates.identifier = identifier
}
if (d.password !== undefined && d.password !== '') {
if (typeof d.password !== 'string' || !d.password) errors.push('password cannot be empty')
else updates.password = d.password
}
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}
export function validatePoi(data) {
if (!data || typeof data !== 'object') return { valid: false, errors: ['body required'] }
const d = /** @type {Record<string, unknown>} */ (data)
const latCheck = validateNumber(d.lat, 'lat')
const lngCheck = validateNumber(d.lng, 'lng')
if (!latCheck.valid || !lngCheck.valid) return { valid: false, errors: ['lat and lng required as finite numbers'] }
return {
valid: true,
errors: [],
data: {
lat: latCheck.value,
lng: lngCheck.value,
label: sanitizeLabel(d.label, 500),
icon_type: validateEnum(d.iconType, POI_ICON_TYPES, 'iconType').value || 'pin',
},
}
}
export function validateUpdatePoi(data) {
if (!data || typeof data !== 'object') return { valid: true, errors: [], data: {} }
const d = /** @type {Record<string, unknown>} */ (data)
const errors = []
const updates = {}
if (d.label !== undefined) updates.label = sanitizeLabel(d.label, 500)
handleField(d, 'iconType', v => validateEnum(v, POI_ICON_TYPES, 'iconType'), updates, errors, 'icon_type')
handleField(d, 'lat', v => validateNumber(v, 'lat'), updates, errors)
handleField(d, 'lng', v => validateNumber(v, 'lng'), updates, errors)
return errors.length > 0 ? { valid: false, errors } : { valid: true, errors: [], data: updates }
}

View File

@@ -20,7 +20,15 @@ export async function handleWebSocketMessage(userId, sessionId, type, data) {
case 'create-transport': {
const router = await getRouter(sessionId)
const { transport, params } = await createTransport(router)
updateLiveSession(sessionId, { transportId: transport.id, routerId: router.id })
try {
await updateLiveSession(sessionId, { transportId: transport.id, routerId: router.id })
}
catch (err) {
if (err.message === 'Session not found') {
return { error: 'Session not found' }
}
throw err
}
return { type: 'transport-created', data: params }
}
case 'connect-transport': {