feat(assist): improved caching mechanism for cluster mode (#3180)

This commit is contained in:
Alexander 2025-03-19 14:53:58 +01:00 committed by GitHub
parent 4be3050e61
commit ae95b48760
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 51 additions and 31 deletions

View file

@ -27,9 +27,14 @@ const respond = function (req, res, data) {
res.setHeader('Content-Type', 'application/json'); res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(result)); res.end(JSON.stringify(result));
} else { } else {
res.cork(() => { if (!res.aborted) {
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); res.cork(() => {
}); res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
});
} else {
logger.debug("response aborted");
return;
}
} }
const duration = performance.now() - req.startTs; const duration = performance.now() - req.startTs;
IncreaseTotalRequests(); IncreaseTotalRequests();

View file

@ -83,9 +83,11 @@ if (process.env.uws !== "true") {
const uWrapper = function (fn) { const uWrapper = function (fn) {
return (res, req) => { return (res, req) => {
res.id = 1; res.id = 1;
res.aborted = false;
req.startTs = performance.now(); // track request's start timestamp req.startTs = performance.now(); // track request's start timestamp
req.method = req.getMethod(); req.method = req.getMethod();
res.onAborted(() => { res.onAborted(() => {
res.aborted = true;
onAbortedOrFinishedResponse(res); onAbortedOrFinishedResponse(res);
}); });
return fn(req, res); return fn(req, res);

View file

@ -3,20 +3,50 @@ const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger'); const {logger} = require('./logger');
let io; let io;
const getServer = function () {return io;}
const getServer = function () { const useRedis = process.env.redis === "true";
return io; let inMemorySocketsCache = [];
let lastCacheUpdateTime = 0;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
const doFetchAllSockets = async function () {
if (useRedis) {
const now = Date.now();
logger.info(`Using in-memory cache (age: ${now - lastCacheUpdateTime}ms)`);
return inMemorySocketsCache;
} else {
try {
return await io.fetchSockets();
} catch (error) {
logger.error('Error fetching sockets:', error);
return [];
}
}
} }
let redisClient; // Background refresher that runs independently of requests
const useRedis = process.env.redis === "true"; let cacheRefresher = null;
function startCacheRefresher() {
if (cacheRefresher) clearInterval(cacheRefresher);
if (useRedis) { cacheRefresher = setInterval(async () => {
const {createClient} = require("redis"); const now = Date.now();
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); // Only refresh if cache is stale
redisClient = createClient({url: REDIS_URL}); if (now - lastCacheUpdateTime >= CACHE_REFRESH_INTERVAL) {
redisClient.on("error", (error) => logger.error(`Redis error : ${error}`)); logger.debug('Background refresh triggered');
void redisClient.connect(); try {
const startTime = performance.now();
const result = await io.fetchSockets();
inMemorySocketsCache = result;
lastCacheUpdateTime = now;
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}
}, CACHE_REFRESH_INTERVAL / 2);
} }
const processSocketsList = function (sockets) { const processSocketsList = function (sockets) {
@ -28,24 +58,6 @@ const processSocketsList = function (sockets) {
return res return res
} }
const doFetchAllSockets = async function () {
if (useRedis) {
try {
let cachedResult = await redisClient.get('fetchSocketsResult');
if (cachedResult) {
return JSON.parse(cachedResult);
}
let result = await io.fetchSockets();
let cachedString = JSON.stringify(processSocketsList(result));
await redisClient.set('fetchSocketsResult', cachedString, {EX: 5});
return result;
} catch (error) {
logger.error('Error setting value with expiration:', error);
}
}
return await io.fetchSockets();
}
const fetchSockets = async function (roomID) { const fetchSockets = async function (roomID) {
if (!io) { if (!io) {
return []; return [];
@ -84,6 +96,7 @@ const createSocketIOServer = function (server, prefix) {
}); });
io.attachApp(server); io.attachApp(server);
} }
startCacheRefresher();
return io; return io;
} }