diff --git a/ee/assist-server/app/cache.js b/ee/assist-server/app/cache.js index c8e2296c9..200de7cbe 100644 --- a/ee/assist-server/app/cache.js +++ b/ee/assist-server/app/cache.js @@ -1,6 +1,7 @@ const {logger} = require('./logger'); const {createClient} = require("redis"); const crypto = require("crypto"); +import { Mutex } from 'async-mutex'; let redisClient; const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); @@ -14,7 +15,7 @@ function generateNodeID() { } const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25; -const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 10; +const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 3; const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2); const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2); const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000; @@ -22,39 +23,109 @@ let lastCacheUpdateTime = 0; let cacheRefresher = null; const nodeID = process.env.HOSTNAME || generateNodeID(); -const addSessionToCache = async function (sessionID, sessionData) { - try { - await redisClient.set(`assist:online_sessions:${sessionID}`, JSON.stringify(sessionData), {EX: pingInterval}); - logger.debug(`Session ${sessionID} stored in Redis`); - } catch (error) { - logger.error(error); - } +const mutex = new Mutex(); +const localCache = { + addedSessions: new Set(), + updatedSessions: new Set(), + refreshedSessions: new Set(), + deletedSessions: new Set() +}; + +const addSession = async function (sessionID) { + await mutex.runExclusive(() => { + localCache.addedSessions.add(sessionID); + }); } -const renewSession = async function (sessionID){ - try { - await redisClient.expire(`assist:online_sessions:${sessionID}`, pingInterval); - logger.debug(`Session ${sessionID} renewed in Redis`); - } catch (error) { - logger.error(error); - } +const updateSession = async function (sessionID) { + await mutex.runExclusive(() => { + localCache.addedSessions.add(sessionID); // to update the session's cache + localCache.updatedSessions.add(sessionID); // to add sessionID to the list of recently updated sessions + }); } -const removeSessionFromCache = async function (sessionID) { - try { - await redisClient.del(`assist:online_sessions:${sessionID}`); - logger.debug(`Session ${sessionID} removed from Redis`); - } catch (error) { - logger.error(error); - } +const renewSession = async function (sessionID) { + await mutex.runExclusive(() => { + localCache.refreshedSessions.add(sessionID); + }) } -const setNodeSessions = async function (nodeID, sessionIDs) { +const removeSession = async function (sessionID) { + await mutex.runExclusive(() => { + localCache.deletedSessions.add(sessionID); + }); +} + +const updateNodeCache = async function (io) { + logger.debug('Background refresh triggered'); try { - await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(sessionIDs), {EX: cacheRefreshInterval}); - logger.debug(`Node ${nodeID} sessions stored in Redis`); + const startTime = performance.now(); + const sessionIDs = new Set(); + const result = await io.fetchSockets(); + let toAdd = new Map(); + let toUpdate = []; + let toRenew = []; + let toDelete = []; + await mutex.runExclusive(() => { + result.forEach((socket) => { + if (socket.handshake.query.sessId) { + const sessID = socket.handshake.query.sessId; + if (sessionIDs.has(sessID)) { + return; + } + sessionIDs.add(sessID); + if (localCache.addedSessions.has(sessID)) { + toAdd.set(sessID, socket.handshake.query.sessionInfo); + } + } + }); + toUpdate = [...localCache.updatedSessions]; + toRenew = [...localCache.refreshedSessions]; + toDelete = [...localCache.deletedSessions]; + // Clear the local cache + localCache.addedSessions.clear(); + localCache.updatedSessions.clear(); + localCache.refreshedSessions.clear(); + localCache.deletedSessions.clear(); + }) + const batchSize = 1000; + // insert new sessions in pipeline + const toAddArray = Array.from(toAdd.keys()); + for (let i = 0; i < toAddArray.length; i += batchSize) { + const batch = toAddArray.slice(i, i + batchSize); + const pipeline = redisClient.pipeline(); + for (const sessionID of batch) { + pipeline.set(`assist:online_sessions:${sessionID}`, JSON.stringify(toAdd.get(sessionID)), {EX: pingInterval}); + } + await pipeline.exec(); + } + // renew sessions in pipeline + for (let i = 0; i < toRenew.length; i += batchSize) { + const batch = toRenew.slice(i, i + batchSize); + const pipeline = redisClient.pipeline(); + for (const sessionID of batch) { + pipeline.expire(`assist:online_sessions:${sessionID}`, pingInterval); + } + await pipeline.exec(); + } + // delete sessions in pipeline + for (let i = 0; i < toDelete.length; i += batchSize) { + const batch = toDelete.slice(i, i + batchSize); + const pipeline = redisClient.pipeline(); + for (const sessionID of batch) { + pipeline.del(`assist:online_sessions:${sessionID}`); + } + await pipeline.exec(); + } + // add recently updated sessions + await redisClient.sadd(`assist:updated_sessions`, JSON.stringify(toUpdate)); + // store the node sessions + await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), {EX: cacheRefreshInterval}); + + const duration = performance.now() - startTime; + logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`); } catch (error) { - logger.error(error); + logger.error(`Background refresh error: ${error}`); } } @@ -66,29 +137,15 @@ function startCacheRefresher(io) { if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) { return; } - logger.debug('Background refresh triggered'); - try { - const startTime = performance.now(); - const sessionIDs = new Set(); - const result = await io.fetchSockets(); - result.forEach((socket) => { - if (socket.handshake.query.sessId) { - sessionIDs.add(socket.handshake.query.sessId); - } - }) - await setNodeSessions(nodeID, Array.from(sessionIDs)); - lastCacheUpdateTime = now; - const duration = performance.now() - startTime; - logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`); - } catch (error) { - logger.error(`Background refresh error: ${error}`); - } + await updateNodeCache(io); + lastCacheUpdateTime = now; }, cacheRefreshIntervalMs / 2); } module.exports = { - addSessionToCache, + addSession, + updateSession, renewSession, - removeSessionFromCache, + removeSession, startCacheRefresher, } \ No newline at end of file diff --git a/ee/assist-server/app/socket.js b/ee/assist-server/app/socket.js index d6f39ae16..f2bb882ae 100644 --- a/ee/assist-server/app/socket.js +++ b/ee/assist-server/app/socket.js @@ -6,9 +6,10 @@ const { errorHandler } = require("./assist"); const { - addSessionToCache, + addSession, + updateSession, renewSession, - removeSessionFromCache + removeSession } = require('./cache'); const { logger @@ -125,7 +126,7 @@ async function onConnect(socket) { // Add session to cache if (socket.handshake.query.identity === IDENTITIES.session) { - await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); + await addSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); } if (socket.handshake.query.identity === IDENTITIES.agent) { @@ -170,7 +171,7 @@ async function onDisconnect(socket) { let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); if (tabsCount <= 0) { - await removeSessionFromCache(socket.handshake.query.sessId); + await removeSession(socket.handshake.query.sessId); } if (tabsCount === -1 && agentsCount === -1) { @@ -198,7 +199,7 @@ async function onUpdateEvent(socket, ...args) { socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); // update session cache - await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); + await updateSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); // Update sessionInfo for all agents in the room const connected_sockets = await fetchSockets(socket.handshake.query.roomId); diff --git a/ee/assist-server/package-lock.json b/ee/assist-server/package-lock.json index 8d8f17e93..0dfaeea78 100644 --- a/ee/assist-server/package-lock.json +++ b/ee/assist-server/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@fastify/deepmerge": "^3.0.0", "@maxmind/geoip2-node": "^6.0.0", + "async-mutex": "^0.5.0", "express": "^4.21.2", "jsonwebtoken": "^9.0.2", "redis": "^4.7.0", @@ -172,6 +173,14 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==" }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -1574,6 +1583,11 @@ "node": ">= 14.0.0" } }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==" + }, "node_modules/type-is": { "version": "1.6.18", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz", diff --git a/ee/assist-server/package.json b/ee/assist-server/package.json index 50c1fbf95..e909ec77a 100644 --- a/ee/assist-server/package.json +++ b/ee/assist-server/package.json @@ -12,6 +12,7 @@ "dependencies": { "@fastify/deepmerge": "^3.0.0", "@maxmind/geoip2-node": "^6.0.0", + "async-mutex": "^0.5.0", "express": "^4.21.2", "jsonwebtoken": "^9.0.2", "redis": "^4.7.0", diff --git a/ee/assist-server/server.js b/ee/assist-server/server.js index c143ab0d4..a5ec880e3 100644 --- a/ee/assist-server/server.js +++ b/ee/assist-server/server.js @@ -46,9 +46,6 @@ const io = new Server({ io.use(async (socket, next) => await authorizer.check(socket, next)); io.on('connection', (socket) => onConnect(socket)); io.attachApp(app); -io.engine.on("headers", (headers) => { - headers["x-host-id"] = process.env.HOSTNAME || "unknown"; -}); setSocketIOServer(io); const HOST = process.env.LISTEN_HOST || '0.0.0.0';