This commit is contained in:
@@ -9,8 +9,10 @@ import { registerCleanup } from '../utils/shutdown.js'
|
||||
import { COT_AUTH_TIMEOUT_MS } from '../utils/constants.js'
|
||||
import { acquire } from '../utils/asyncLock.js'
|
||||
|
||||
let tcpServer = null
|
||||
let tlsServer = null
|
||||
const serverState = {
|
||||
tcpServer: null,
|
||||
tlsServer: null,
|
||||
}
|
||||
const relaySet = new Set()
|
||||
const allSockets = new Set()
|
||||
const socketBuffers = new WeakMap()
|
||||
@@ -44,22 +46,27 @@ function broadcast(senderSocket, rawMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
const createPreview = (payload) => {
|
||||
try {
|
||||
const str = payload.toString('utf8')
|
||||
if (str.startsWith('<')) {
|
||||
const s = str.length <= 120 ? str : str.slice(0, 120) + '...'
|
||||
// eslint-disable-next-line no-control-regex -- sanitize control chars for log preview
|
||||
return s.replace(/[\u0000-\u0008\v\f\u000E-\u001F]/g, '.')
|
||||
}
|
||||
return 'hex:' + payload.subarray(0, Math.min(40, payload.length)).toString('hex')
|
||||
}
|
||||
catch {
|
||||
return 'hex:' + payload.subarray(0, Math.min(40, payload.length)).toString('hex')
|
||||
}
|
||||
}
|
||||
|
||||
async function processFrame(socket, rawMessage, payload, authenticated) {
|
||||
const requireAuth = socket._cotRequireAuth !== false
|
||||
const debug = socket._cotDebug === true
|
||||
const parsed = parseCotPayload(payload)
|
||||
if (debug) {
|
||||
let preview = payload.length
|
||||
try {
|
||||
const str = payload.toString('utf8')
|
||||
if (str.startsWith('<')) {
|
||||
const s = str.length <= 120 ? str : str.slice(0, 120) + '...'
|
||||
// eslint-disable-next-line no-control-regex -- sanitize control chars for log preview
|
||||
preview = s.replace(/[\u0000-\u0008\v\f\u000E-\u001F]/g, '.')
|
||||
}
|
||||
else preview = 'hex:' + payload.subarray(0, Math.min(40, payload.length)).toString('hex')
|
||||
}
|
||||
catch { preview = 'hex:' + payload.subarray(0, Math.min(40, payload.length)).toString('hex') }
|
||||
const preview = createPreview(payload)
|
||||
console.log('[cot] payload length:', payload.length, 'parsed:', parsed ? parsed.type : null, 'preview:', preview)
|
||||
}
|
||||
if (!parsed) return
|
||||
@@ -108,10 +115,35 @@ async function processFrame(socket, rawMessage, payload, authenticated) {
|
||||
}
|
||||
}
|
||||
|
||||
const parseFrame = (buf) => {
|
||||
const takResult = parseTakStreamFrame(buf)
|
||||
if (takResult) return { result: takResult, frameType: 'tak' }
|
||||
if (buf[0] === 0x3C) {
|
||||
const xmlResult = parseTraditionalXmlFrame(buf)
|
||||
if (xmlResult) return { result: xmlResult, frameType: 'traditional' }
|
||||
}
|
||||
return { result: null, frameType: null }
|
||||
}
|
||||
|
||||
const processBufferedData = async (socket, buf, authenticated) => {
|
||||
if (buf.length === 0) return buf
|
||||
const { result, frameType } = parseFrame(buf)
|
||||
if (result && socket._cotDebug) {
|
||||
console.log('[cot] frame parsed as', frameType, 'bytesConsumed=', result.bytesConsumed)
|
||||
}
|
||||
if (!result) return buf
|
||||
const { payload, bytesConsumed } = result
|
||||
const rawMessage = buf.subarray(0, bytesConsumed)
|
||||
await processFrame(socket, rawMessage, payload, authenticated)
|
||||
if (socket.destroyed) return null
|
||||
const remainingBuf = buf.subarray(bytesConsumed)
|
||||
socketBuffers.set(socket, remainingBuf)
|
||||
return processBufferedData(socket, remainingBuf, authenticated)
|
||||
}
|
||||
|
||||
async function onData(socket, data) {
|
||||
let buf = socketBuffers.get(socket)
|
||||
if (!buf) buf = Buffer.alloc(0)
|
||||
buf = Buffer.concat([buf, data])
|
||||
const existingBuf = socketBuffers.get(socket)
|
||||
const buf = Buffer.concat([existingBuf || Buffer.alloc(0), data])
|
||||
socketBuffers.set(socket, buf)
|
||||
const authenticated = Boolean(socket._cotAuthenticated)
|
||||
|
||||
@@ -120,22 +152,7 @@ async function onData(socket, data) {
|
||||
const hex = buf.subarray(0, Math.min(80, buf.length)).toString('hex')
|
||||
console.log('[cot] first chunk len=', buf.length, 'first bytes (hex):', hex, 'starts with 0xBF:', buf[0] === 0xBF, 'starts with <:', buf[0] === 0x3C)
|
||||
}
|
||||
while (buf.length > 0) {
|
||||
let result = parseTakStreamFrame(buf)
|
||||
let frameType = 'tak'
|
||||
if (!result && buf[0] === 0x3C) {
|
||||
result = parseTraditionalXmlFrame(buf)
|
||||
frameType = 'traditional'
|
||||
}
|
||||
if (result && socket._cotDebug) console.log('[cot] frame parsed as', frameType, 'bytesConsumed=', result.bytesConsumed)
|
||||
if (!result) break
|
||||
const { payload, bytesConsumed } = result
|
||||
const rawMessage = buf.subarray(0, bytesConsumed)
|
||||
await processFrame(socket, rawMessage, payload, authenticated)
|
||||
if (socket.destroyed) return
|
||||
buf = buf.subarray(bytesConsumed)
|
||||
socketBuffers.set(socket, buf)
|
||||
}
|
||||
await processBufferedData(socket, buf, authenticated)
|
||||
}
|
||||
|
||||
function setupSocket(socket, tls = false) {
|
||||
@@ -182,16 +199,16 @@ function startCotServers() {
|
||||
key: readFileSync(keyPath),
|
||||
rejectUnauthorized: false,
|
||||
}
|
||||
tlsServer = createTlsServer(tlsOpts, socket => setupSocket(socket, true))
|
||||
tlsServer.on('error', err => console.error('[cot] TLS server error:', err?.message))
|
||||
tlsServer.listen(port, '0.0.0.0', () => {
|
||||
serverState.tlsServer = createTlsServer(tlsOpts, socket => setupSocket(socket, true))
|
||||
serverState.tlsServer.on('error', err => console.error('[cot] TLS server error:', err?.message))
|
||||
serverState.tlsServer.listen(port, '0.0.0.0', () => {
|
||||
console.log('[cot] CoT server listening on 0.0.0.0:' + port + ' (TLS) — use this port in ATAK/iTAK and enable SSL')
|
||||
})
|
||||
}
|
||||
else {
|
||||
tcpServer = createTcpServer(socket => setupSocket(socket, false))
|
||||
tcpServer.on('error', err => console.error('[cot] TCP server error:', err?.message))
|
||||
tcpServer.listen(port, '0.0.0.0', () => {
|
||||
serverState.tcpServer = createTcpServer(socket => setupSocket(socket, false))
|
||||
serverState.tcpServer.on('error', err => console.error('[cot] TCP server error:', err?.message))
|
||||
serverState.tcpServer.listen(port, '0.0.0.0', () => {
|
||||
console.log('[cot] CoT server listening on 0.0.0.0:' + port + ' (plain TCP) — use this port in ATAK/iTAK with SSL disabled')
|
||||
})
|
||||
}
|
||||
@@ -209,7 +226,18 @@ export default defineNitroPlugin((nitroApp) => {
|
||||
// Start immediately so CoT is up before first request in dev; ready may fire late in some setups.
|
||||
setImmediate(startCotServers)
|
||||
|
||||
registerCleanup(async () => {
|
||||
const cleanupServers = () => {
|
||||
if (serverState.tcpServer) {
|
||||
serverState.tcpServer.close()
|
||||
serverState.tcpServer = null
|
||||
}
|
||||
if (serverState.tlsServer) {
|
||||
serverState.tlsServer.close()
|
||||
serverState.tlsServer = null
|
||||
}
|
||||
}
|
||||
|
||||
const cleanupSockets = () => {
|
||||
for (const s of allSockets) {
|
||||
try {
|
||||
s.destroy()
|
||||
@@ -220,34 +248,15 @@ export default defineNitroPlugin((nitroApp) => {
|
||||
}
|
||||
allSockets.clear()
|
||||
relaySet.clear()
|
||||
if (tcpServer) {
|
||||
tcpServer.close()
|
||||
tcpServer = null
|
||||
}
|
||||
if (tlsServer) {
|
||||
tlsServer.close()
|
||||
tlsServer = null
|
||||
}
|
||||
}
|
||||
|
||||
registerCleanup(async () => {
|
||||
cleanupSockets()
|
||||
cleanupServers()
|
||||
})
|
||||
|
||||
nitroApp.hooks.hook('close', async () => {
|
||||
for (const s of allSockets) {
|
||||
try {
|
||||
s.destroy()
|
||||
}
|
||||
catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
allSockets.clear()
|
||||
relaySet.clear()
|
||||
if (tcpServer) {
|
||||
tcpServer.close()
|
||||
tcpServer = null
|
||||
}
|
||||
if (tlsServer) {
|
||||
tlsServer.close()
|
||||
tlsServer = null
|
||||
}
|
||||
cleanupSockets()
|
||||
cleanupServers()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -5,6 +5,19 @@
|
||||
|
||||
const locks = new Map()
|
||||
|
||||
/**
|
||||
* Get or create a queue for a lock key.
|
||||
* @param {string} lockKey - Lock key
|
||||
* @returns {Promise<any>} Existing or new queue promise
|
||||
*/
|
||||
const getOrCreateQueue = (lockKey) => {
|
||||
const existingQueue = locks.get(lockKey)
|
||||
if (existingQueue) return existingQueue
|
||||
const newQueue = Promise.resolve()
|
||||
locks.set(lockKey, newQueue)
|
||||
return newQueue
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire a lock for a key and execute callback.
|
||||
* Only one callback per key executes at a time.
|
||||
@@ -14,12 +27,7 @@ const locks = new Map()
|
||||
*/
|
||||
export async function acquire(key, callback) {
|
||||
const lockKey = String(key)
|
||||
let queue = locks.get(lockKey)
|
||||
|
||||
if (!queue) {
|
||||
queue = Promise.resolve()
|
||||
locks.set(lockKey, queue)
|
||||
}
|
||||
const queue = getOrCreateQueue(lockKey)
|
||||
|
||||
const next = queue.then(() => callback()).finally(() => {
|
||||
if (locks.get(lockKey) === next) {
|
||||
|
||||
@@ -15,21 +15,20 @@ const TRADITIONAL_DELIMITER = Buffer.from('</event>', 'utf8')
|
||||
/**
|
||||
* @param {Buffer} buf
|
||||
* @param {number} offset
|
||||
* @param {number} value - Accumulated value
|
||||
* @param {number} shift - Current bit shift
|
||||
* @param {number} bytesRead - Bytes consumed so far
|
||||
* @returns {{ value: number, bytesRead: number }} Decoded varint and bytes consumed.
|
||||
*/
|
||||
function readVarint(buf, offset) {
|
||||
let value = 0
|
||||
let shift = 0
|
||||
let bytesRead = 0
|
||||
while (offset + bytesRead < buf.length) {
|
||||
const b = buf[offset + bytesRead]
|
||||
bytesRead += 1
|
||||
value += (b & 0x7F) << shift
|
||||
if ((b & 0x80) === 0) return { value, bytesRead }
|
||||
shift += 7
|
||||
if (shift > 28) return { value: 0, bytesRead: 0 }
|
||||
}
|
||||
return { value, bytesRead }
|
||||
function readVarint(buf, offset, value = 0, shift = 0, bytesRead = 0) {
|
||||
if (offset + bytesRead >= buf.length) return { value, bytesRead }
|
||||
const b = buf[offset + bytesRead]
|
||||
const newValue = value + ((b & 0x7F) << shift)
|
||||
const newBytesRead = bytesRead + 1
|
||||
if ((b & 0x80) === 0) return { value: newValue, bytesRead: newBytesRead }
|
||||
const newShift = shift + 7
|
||||
if (newShift > 28) return { value: 0, bytesRead: 0 }
|
||||
return readVarint(buf, offset, newValue, newShift, newBytesRead)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -127,12 +126,14 @@ export function parseCotPayload(payload) {
|
||||
const uid = String(event['@_uid'] ?? event.uid ?? '')
|
||||
const eventType = String(event['@_type'] ?? event.type ?? '')
|
||||
const point = findInObject(parsed, 'point') ?? findInObject(event, 'point')
|
||||
let lat = Number.NaN
|
||||
let lng = Number.NaN
|
||||
if (point && typeof point === 'object') {
|
||||
lat = Number(point['@_lat'] ?? point.lat)
|
||||
lng = Number(point['@_lon'] ?? point.lon ?? point['@_lng'] ?? point.lng)
|
||||
const extractCoords = (pt) => {
|
||||
if (!pt || typeof pt !== 'object') return { lat: Number.NaN, lng: Number.NaN }
|
||||
return {
|
||||
lat: Number(pt['@_lat'] ?? pt.lat),
|
||||
lng: Number(pt['@_lon'] ?? pt.lon ?? pt['@_lng'] ?? pt.lng),
|
||||
}
|
||||
}
|
||||
const { lat, lng } = extractCoords(point)
|
||||
if (!Number.isFinite(lat) || !Number.isFinite(lng)) return null
|
||||
|
||||
const detail = findInObject(parsed, 'detail')
|
||||
|
||||
@@ -5,11 +5,13 @@
|
||||
import { SHUTDOWN_TIMEOUT_MS } from './constants.js'
|
||||
|
||||
const cleanupFunctions = []
|
||||
let isShuttingDown = false
|
||||
const shutdownState = {
|
||||
isShuttingDown: false,
|
||||
}
|
||||
|
||||
export function clearCleanup() {
|
||||
cleanupFunctions.length = 0
|
||||
isShuttingDown = false
|
||||
shutdownState.isShuttingDown = false
|
||||
}
|
||||
|
||||
export function registerCleanup(fn) {
|
||||
@@ -17,17 +19,25 @@ export function registerCleanup(fn) {
|
||||
cleanupFunctions.push(fn)
|
||||
}
|
||||
|
||||
async function executeCleanup() {
|
||||
if (isShuttingDown) return
|
||||
isShuttingDown = true
|
||||
for (let i = cleanupFunctions.length - 1; i >= 0; i--) {
|
||||
try {
|
||||
await cleanupFunctions[i]()
|
||||
}
|
||||
catch (error) {
|
||||
console.error(`[shutdown] Cleanup function ${i} failed:`, error?.message || String(error))
|
||||
}
|
||||
const executeCleanupFunction = async (fn, index) => {
|
||||
try {
|
||||
await fn()
|
||||
}
|
||||
catch (error) {
|
||||
console.error(`[shutdown] Cleanup function ${index} failed:`, error?.message || String(error))
|
||||
}
|
||||
}
|
||||
|
||||
const executeCleanupReverse = async (functions, index = functions.length - 1) => {
|
||||
if (index < 0) return
|
||||
await executeCleanupFunction(functions[index], index)
|
||||
return executeCleanupReverse(functions, index - 1)
|
||||
}
|
||||
|
||||
async function executeCleanup() {
|
||||
if (shutdownState.isShuttingDown) return
|
||||
shutdownState.isShuttingDown = true
|
||||
await executeCleanupReverse(cleanupFunctions)
|
||||
}
|
||||
|
||||
export async function graceful(error) {
|
||||
|
||||
Reference in New Issue
Block a user