bb01e9a06c
## Summary - **ADS-B & AIS:** OpenSky and AISStream OSINT feeds upsert into the CoT store; tactical tracks still arrive via adsbcot/aiscot on `:8089`. Map clients subscribe via `GET /api/cot/stream` (SSE) with viewport bbox filtering and Air / Surface / Team layer toggles. - **ALPR (Flock/OSM):** Toggleable license-plate reader layer sourced from OpenStreetMap, with SQLite cache, Overpass fallback, tiled viewport fetching, and clustered markers with direction cones. - **Map performance:** Ring-based tile selection (fixes zoom-out crash), immutable tile cache, incremental marker sync, split cluster load/query, and padded SSE bbox to reduce reconnect churn. ## Docs - `docs/tracking.md` — ADS-B/AIS accuracy tiers, freshness, self-hosted receivers, optional OSINT API keys - `docs/map-and-cameras.md` — ALPR layer and map behavior updates --------- Co-authored-by: Madison Grubb <madison@elastiflow.com> Reviewed-on: #36
71 lines
2.2 KiB
JavaScript
71 lines
2.2 KiB
JavaScript
/** SSE subscriber registry; bbox union drives OSINT feeds. */
|
|
|
|
import { getActiveEntitiesInBbox } from './cotStore.js'
|
|
import { isInBbox, matchesLayerFilter, unionBboxes } from './cotEntityUtils.js'
|
|
|
|
/** @typedef {{ west: number, south: number, east: number, north: number }} Bbox */
|
|
/** @typedef {(event: string, data: string) => Promise<void> | void} PushFn */
|
|
|
|
/** @type {Map<string, { bbox: Bbox | null, layers: Set<string>, push: PushFn }>} */
|
|
const subscribers = new Map()
|
|
let nextId = 1
|
|
|
|
/**
|
|
* @param {{ bbox: Bbox | null, layers: Set<string>, push: PushFn }} sub
|
|
* @returns {() => void} Unregister function.
|
|
*/
|
|
export function registerSubscriber(sub) {
|
|
const id = String(nextId++)
|
|
subscribers.set(id, sub)
|
|
return () => subscribers.delete(id)
|
|
}
|
|
|
|
/** @returns {Bbox | null} Union of all subscriber bboxes. */
|
|
export function getSubscriberBboxUnion() {
|
|
return unionBboxes([...subscribers.values()].map(s => s.bbox))
|
|
}
|
|
|
|
export function getSubscriberCount() {
|
|
return subscribers.size
|
|
}
|
|
|
|
export function clearSubscribers() {
|
|
subscribers.clear()
|
|
}
|
|
|
|
export async function notifySubscribersForEntity(event, payload, entity) {
|
|
const data = JSON.stringify(payload)
|
|
const tasks = []
|
|
for (const sub of subscribers.values()) {
|
|
if (sub.bbox && !isInBbox(entity, sub.bbox)) continue
|
|
if (!matchesLayerFilter(sub.layers, entity)) continue
|
|
tasks.push(Promise.resolve(sub.push(event, data)))
|
|
}
|
|
await Promise.all(tasks)
|
|
}
|
|
|
|
export async function notifySubscribersRemove(id) {
|
|
const data = JSON.stringify({ id })
|
|
await Promise.all(
|
|
[...subscribers.values()].map(sub => Promise.resolve(sub.push('remove', data))),
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Push a filtered snapshot to each active SSE subscriber.
|
|
* @param {{ ttlMs?: number, osintTtlMs?: number, takFilterBbox?: boolean, maxEntities?: number }} snapshotOpts
|
|
*/
|
|
export async function broadcastSubscriberSnapshots(snapshotOpts) {
|
|
const tasks = []
|
|
for (const sub of subscribers.values()) {
|
|
tasks.push((async () => {
|
|
const entities = await getActiveEntitiesInBbox(sub.bbox, {
|
|
...snapshotOpts,
|
|
layers: sub.layers,
|
|
})
|
|
await sub.push('snapshot', JSON.stringify({ entities }))
|
|
})())
|
|
}
|
|
await Promise.all(tasks)
|
|
}
|