feat(assist): improved caching mechanism for cluster mode
This commit is contained in:
parent
8e68eb9a20
commit
91e0ebeb56
3 changed files with 51 additions and 31 deletions
|
|
@ -27,9 +27,14 @@ const respond = function (req, res, data) {
|
||||||
res.setHeader('Content-Type', 'application/json');
|
res.setHeader('Content-Type', 'application/json');
|
||||||
res.end(JSON.stringify(result));
|
res.end(JSON.stringify(result));
|
||||||
} else {
|
} else {
|
||||||
res.cork(() => {
|
if (!res.aborted) {
|
||||||
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
|
res.cork(() => {
|
||||||
});
|
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
logger.debug("response aborted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
const duration = performance.now() - req.startTs;
|
const duration = performance.now() - req.startTs;
|
||||||
IncreaseTotalRequests();
|
IncreaseTotalRequests();
|
||||||
|
|
|
||||||
|
|
@ -83,9 +83,11 @@ if (process.env.uws !== "true") {
|
||||||
const uWrapper = function (fn) {
|
const uWrapper = function (fn) {
|
||||||
return (res, req) => {
|
return (res, req) => {
|
||||||
res.id = 1;
|
res.id = 1;
|
||||||
|
res.aborted = false;
|
||||||
req.startTs = performance.now(); // track request's start timestamp
|
req.startTs = performance.now(); // track request's start timestamp
|
||||||
req.method = req.getMethod();
|
req.method = req.getMethod();
|
||||||
res.onAborted(() => {
|
res.onAborted(() => {
|
||||||
|
res.aborted = true;
|
||||||
onAbortedOrFinishedResponse(res);
|
onAbortedOrFinishedResponse(res);
|
||||||
});
|
});
|
||||||
return fn(req, res);
|
return fn(req, res);
|
||||||
|
|
|
||||||
|
|
@ -3,20 +3,50 @@ const {getCompressionConfig} = require("./helper");
|
||||||
const {logger} = require('./logger');
|
const {logger} = require('./logger');
|
||||||
|
|
||||||
let io;
|
let io;
|
||||||
|
const getServer = function () {return io;}
|
||||||
|
|
||||||
const getServer = function () {
|
const useRedis = process.env.redis === "true";
|
||||||
return io;
|
let inMemorySocketsCache = [];
|
||||||
|
let lastCacheUpdateTime = 0;
|
||||||
|
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
|
||||||
|
|
||||||
|
const doFetchAllSockets = async function () {
|
||||||
|
if (useRedis) {
|
||||||
|
const now = Date.now();
|
||||||
|
logger.info(`Using in-memory cache (age: ${now - lastCacheUpdateTime}ms)`);
|
||||||
|
return inMemorySocketsCache;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
return await io.fetchSockets();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Error fetching sockets:', error);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let redisClient;
|
// Background refresher that runs independently of requests
|
||||||
const useRedis = process.env.redis === "true";
|
let cacheRefresher = null;
|
||||||
|
function startCacheRefresher() {
|
||||||
|
if (cacheRefresher) clearInterval(cacheRefresher);
|
||||||
|
|
||||||
if (useRedis) {
|
cacheRefresher = setInterval(async () => {
|
||||||
const {createClient} = require("redis");
|
const now = Date.now();
|
||||||
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
|
// Only refresh if cache is stale
|
||||||
redisClient = createClient({url: REDIS_URL});
|
if (now - lastCacheUpdateTime >= CACHE_REFRESH_INTERVAL) {
|
||||||
redisClient.on("error", (error) => logger.error(`Redis error : ${error}`));
|
logger.debug('Background refresh triggered');
|
||||||
void redisClient.connect();
|
try {
|
||||||
|
const startTime = performance.now();
|
||||||
|
const result = await io.fetchSockets();
|
||||||
|
inMemorySocketsCache = result;
|
||||||
|
lastCacheUpdateTime = now;
|
||||||
|
const duration = performance.now() - startTime;
|
||||||
|
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Background refresh error: ${error}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, CACHE_REFRESH_INTERVAL / 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
const processSocketsList = function (sockets) {
|
const processSocketsList = function (sockets) {
|
||||||
|
|
@ -28,24 +58,6 @@ const processSocketsList = function (sockets) {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
const doFetchAllSockets = async function () {
|
|
||||||
if (useRedis) {
|
|
||||||
try {
|
|
||||||
let cachedResult = await redisClient.get('fetchSocketsResult');
|
|
||||||
if (cachedResult) {
|
|
||||||
return JSON.parse(cachedResult);
|
|
||||||
}
|
|
||||||
let result = await io.fetchSockets();
|
|
||||||
let cachedString = JSON.stringify(processSocketsList(result));
|
|
||||||
await redisClient.set('fetchSocketsResult', cachedString, {EX: 5});
|
|
||||||
return result;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error setting value with expiration:', error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return await io.fetchSockets();
|
|
||||||
}
|
|
||||||
|
|
||||||
const fetchSockets = async function (roomID) {
|
const fetchSockets = async function (roomID) {
|
||||||
if (!io) {
|
if (!io) {
|
||||||
return [];
|
return [];
|
||||||
|
|
@ -84,6 +96,7 @@ const createSocketIOServer = function (server, prefix) {
|
||||||
});
|
});
|
||||||
io.attachApp(server);
|
io.attachApp(server);
|
||||||
}
|
}
|
||||||
|
startCacheRefresher();
|
||||||
return io;
|
return io;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue