feat(assist-server): better cache management

This commit is contained in:
Alexander 2025-04-16 18:28:02 +02:00
parent f15e84086f
commit c4e4c3d2cd
5 changed files with 123 additions and 53 deletions

View file

@ -1,6 +1,7 @@
const {logger} = require('./logger'); const {logger} = require('./logger');
const {createClient} = require("redis"); const {createClient} = require("redis");
const crypto = require("crypto"); const crypto = require("crypto");
import { Mutex } from 'async-mutex';
let redisClient; let redisClient;
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
@ -14,7 +15,7 @@ function generateNodeID() {
} }
const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25; const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 10; const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 3;
const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2); const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2);
const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2); const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2);
const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000; const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;
@ -22,39 +23,109 @@ let lastCacheUpdateTime = 0;
let cacheRefresher = null; let cacheRefresher = null;
const nodeID = process.env.HOSTNAME || generateNodeID(); const nodeID = process.env.HOSTNAME || generateNodeID();
const addSessionToCache = async function (sessionID, sessionData) { const mutex = new Mutex();
try { const localCache = {
await redisClient.set(`assist:online_sessions:${sessionID}`, JSON.stringify(sessionData), {EX: pingInterval}); addedSessions: new Set(),
logger.debug(`Session ${sessionID} stored in Redis`); updatedSessions: new Set(),
} catch (error) { refreshedSessions: new Set(),
logger.error(error); deletedSessions: new Set()
} };
const addSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.addedSessions.add(sessionID);
});
} }
const renewSession = async function (sessionID){ const updateSession = async function (sessionID) {
try { await mutex.runExclusive(() => {
await redisClient.expire(`assist:online_sessions:${sessionID}`, pingInterval); localCache.addedSessions.add(sessionID); // to update the session's cache
logger.debug(`Session ${sessionID} renewed in Redis`); localCache.updatedSessions.add(sessionID); // to add sessionID to the list of recently updated sessions
} catch (error) { });
logger.error(error);
}
} }
const removeSessionFromCache = async function (sessionID) { const renewSession = async function (sessionID) {
try { await mutex.runExclusive(() => {
await redisClient.del(`assist:online_sessions:${sessionID}`); localCache.refreshedSessions.add(sessionID);
logger.debug(`Session ${sessionID} removed from Redis`); })
} catch (error) {
logger.error(error);
}
} }
const setNodeSessions = async function (nodeID, sessionIDs) { const removeSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.deletedSessions.add(sessionID);
});
}
const updateNodeCache = async function (io) {
logger.debug('Background refresh triggered');
try { try {
await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(sessionIDs), {EX: cacheRefreshInterval}); const startTime = performance.now();
logger.debug(`Node ${nodeID} sessions stored in Redis`); const sessionIDs = new Set();
const result = await io.fetchSockets();
let toAdd = new Map();
let toUpdate = [];
let toRenew = [];
let toDelete = [];
await mutex.runExclusive(() => {
result.forEach((socket) => {
if (socket.handshake.query.sessId) {
const sessID = socket.handshake.query.sessId;
if (sessionIDs.has(sessID)) {
return;
}
sessionIDs.add(sessID);
if (localCache.addedSessions.has(sessID)) {
toAdd.set(sessID, socket.handshake.query.sessionInfo);
}
}
});
toUpdate = [...localCache.updatedSessions];
toRenew = [...localCache.refreshedSessions];
toDelete = [...localCache.deletedSessions];
// Clear the local cache
localCache.addedSessions.clear();
localCache.updatedSessions.clear();
localCache.refreshedSessions.clear();
localCache.deletedSessions.clear();
})
const batchSize = 1000;
// insert new sessions in pipeline
const toAddArray = Array.from(toAdd.keys());
for (let i = 0; i < toAddArray.length; i += batchSize) {
const batch = toAddArray.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.set(`assist:online_sessions:${sessionID}`, JSON.stringify(toAdd.get(sessionID)), {EX: pingInterval});
}
await pipeline.exec();
}
// renew sessions in pipeline
for (let i = 0; i < toRenew.length; i += batchSize) {
const batch = toRenew.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.expire(`assist:online_sessions:${sessionID}`, pingInterval);
}
await pipeline.exec();
}
// delete sessions in pipeline
for (let i = 0; i < toDelete.length; i += batchSize) {
const batch = toDelete.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.del(`assist:online_sessions:${sessionID}`);
}
await pipeline.exec();
}
// add recently updated sessions
await redisClient.sadd(`assist:updated_sessions`, JSON.stringify(toUpdate));
// store the node sessions
await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), {EX: cacheRefreshInterval});
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) { } catch (error) {
logger.error(error); logger.error(`Background refresh error: ${error}`);
} }
} }
@ -66,29 +137,15 @@ function startCacheRefresher(io) {
if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) { if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) {
return; return;
} }
logger.debug('Background refresh triggered'); await updateNodeCache(io);
try { lastCacheUpdateTime = now;
const startTime = performance.now();
const sessionIDs = new Set();
const result = await io.fetchSockets();
result.forEach((socket) => {
if (socket.handshake.query.sessId) {
sessionIDs.add(socket.handshake.query.sessId);
}
})
await setNodeSessions(nodeID, Array.from(sessionIDs));
lastCacheUpdateTime = now;
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}, cacheRefreshIntervalMs / 2); }, cacheRefreshIntervalMs / 2);
} }
module.exports = { module.exports = {
addSessionToCache, addSession,
updateSession,
renewSession, renewSession,
removeSessionFromCache, removeSession,
startCacheRefresher, startCacheRefresher,
} }

