diff --git a/ee/assist/package-lock.json b/ee/assist/package-lock.json index b7ea390b0..68ba51f15 100644 --- a/ee/assist/package-lock.json +++ b/ee/assist/package-lock.json @@ -6,12 +6,13 @@ "packages": { "": { "name": "assist-server", - "version": "v1.12.0-ee", + "version": "v1.22.0-ee", "license": "Elastic License 2.0 (ELv2)", "dependencies": { "@fastify/deepmerge": "^2.0.1", "@maxmind/geoip2-node": "^4.2.0", "@socket.io/redis-adapter": "^8.2.1", + "async-mutex": "^0.5.0", "express": "^4.21.1", "jsonwebtoken": "^9.0.2", "prom-client": "^15.0.0", @@ -202,6 +203,14 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==" }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/base64id": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", @@ -1389,6 +1398,11 @@ "node": ">= 14.0.0" } }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==" + }, "node_modules/type-is": { "version": "1.6.18", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz", diff --git a/ee/assist/package.json b/ee/assist/package.json index 04b257f80..df9b24703 100644 --- a/ee/assist/package.json +++ b/ee/assist/package.json @@ -21,6 +21,7 @@ "@fastify/deepmerge": "^2.0.1", "@maxmind/geoip2-node": "^4.2.0", "@socket.io/redis-adapter": "^8.2.1", + "async-mutex": "^0.5.0", "express": "^4.21.1", "jsonwebtoken": "^9.0.2", "prom-client": "^15.0.0", diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index 6967bd518..be7408fe7 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -22,7 +22,7 @@ const {createClient} = require("redis"); const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); const pubClient = createClient({url: REDIS_URL}); const subClient = pubClient.duplicate(); -logger.info(`Using Redis: ${REDIS_URL}`); +logger.info(`Using Redis in cluster-adapter: ${REDIS_URL}`); const wsRouter = express.Router(); wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js index 97fa6c268..0eaf37abd 100644 --- a/ee/assist/utils/wsServer.js +++ b/ee/assist/utils/wsServer.js @@ -1,22 +1,27 @@ const _io = require("socket.io"); const {getCompressionConfig} = require("./helper"); const {logger} = require('./logger'); +const {Mutex} = require('async-mutex'); let io; +const getServer = function () {return io;} -const getServer = function () { - return io; -} - -let redisClient; const useRedis = process.env.redis === "true"; +const cacheExpiration = parseInt(process.env.cacheExpiration) || 10; // in seconds +const mutexTimeout = parseInt(process.env.mutexTimeout) || 10000; // in milliseconds +const fetchMutex = new Mutex(); +const fetchAllSocketsResultKey = 'fetchSocketsResult'; +let lastKnownResult = []; +// Cache layer +let redisClient; if (useRedis) { const {createClient} = require("redis"); const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); redisClient = createClient({url: REDIS_URL}); redisClient.on("error", (error) => logger.error(`Redis error : ${error}`)); void redisClient.connect(); + logger.info(`Using Redis for cache: ${REDIS_URL}`); } const processSocketsList = function (sockets) { @@ -31,19 +36,37 @@ const processSocketsList = function (sockets) { const doFetchAllSockets = async function () { if (useRedis) { try { - let cachedResult = await redisClient.get('fetchSocketsResult'); + let cachedResult = await redisClient.get(fetchAllSocketsResultKey); if (cachedResult) { return JSON.parse(cachedResult); } - let result = await io.fetchSockets(); - let cachedString = JSON.stringify(processSocketsList(result)); - await redisClient.set('fetchSocketsResult', cachedString, {EX: 5}); - return result; + return await fetchMutex.runExclusive(async () => { + try { + cachedResult = await redisClient.get(fetchAllSocketsResultKey); + if (cachedResult) { + return JSON.parse(cachedResult); + } + let result = await io.fetchSockets(); + let cachedString = JSON.stringify(processSocketsList(result)); + lastKnownResult = result; + await redisClient.set(fetchAllSocketsResultKey, cachedString, {EX: cacheExpiration}); + return result; + } catch (err) { + logger.error('Error fetching new sockets:', err); + return lastKnownResult; + } + }, mutexTimeout); } catch (error) { - logger.error('Error setting value with expiration:', error); + logger.error('Error fetching cached sockets:', error); + return lastKnownResult; } } - return await io.fetchSockets(); + try { + return await io.fetchSockets(); + } catch (error) { + logger.error('Error fetching sockets:', error); + return lastKnownResult; + } } const fetchSockets = async function (roomID) {