From 59251af8c673d8fa6522f01073a657cf4add7025 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Apr 2025 08:47:58 +0200 Subject: [PATCH] feat(assist-server): slightly improved cacher --- ee/assist-server/app/cache.js | 37 ++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/ee/assist-server/app/cache.js b/ee/assist-server/app/cache.js index de39a938d..20cdce96b 100644 --- a/ee/assist-server/app/cache.js +++ b/ee/assist-server/app/cache.js @@ -14,10 +14,11 @@ function generateNodeID() { return "node_"+buffer.readBigUInt64BE(0).toString(); } +const batchSize = parseInt(process.env.REDIS_BATCH_SIZE) || 1000; const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25; -const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 3; +const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 5; const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2); -const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2); +const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL * 4); const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000; let lastCacheUpdateTime = 0; let cacheRefresher = null; @@ -60,6 +61,7 @@ const updateNodeCache = async function (io) { logger.debug('Background refresh triggered'); try { const startTime = performance.now(); + let currStepTs = performance.now(); const sessionIDs = new Set(); const result = await io.fetchSockets(); let toAdd = new Map(); @@ -88,7 +90,7 @@ const updateNodeCache = async function (io) { localCache.refreshedSessions.clear(); localCache.deletedSessions.clear(); }) - const batchSize = 1000; + // insert new sessions in pipeline const toAddArray = Array.from(toAdd.keys()); for (let i = 0; i < toAddArray.length; i += batchSize) { @@ -99,6 +101,9 @@ const updateNodeCache = async function (io) { } await pipeline.exec(); } + logger.info(`step 1 (toAdd) complete: ${performance.now() - currStepTs}ms, ${toAddArray.length} sockets`); + currStepTs = performance.now(); + // renew sessions in pipeline for (let i = 0; i < toRenew.length; i += batchSize) { const batch = toRenew.slice(i, i + batchSize); @@ -108,6 +113,9 @@ const updateNodeCache = async function (io) { } await pipeline.exec(); } + logger.info(`step 2 (toRenew) complete: ${performance.now() - currStepTs}ms, ${toRenew.length} sockets`); + currStepTs = performance.now(); + // delete sessions in pipeline for (let i = 0; i < toDelete.length; i += batchSize) { const batch = toDelete.slice(i, i + batchSize); @@ -117,12 +125,16 @@ const updateNodeCache = async function (io) { } await pipeline.exec(); } + logger.info(`step 3 (toDelete) complete: ${performance.now() - currStepTs}ms, ${toDelete.length} sockets`); + currStepTs = performance.now(); + // add recently updated sessions if (toUpdate.length > 0) { await redisClient.sadd('assist:updated_sessions', toUpdate); } // store the node sessions await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), 'EX', cacheRefreshInterval); + logger.info(`step 4 (full list + updated) complete: ${performance.now() - currStepTs}ms, ${toUpdate.length} sockets`); const duration = performance.now() - startTime; logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`); @@ -131,16 +143,31 @@ const updateNodeCache = async function (io) { } } +let isFlushing = false; + function startCacheRefresher(io) { if (cacheRefresher) clearInterval(cacheRefresher); cacheRefresher = setInterval(async () => { + if (isFlushing) { + logger.warn("Skipping tick: flush in progress"); + return; + } + const now = Date.now(); if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) { return; } - await updateNodeCache(io); - lastCacheUpdateTime = now; + + isFlushing = true; + try { + await updateNodeCache(io); + lastCacheUpdateTime = Date.now(); + } catch (err) { + logger.error(`Tick error: ${err}`); + } finally { + isFlushing = false; + } }, cacheRefreshIntervalMs / 2); }