Files
kestrelos/server/utils/cotStore.js
T
keligrubb bb01e9a06c
Push / release (push) Successful in 13s
Push / publish (push) Successful in 1m4s
Add ADS-B, AIS, and ALPR map layers with live CoT streaming (#36)
## Summary

- **ADS-B & AIS:** OpenSky and AISStream OSINT feeds upsert into the CoT store; tactical tracks still arrive via adsbcot/aiscot on `:8089`. Map clients subscribe via `GET /api/cot/stream` (SSE) with viewport bbox filtering and Air / Surface / Team layer toggles.
- **ALPR (Flock/OSM):** Toggleable license-plate reader layer sourced from OpenStreetMap, with SQLite cache, Overpass fallback, tiled viewport fetching, and clustered markers with direction cones.
- **Map performance:** Ring-based tile selection (fixes zoom-out crash), immutable tile cache, incremental marker sync, split cluster load/query, and padded SSE bbox to reduce reconnect churn.

## Docs

- `docs/tracking.md` — ADS-B/AIS accuracy tiers, freshness, self-hosted receivers, optional OSINT API keys
- `docs/map-and-cameras.md` — ALPR layer and map behavior updates

---------

Co-authored-by: Madison Grubb <madison@elastiflow.com>
Reviewed-on: #36
2026-06-24 20:54:50 +00:00

216 lines
6.8 KiB
JavaScript

/**
* In-memory CoT store (TAK, ADS-B, AIS).
*/
import { acquire } from './asyncLock.js'
import { COT_OSINT_TTL_MS, COT_TTL_MS } from './constants.js'
import { inferSourceFromId, isInBbox, matchesLayerFilter } from './cotEntityUtils.js'
/**
 * Module-level store of tracked entities, keyed by entity id.
 * Values are the plain objects built by updateFromCot.
 * @type {Map<string, Record<string, unknown>>}
 */
const entities = new Map()
/**
 * Change subscribers registered through onCotChange; each is called with an
 * event name and a payload.
 * @type {Set<(event: string, payload: unknown) => void>}
 */
const listeners = new Set()
/**
 * Subscribe to store changes. The callback is added to the module-level
 * listener set and receives an event name plus a payload on every change.
 * @param {(event: string, payload: unknown) => void} fn
 * @returns {() => void} Unsubscribe function; calling it removes `fn` from the set.
 */
export function onCotChange(fn) {
  listeners.add(fn)
  const unsubscribe = () => listeners.delete(fn)
  return unsubscribe
}
/**
 * Notify every registered listener of a change.
 * A throwing listener is deliberately ignored so it cannot prevent the
 * remaining listeners from being called.
 * @param {string} event
 * @param {unknown} payload
 */
function emitChange(event, payload) {
  listeners.forEach((listener) => {
    try {
      listener(event, payload)
    }
    catch {
      /* ignore listener errors */
    }
  })
}
/**
 * Read an optional numeric field, falling back when it is absent or unusable.
 *
 * Only numbers and non-blank numeric strings are accepted. Everything else
 * (undefined, null, '', whitespace, booleans, arrays, objects) yields the
 * fallback: `Number()` would otherwise coerce '' / [] / false to 0 and true
 * to 1, silently replacing a known value with a bogus one.
 * @param {unknown} value Candidate value.
 * @param {number | undefined} fallback Returned when `value` is not a finite number.
 * @returns {number | undefined}
 */
function pickOptionalNumber(value, fallback) {
  if (typeof value === 'string') {
    // Number('') and Number('   ') are 0, not NaN, so blank strings need their own guard.
    if (value.trim() === '') return fallback
  }
  else if (typeof value !== 'number') {
    return fallback
  }
  const n = Number(value)
  return Number.isFinite(n) ? n : fallback
}
/**
 * Build a fresh plain-object copy of a stored entity containing only the
 * fields exposed to consumers. Missing label falls back to the id, missing
 * type to '', and missing source to 'tak'.
 * @param {Record<string, unknown>} entity
 * @returns {Record<string, unknown>} New object; the input is not modified.
 */
function toSnapshot(entity) {
  const {
    id,
    lat,
    lng,
    label,
    type,
    source,
    heading,
    speed,
    altitude,
    verticalRate,
    onGround,
    originCountry,
    icao,
    mmsi,
    squawk,
    updatedAt,
  } = entity
  return {
    id,
    lat,
    lng,
    label: label ?? id,
    type: type ?? '',
    source: source ?? 'tak',
    heading,
    speed,
    altitude,
    verticalRate,
    onGround,
    originCountry,
    icao,
    mmsi,
    squawk,
    updatedAt,
  }
}
/**
 * Resolve the time-to-live that applies to an entity.
 * Entities whose source is 'adsb' or 'ais' use the OSINT TTL; every other
 * source uses the regular TTL. Each falls back to its module constant when
 * the option is null or undefined.
 * @param {Record<string, unknown>} entity
 * @param {{ ttlMs?: number, osintTtlMs?: number }} opts
 * @returns {number} TTL in milliseconds.
 */
function entityTtlMs(entity, opts) {
  const { source } = entity
  const isOsintFeed = source === 'adsb' || source === 'ais'
  return isOsintFeed
    ? (opts.osintTtlMs ?? COT_OSINT_TTL_MS)
    : (opts.ttlMs ?? COT_TTL_MS)
}
/**
 * Whether an entity's last update is older than the TTL for its source.
 * @param {Record<string, unknown>} entity Stored entity; its `updatedAt` is compared against `now`.
 * @param {number} now Reference time, in the same unit as `updatedAt`.
 * @param {{ ttlMs?: number, osintTtlMs?: number }} opts
 * @returns {boolean} True when the age strictly exceeds the TTL.
 */
function isEntityExpired(entity, now, opts) {
  const ageMs = now - entity.updatedAt
  const ttlMs = entityTtlMs(entity, opts)
  return ageMs > ttlMs
}
/**
 * Coerce a coordinate to a finite number.
 * Accepts numbers and non-blank numeric strings only; `Number()` alone would
 * turn null, '' and false into 0 and so accept a missing coordinate as 0°.
 * @param {unknown} value
 * @returns {number | null} The finite coordinate, or null when unusable.
 */
function parseCoordinate(value) {
  if (typeof value === 'number') return Number.isFinite(value) ? value : null
  if (typeof value !== 'string' || value.trim() === '') return null
  const n = Number(value)
  return Number.isFinite(n) ? n : null
}
/**
 * Upsert entity by id. Input is not mutated; stored value is a new object.
 *
 * The call is a no-op when `parsed` is falsy, its id is not a string, or
 * either coordinate is missing or not a finite number. Optional fields that
 * are absent or of the wrong type keep the value from the previously stored
 * entity, if any. `updatedAt` is always set to the current time.
 *
 * The write runs inside `acquire` under a per-id key (presumably a keyed
 * async lock; see asyncLock.js). Unless `options.silent` is truthy, an
 * 'update' event carrying `{ entity: snapshot }` is emitted afterwards.
 *
 * NOTE(review): unlike the other fields, `source` does not fall back to the
 * previously stored value; when not explicit it is re-inferred from the id.
 * Confirm this is intended.
 * @param {{ id: string, lat: number, lng: number, label?: string, eventType?: string, type?: string, source?: string, heading?: number, speed?: number, altitude?: number, verticalRate?: number, onGround?: boolean, originCountry?: string, icao?: string, mmsi?: string, squawk?: string }} parsed
 * @param {{ silent?: boolean }} [options]
 * @returns {Promise<void>}
 */
export async function updateFromCot(parsed, options = {}) {
  if (!parsed || typeof parsed.id !== 'string') return
  const lat = parseCoordinate(parsed.lat)
  const lng = parseCoordinate(parsed.lng)
  // Reject rather than store a track at 0°, 0° when a coordinate is missing.
  if (lat === null || lng === null) return
  let snapshot = null
  await acquire(`cot-${parsed.id}`, async () => {
    const now = Date.now()
    const existing = entities.get(parsed.id)
    const label = typeof parsed.label === 'string' ? parsed.label : (existing?.label ?? parsed.id)
    // eventType wins over type; both fall back to the stored type.
    const type = typeof parsed.eventType === 'string'
      ? parsed.eventType
      : (typeof parsed.type === 'string' ? parsed.type : (existing?.type ?? ''))
    const explicitSource = parsed.source
    const source = explicitSource === 'adsb' || explicitSource === 'ais' || explicitSource === 'tak'
      ? explicitSource
      : inferSourceFromId(parsed.id)
    const stored = {
      id: parsed.id,
      lat,
      lng,
      label,
      type,
      source,
      heading: pickOptionalNumber(parsed.heading, existing?.heading),
      speed: pickOptionalNumber(parsed.speed, existing?.speed),
      altitude: pickOptionalNumber(parsed.altitude, existing?.altitude),
      verticalRate: pickOptionalNumber(parsed.verticalRate, existing?.verticalRate),
      onGround: typeof parsed.onGround === 'boolean' ? parsed.onGround : existing?.onGround,
      originCountry: typeof parsed.originCountry === 'string' ? parsed.originCountry : existing?.originCountry,
      icao: typeof parsed.icao === 'string' ? parsed.icao : existing?.icao,
      mmsi: typeof parsed.mmsi === 'string' ? parsed.mmsi : existing?.mmsi,
      squawk: typeof parsed.squawk === 'string' ? parsed.squawk : existing?.squawk,
      updatedAt: now,
    }
    entities.set(parsed.id, stored)
    snapshot = toSnapshot(stored)
  })
  if (snapshot && !options.silent) emitChange('update', { entity: snapshot })
}
/**
 * Delete every entity whose age exceeds its TTL and emit a 'remove' event
 * with `{ id }` for each one. Expired ids are collected before any deletion
 * so the map is not modified while it is being iterated.
 * @param {number} now
 * @param {{ ttlMs?: number, osintTtlMs?: number }} opts
 */
function pruneExpired(now, opts) {
  const staleIds = Array.from(entities.values())
    .filter((entity) => isEntityExpired(entity, now, opts))
    .map((entity) => entity.id)
  staleIds.forEach((id) => {
    entities.delete(id)
    emitChange('remove', { id })
  })
}
/**
 * Remove expired entities from the store, running under the 'cot-prune'
 * key of `acquire`. TTLs default to the module constants.
 * @param {{ ttlMs?: number, osintTtlMs?: number }} [opts]
 * @returns {Promise<void>}
 */
export async function pruneStaleEntities(opts = {}) {
  const ttlOpts = {
    ttlMs: opts.ttlMs ?? COT_TTL_MS,
    osintTtlMs: opts.osintTtlMs ?? COT_OSINT_TTL_MS,
  }
  const runPrune = async () => {
    pruneExpired(Date.now(), ttlOpts)
  }
  await acquire('cot-prune', runPrune)
}
/**
 * Decide whether an entity survives the viewport filter.
 * With no bbox everything passes. Entities whose source is 'tak' also pass
 * regardless of position unless `takFilterBbox` is set; all others must lie
 * inside the bbox according to isInBbox.
 * @param {Record<string, unknown>} entity
 * @param {{ west: number, south: number, east: number, north: number } | null | undefined} bbox
 * @param {boolean} takFilterBbox
 * @returns {boolean}
 */
function passesBboxFilter(entity, bbox, takFilterBbox) {
  if (!bbox) return true
  const insideViewport = isInBbox(entity, bbox)
  const takBypassesBbox = entity.source === 'tak' && !takFilterBbox
  return takBypassesBbox ? true : insideViewport
}
/**
 * List all entities that have not expired. Expired entities are pruned
 * first (under the 'cot-prune' key of `acquire`), then every remaining
 * entity is copied into a fresh snapshot object.
 * @param {{ ttlMs?: number, osintTtlMs?: number }} [opts]
 * @returns {Promise<Array<Record<string, unknown>>>} New array of new objects
 *   (assumes `acquire` resolves with the callback's result).
 */
export async function getActiveEntities(opts = {}) {
  const ttlOpts = {
    ttlMs: opts.ttlMs ?? COT_TTL_MS,
    osintTtlMs: opts.osintTtlMs ?? COT_OSINT_TTL_MS,
  }
  return acquire('cot-prune', async () => {
    pruneExpired(Date.now(), ttlOpts)
    return Array.from(entities.values(), (entity) => toSnapshot(entity))
  })
}
/**
 * Active entities filtered by viewport bbox and layer set.
 *
 * Expired entities are pruned first (which emits 'remove' events), then each
 * remaining entity is snapshotted and kept only if it passes the bbox filter
 * and the layer filter. When more entities match than `maxEntities` allows,
 * the most recently updated ones are kept.
 *
 * `maxEntities` is floored and clamped at 0, so a negative or fractional cap
 * no longer throws `RangeError: Invalid array length`. A null, undefined or
 * non-numeric cap means "no limit".
 * @param {{ west: number, south: number, east: number, north: number } | null} bbox
 * @param {{ ttlMs?: number, osintTtlMs?: number, layers?: Set<string>, takFilterBbox?: boolean, maxEntities?: number }} [opts]
 * @returns {Promise<Array<Record<string, unknown>>>} New array of snapshot objects
 *   (assumes `acquire` resolves with the callback's result).
 */
export async function getActiveEntitiesInBbox(bbox, opts = {}) {
  const ttlOpts = {
    ttlMs: opts.ttlMs ?? COT_TTL_MS,
    osintTtlMs: opts.osintTtlMs ?? COT_OSINT_TTL_MS,
  }
  const layers = opts.layers
  const takFilterBbox = Boolean(opts.takFilterBbox)
  return acquire('cot-prune', async () => {
    const now = Date.now()
    pruneExpired(now, ttlOpts)
    const active = []
    for (const entity of entities.values()) {
      if (isEntityExpired(entity, now, ttlOpts)) continue
      // Filters run on the snapshot so they see the defaulted label/type/source.
      const snap = toSnapshot(entity)
      if (!passesBboxFilter(snap, bbox, takFilterBbox)) continue
      if (!matchesLayerFilter(layers, snap)) continue
      active.push(snap)
    }
    const requestedCap = opts.maxEntities == null ? Number.NaN : Number(opts.maxEntities)
    // NaN (absent or unparseable cap) propagates through floor/max and makes
    // the comparison below false, i.e. no truncation.
    const cap = Math.max(0, Math.floor(requestedCap))
    if (active.length > cap) {
      active.sort((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0))
      active.length = cap
    }
    return active
  })
}
/**
 * Clear store (tests only).
 * Removes every stored entity. Registered change listeners are left in place
 * and no 'remove' events are emitted.
 */
export function clearCotStore() {
entities.clear()
}