View file

@ -6,9 +6,10 @@ const {
errorHandler errorHandler
} = require("./assist"); } = require("./assist");
const { const {
addSessionToCache, addSession,
updateSession,
renewSession, renewSession,
removeSessionFromCache removeSession
} = require('./cache'); } = require('./cache');
const { const {
logger logger
@ -125,7 +126,7 @@ async function onConnect(socket) {
// Add session to cache // Add session to cache
if (socket.handshake.query.identity === IDENTITIES.session) { if (socket.handshake.query.identity === IDENTITIES.session) {
await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); await addSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
} }
if (socket.handshake.query.identity === IDENTITIES.agent) { if (socket.handshake.query.identity === IDENTITIES.agent) {
@ -170,7 +171,7 @@ async function onDisconnect(socket) {
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
if (tabsCount <= 0) { if (tabsCount <= 0) {
await removeSessionFromCache(socket.handshake.query.sessId); await removeSession(socket.handshake.query.sessId);
} }
if (tabsCount === -1 && agentsCount === -1) { if (tabsCount === -1 && agentsCount === -1) {
@ -198,7 +199,7 @@ async function onUpdateEvent(socket, ...args) {
socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
// update session cache // update session cache
await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); await updateSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
// Update sessionInfo for all agents in the room // Update sessionInfo for all agents in the room
const connected_sockets = await fetchSockets(socket.handshake.query.roomId); const connected_sockets = await fetchSockets(socket.handshake.query.roomId);

View file

@ -11,6 +11,7 @@
"dependencies": { "dependencies": {
"@fastify/deepmerge": "^3.0.0", "@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0", "@maxmind/geoip2-node": "^6.0.0",
"async-mutex": "^0.5.0",
"express": "^4.21.2", "express": "^4.21.2",
"jsonwebtoken": "^9.0.2", "jsonwebtoken": "^9.0.2",
"redis": "^4.7.0", "redis": "^4.7.0",
@ -172,6 +173,14 @@
"resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz",
"integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==" "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="
}, },
"node_modules/async-mutex": {
"version": "0.5.0",
"resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz",
"integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==",
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/asynckit": { "node_modules/asynckit": {
"version": "0.4.0", "version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
@ -1574,6 +1583,11 @@
"node": ">= 14.0.0" "node": ">= 14.0.0"
} }
}, },
"node_modules/tslib": {
"version": "2.8.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz",
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="
},
"node_modules/type-is": { "node_modules/type-is": {
"version": "1.6.18", "version": "1.6.18",
"resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz",

View file

@ -12,6 +12,7 @@
"dependencies": { "dependencies": {
"@fastify/deepmerge": "^3.0.0", "@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0", "@maxmind/geoip2-node": "^6.0.0",
"async-mutex": "^0.5.0",
"express": "^4.21.2", "express": "^4.21.2",
"jsonwebtoken": "^9.0.2", "jsonwebtoken": "^9.0.2",
"redis": "^4.7.0", "redis": "^4.7.0",

View file

@ -46,9 +46,6 @@ const io = new Server({
io.use(async (socket, next) => await authorizer.check(socket, next)); io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket)); io.on('connection', (socket) => onConnect(socket));
io.attachApp(app); io.attachApp(app);
io.engine.on("headers", (headers) => {
headers["x-host-id"] = process.env.HOSTNAME || "unknown";
});
setSocketIOServer(io); setSocketIOServer(io);
const HOST = process.env.LISTEN_HOST || '0.0.0.0'; const HOST = process.env.LISTEN_HOST || '0.0.0.0';