// openreplay/ee/assist-server/app/cache.js
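//
// Buffers assist session lifecycle events (add/update/renew/remove) in memory
// and periodically flushes them to Redis in batched pipelines, so individual
// WebSocket events don't each cost a Redis round-trip.
//
// Typical wiring from the socket server (a sketch; the handler shapes and the
// `io` instance are illustrative assumptions, not taken from this file):
//
//   const {addSession, removeSession, startCacheRefresher} = require('./cache');
//   startCacheRefresher(io); // io is the Socket.IO server instance
//   io.on('connection', (socket) => {
//       void addSession(socket.handshake.query.sessId);
//       socket.on('disconnect', () => void removeSession(socket.handshake.query.sessId));
//   });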
const {logger} = require('./logger');
const Redis = require("ioredis");
const crypto = require("crypto");
const { Mutex } = require("async-mutex");

const REDIS_URL = process.env.REDIS_URL || "localhost:6379";
const redisClient = new Redis(REDIS_URL);
redisClient.on("error", (error) => {
    logger.error(`Redis cache error: ${error}`);
});
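// Fallback node identifier used when the HOSTNAME env variable is not set:
// 8 random bytes rendered as an unsigned 64-bit decimal string.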
function generateNodeID() {
    const buffer = crypto.randomBytes(8);
    return "node_" + buffer.readBigUInt64BE(0).toString();
}
const batchSize = parseInt(process.env.REDIS_BATCH_SIZE, 10) || 1000;
const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS, 10) || 25;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS, 10) || 5;
// Session keys expire after 1.5x the ping interval, so one late ping doesn't evict a live session
const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL / 2);
// The per-node session list outlives 4 refresh periods, tolerating a few skipped ticks
const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL * 4);
const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;

let lastCacheUpdateTime = 0;
let cacheRefresher = null;
const nodeID = process.env.HOSTNAME || generateNodeID();
const mutex = new Mutex();

// Write buffers, drained and flushed to Redis by updateNodeCache()
const localCache = {
    addedSessions: new Set(),
    updatedSessions: new Set(),
    refreshedSessions: new Set(),
    deletedSessions: new Set()
};
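// Mutators called by the socket layer. Each takes the mutex so a concurrent
// flush never reads a half-updated buffer.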
const addSession = async function (sessionID) {
    await mutex.runExclusive(() => {
        localCache.addedSessions.add(sessionID);
    });
};

const updateSession = async function (sessionID) {
    await mutex.runExclusive(() => {
        localCache.addedSessions.add(sessionID); // to update the session's cache
        localCache.updatedSessions.add(sessionID); // to add sessionID to the list of recently updated sessions
    });
};

const renewSession = async function (sessionID) {
    await mutex.runExclusive(() => {
        localCache.refreshedSessions.add(sessionID);
    });
};

const removeSession = async function (sessionID) {
    await mutex.runExclusive(() => {
        localCache.deletedSessions.add(sessionID);
    });
};
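// Flush cycle. Takes a snapshot of the connected sockets and the local
// buffers (under the mutex), then applies four steps against Redis:
//   1. SET assist:online_sessions:<id> with a TTL, for newly added sessions
//   2. EXPIRE the same keys for renewed sessions, extending their TTL
//   3. DEL the keys of removed sessions
//   4. SADD recently updated session IDs and store this node's full session list
// Steps 1-3 are chunked into pipelines of `batchSize` commands each.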
const updateNodeCache = async function (io) {
    logger.debug('Background refresh triggered');
    try {
        const startTime = performance.now();
        let currStepTs = performance.now();
        const sessionIDs = new Set();
        const result = await io.fetchSockets();
        let toAdd = new Map();
        let toUpdate = [];
        let toRenew = [];
        let toDelete = [];
        await mutex.runExclusive(() => {
            result.forEach((socket) => {
                if (socket.handshake.query.sessId) {
                    const sessID = socket.handshake.query.sessId;
                    if (sessionIDs.has(sessID)) {
                        return; // several sockets can belong to one session; record it once
                    }
                    sessionIDs.add(sessID);
                    if (localCache.addedSessions.has(sessID)) {
                        toAdd.set(sessID, socket.handshake.query.sessionInfo);
                    }
                }
            });
            toUpdate = [...localCache.updatedSessions];
            toRenew = [...localCache.refreshedSessions];
            toDelete = [...localCache.deletedSessions];
            // Clear the local cache
            localCache.addedSessions.clear();
            localCache.updatedSessions.clear();
            localCache.refreshedSessions.clear();
            localCache.deletedSessions.clear();
        });
        // Step 1: insert new sessions in batched pipelines
        const toAddArray = Array.from(toAdd.keys());
        for (let i = 0; i < toAddArray.length; i += batchSize) {
            const batch = toAddArray.slice(i, i + batchSize);
            const pipeline = redisClient.pipeline();
            for (const sessionID of batch) {
                pipeline.set(`assist:online_sessions:${sessionID}`, JSON.stringify(toAdd.get(sessionID)), 'EX', pingInterval);
            }
            await pipeline.exec();
        }
        logger.info(`step 1 (toAdd) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toAddArray.length} sessions`);
        currStepTs = performance.now();
        // Step 2: extend the TTL of renewed sessions in batched pipelines
        for (let i = 0; i < toRenew.length; i += batchSize) {
            const batch = toRenew.slice(i, i + batchSize);
            const pipeline = redisClient.pipeline();
            for (const sessionID of batch) {
                pipeline.expire(`assist:online_sessions:${sessionID}`, pingInterval);
            }
            await pipeline.exec();
        }
        logger.info(`step 2 (toRenew) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toRenew.length} sessions`);
        currStepTs = performance.now();
        // Step 3: delete removed sessions in batched pipelines
        for (let i = 0; i < toDelete.length; i += batchSize) {
            const batch = toDelete.slice(i, i + batchSize);
            const pipeline = redisClient.pipeline();
            for (const sessionID of batch) {
                pipeline.del(`assist:online_sessions:${sessionID}`);
            }
            await pipeline.exec();
        }
        logger.info(`step 3 (toDelete) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toDelete.length} sessions`);
        currStepTs = performance.now();
        // Step 4: publish recently updated sessions
        if (toUpdate.length > 0) {
            await redisClient.sadd('assist:updated_sessions', toUpdate);
        }
        // Store the full list of sessions held by this node
        await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), 'EX', cacheRefreshInterval);
        logger.info(`step 4 (full list + updated) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toUpdate.length} sessions`);
        const duration = performance.now() - startTime;
        logger.info(`Background refresh complete: ${duration.toFixed(2)}ms, ${result.length} sockets`);
    } catch (error) {
        logger.error(`Background refresh error: ${error}`);
    }
};
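// Flush scheduler. The interval fires twice per refresh period; a tick is a
// no-op unless the previous flush has finished (isFlushing) and at least
// CACHE_REFRESH_INTERVAL seconds have passed since the last successful flush.
// Halving the tick period bounds how late a flush can start after a skipped tick.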
let isFlushing = false;

function startCacheRefresher(io) {
    if (cacheRefresher) clearInterval(cacheRefresher);
    cacheRefresher = setInterval(async () => {
        if (isFlushing) {
            logger.warn("Skipping tick: flush in progress");
            return;
        }
        const now = Date.now();
        if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) {
            return; // not due yet
        }
        isFlushing = true;
        try {
            await updateNodeCache(io);
            lastCacheUpdateTime = Date.now();
        } catch (err) {
            logger.error(`Tick error: ${err}`);
        } finally {
            isFlushing = false;
        }
    }, cacheRefreshIntervalMs / 2);
}
module.exports = {
    addSession,
    updateSession,
    renewSession,
    removeSession,
    startCacheRefresher,
};