feat(assist-server): slightly improved cacher

commit 59251af8c6
parent 3ff994490e
Author: Alexander
Date:   2025-04-17 08:47:58 +02:00


@@ -14,10 +14,11 @@ function generateNodeID() {
     return "node_"+buffer.readBigUInt64BE(0).toString();
 }
+const batchSize = parseInt(process.env.REDIS_BATCH_SIZE) || 1000;
 const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25;
-const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 3;
+const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 5;
 const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2);
-const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2);
+const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL * 4);
 const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;
 let lastCacheUpdateTime = 0;
 let cacheRefresher = null;
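
Worked example of the new interval math with the defaults above (no env overrides): the throttle window stays at CACHE_REFRESH_INTERVAL seconds, while the Redis TTL used for the per-node session list (the 'EX' argument further down) grows from roughly 1.5x to 4x the refresh interval, so the key survives a few missed refresh cycles. A minimal sketch, not part of the diff:

const CACHE_REFRESH_INTERVAL = 5;                                     // default seconds between refreshes
const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;         // 5000 ms throttle window
const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL * 4);  // 20 s TTL ('EX' is in seconds)
console.log(cacheRefreshIntervalMs, cacheRefreshInterval);            // -> 5000 20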
@@ -60,6 +61,7 @@ const updateNodeCache = async function (io) {
     logger.debug('Background refresh triggered');
     try {
         const startTime = performance.now();
+        let currStepTs = performance.now();
         const sessionIDs = new Set();
         const result = await io.fetchSockets();
         let toAdd = new Map();
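
The hunk above only shows the declarations; the classification logic that fills them is outside the diff. One plausible shape, with socket.handshake.query.sessionID and the localCache layout as assumptions (neither is visible in this commit):

// Hypothetical sketch: split connected sockets into add/renew/delete sets.
async function classifySockets(io, localCache) {
    const sessionIDs = new Set();
    const toAdd = new Map();      // sessionID -> socket.id, new this cycle
    const toRenew = [];           // already cached, just extend the TTL
    const toDelete = [];          // cached last cycle, gone now
    for (const socket of await io.fetchSockets()) {
        const sessID = socket.handshake.query.sessionID; // assumed location
        if (!sessID) continue;
        sessionIDs.add(sessID);
        if (localCache.sessions.has(sessID)) toRenew.push(sessID);
        else toAdd.set(sessID, socket.id);
    }
    for (const sessID of localCache.sessions.keys()) {
        if (!sessionIDs.has(sessID)) toDelete.push(sessID);
    }
    return { sessionIDs, toAdd, toRenew, toDelete };
}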
@@ -88,7 +90,7 @@ const updateNodeCache = async function (io) {
             localCache.refreshedSessions.clear();
             localCache.deletedSessions.clear();
         })
-        const batchSize = 1000;
+        // insert new sessions in pipeline
         const toAddArray = Array.from(toAdd.keys());
         for (let i = 0; i < toAddArray.length; i += batchSize) {
@@ -99,6 +101,9 @@ const updateNodeCache = async function (io) {
             }
             await pipeline.exec();
         }
+        logger.info(`step 1 (toAdd) complete: ${performance.now() - currStepTs}ms, ${toAddArray.length} sockets`);
+        currStepTs = performance.now();
         // renew sessions in pipeline
         for (let i = 0; i < toRenew.length; i += batchSize) {
             const batch = toRenew.slice(i, i + batchSize);
@@ -108,6 +113,9 @@ const updateNodeCache = async function (io) {
             }
             await pipeline.exec();
         }
+        logger.info(`step 2 (toRenew) complete: ${performance.now() - currStepTs}ms, ${toRenew.length} sockets`);
+        currStepTs = performance.now();
         // delete sessions in pipeline
         for (let i = 0; i < toDelete.length; i += batchSize) {
             const batch = toDelete.slice(i, i + batchSize);
@@ -117,12 +125,16 @@ const updateNodeCache = async function (io) {
             }
             await pipeline.exec();
         }
+        logger.info(`step 3 (toDelete) complete: ${performance.now() - currStepTs}ms, ${toDelete.length} sockets`);
+        currStepTs = performance.now();
         // add recently updated sessions
         if (toUpdate.length > 0) {
             await redisClient.sadd('assist:updated_sessions', toUpdate);
         }
         // store the node sessions
         await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), 'EX', cacheRefreshInterval);
+        logger.info(`step 4 (full list + updated) complete: ${performance.now() - currStepTs}ms, ${toUpdate.length} sockets`);
         const duration = performance.now() - startTime;
         logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
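
Steps 1-3 above share one batch-and-time pattern: queue up to batchSize commands in a pipeline, execute it in a single round trip, and log the per-step elapsed time before resetting the step timer. A generic sketch of that pattern, assuming an ioredis-style client; the key contents and TTL argument here are illustrative, not the server's real schema:

const { performance } = require('node:perf_hooks');

// Generic batch flush: one pipeline (one network round trip) per batchSize keys.
async function flushInBatches(redisClient, logger, label, keys, batchSize, ttlSeconds) {
    const stepTs = performance.now();
    for (let i = 0; i < keys.length; i += batchSize) {
        const pipeline = redisClient.pipeline();
        for (const key of keys.slice(i, i + batchSize)) {
            pipeline.set(key, '1', 'EX', ttlSeconds); // queued locally, not sent yet
        }
        await pipeline.exec(); // send the whole batch at once
    }
    logger.info(`${label} complete: ${performance.now() - stepTs}ms, ${keys.length} keys`);
}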
@@ -131,16 +143,31 @@ const updateNodeCache = async function (io) {
     }
 }
+let isFlushing = false;
 function startCacheRefresher(io) {
     if (cacheRefresher) clearInterval(cacheRefresher);
     cacheRefresher = setInterval(async () => {
+        if (isFlushing) {
+            logger.warn("Skipping tick: flush in progress");
+            return;
+        }
         const now = Date.now();
         if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) {
             return;
         }
-        await updateNodeCache(io);
-        lastCacheUpdateTime = now;
+        isFlushing = true;
+        try {
+            await updateNodeCache(io);
+            lastCacheUpdateTime = Date.now();
+        } catch (err) {
+            logger.error(`Tick error: ${err}`);
+        } finally {
+            isFlushing = false;
+        }
     }, cacheRefreshIntervalMs / 2);
 }
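
The refresher now ticks at half the refresh window but is doubly guarded: the time check throttles it to at most one run per window, and the isFlushing flag prevents overlapping runs when a flush outlasts a tick. The same pattern in generic form (a sketch, not the server's exact code):

function startGuardedInterval(task, intervalMs, logger) {
    let running = false;
    let lastRun = 0;
    return setInterval(async () => {
        if (running) {                                  // previous run still going
            logger.warn('Skipping tick: run in progress');
            return;
        }
        if (Date.now() - lastRun < intervalMs) return;  // ran too recently
        running = true;
        try {
            await task();
            lastRun = Date.now();
        } catch (err) {
            logger.error(`Tick error: ${err}`);
        } finally {
            running = false;                            // always release the guard
        }
    }, intervalMs / 2);  // half-interval ticks bound the delay after a skipped run
}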