diff --git a/assist/utils/httpHandlers.js b/assist/utils/httpHandlers.js index c6514e2dc..500e4fdb5 100644 --- a/assist/utils/httpHandlers.js +++ b/assist/utils/httpHandlers.js @@ -27,9 +27,14 @@ const respond = function (req, res, data) { res.setHeader('Content-Type', 'application/json'); res.end(JSON.stringify(result)); } else { - res.cork(() => { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - }); + if (!res.aborted) { + res.cork(() => { + res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); + }); + } else { + logger.debug("response aborted"); + return; + } } const duration = performance.now() - req.startTs; IncreaseTotalRequests(); diff --git a/ee/assist/server.js b/ee/assist/server.js index 123501634..2a6ec5bee 100644 --- a/ee/assist/server.js +++ b/ee/assist/server.js @@ -83,9 +83,11 @@ if (process.env.uws !== "true") { const uWrapper = function (fn) { return (res, req) => { res.id = 1; + res.aborted = false; req.startTs = performance.now(); // track request's start timestamp req.method = req.getMethod(); res.onAborted(() => { + res.aborted = true; onAbortedOrFinishedResponse(res); }); return fn(req, res); diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js index 97fa6c268..e774eb8bf 100644 --- a/ee/assist/utils/wsServer.js +++ b/ee/assist/utils/wsServer.js @@ -3,20 +3,50 @@ const {getCompressionConfig} = require("./helper"); const {logger} = require('./logger'); let io; +const getServer = function () {return io;} -const getServer = function () { - return io; +const useRedis = process.env.redis === "true"; +let inMemorySocketsCache = []; +let lastCacheUpdateTime = 0; +const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000; + +const doFetchAllSockets = async function () { + if (useRedis) { + const now = Date.now(); + logger.info(`Using in-memory cache (age: ${now - lastCacheUpdateTime}ms)`); + return inMemorySocketsCache; + } else { + try { + return await io.fetchSockets(); + } catch (error) { + logger.error('Error fetching sockets:', error); + return []; + } + } } -let redisClient; -const useRedis = process.env.redis === "true"; +// Background refresher that runs independently of requests +let cacheRefresher = null; +function startCacheRefresher() { + if (cacheRefresher) clearInterval(cacheRefresher); -if (useRedis) { - const {createClient} = require("redis"); - const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); - redisClient = createClient({url: REDIS_URL}); - redisClient.on("error", (error) => logger.error(`Redis error : ${error}`)); - void redisClient.connect(); + cacheRefresher = setInterval(async () => { + const now = Date.now(); + // Only refresh if cache is stale + if (now - lastCacheUpdateTime >= CACHE_REFRESH_INTERVAL) { + logger.debug('Background refresh triggered'); + try { + const startTime = performance.now(); + const result = await io.fetchSockets(); + inMemorySocketsCache = result; + lastCacheUpdateTime = now; + const duration = performance.now() - startTime; + logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`); + } catch (error) { + logger.error(`Background refresh error: ${error}`); + } + } + }, CACHE_REFRESH_INTERVAL / 2); } const processSocketsList = function (sockets) { @@ -28,24 +58,6 @@ const processSocketsList = function (sockets) { return res } -const doFetchAllSockets = async function () { - if (useRedis) { - try { - let cachedResult = await redisClient.get('fetchSocketsResult'); - if (cachedResult) { - return JSON.parse(cachedResult); - } - let result = await io.fetchSockets(); - let cachedString = JSON.stringify(processSocketsList(result)); - await redisClient.set('fetchSocketsResult', cachedString, {EX: 5}); - return result; - } catch (error) { - logger.error('Error setting value with expiration:', error); - } - } - return await io.fetchSockets(); -} - const fetchSockets = async function (roomID) { if (!io) { return []; @@ -84,6 +96,7 @@ const createSocketIOServer = function (server, prefix) { }); io.attachApp(server); } + startCacheRefresher(); return io; }