feat(assist): tried to implement a blocking queue for a heavy fetchSockets request

This commit is contained in:
Alexander 2025-03-18 11:50:23 +01:00
parent 7365d8639c
commit c68bd2b859
4 changed files with 52 additions and 14 deletions

View file

@ -6,12 +6,13 @@
"packages": {
"": {
"name": "assist-server",
"version": "v1.12.0-ee",
"version": "v1.22.0-ee",
"license": "Elastic License 2.0 (ELv2)",
"dependencies": {
"@fastify/deepmerge": "^2.0.1",
"@maxmind/geoip2-node": "^4.2.0",
"@socket.io/redis-adapter": "^8.2.1",
"async-mutex": "^0.5.0",
"express": "^4.21.1",
"jsonwebtoken": "^9.0.2",
"prom-client": "^15.0.0",
@ -202,6 +203,14 @@
"resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz",
"integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="
},
"node_modules/async-mutex": {
"version": "0.5.0",
"resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz",
"integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==",
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/base64id": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz",
@ -1389,6 +1398,11 @@
"node": ">= 14.0.0"
}
},
"node_modules/tslib": {
"version": "2.8.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz",
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="
},
"node_modules/type-is": {
"version": "1.6.18",
"resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz",

View file

@ -21,6 +21,7 @@
"@fastify/deepmerge": "^2.0.1",
"@maxmind/geoip2-node": "^4.2.0",
"@socket.io/redis-adapter": "^8.2.1",
"async-mutex": "^0.5.0",
"express": "^4.21.1",
"jsonwebtoken": "^9.0.2",
"prom-client": "^15.0.0",

View file

@ -22,7 +22,7 @@ const {createClient} = require("redis");
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
const pubClient = createClient({url: REDIS_URL});
const subClient = pubClient.duplicate();
logger.info(`Using Redis: ${REDIS_URL}`);
logger.info(`Using Redis in cluster-adapter: ${REDIS_URL}`);
const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete

View file

@ -1,22 +1,27 @@
const _io = require("socket.io");
const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger');
const {Mutex} = require('async-mutex');
let io;
const getServer = function () {return io;}
const getServer = function () {
return io;
}
let redisClient;
const useRedis = process.env.redis === "true";
const cacheExpiration = parseInt(process.env.cacheExpiration) || 10; // in seconds
const mutexTimeout = parseInt(process.env.mutexTimeout) || 10000; // in milliseconds
const fetchMutex = new Mutex();
const fetchAllSocketsResultKey = 'fetchSocketsResult';
let lastKnownResult = [];
// Cache layer
let redisClient;
if (useRedis) {
const {createClient} = require("redis");
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
redisClient = createClient({url: REDIS_URL});
redisClient.on("error", (error) => logger.error(`Redis error : ${error}`));
void redisClient.connect();
logger.info(`Using Redis for cache: ${REDIS_URL}`);
}
const processSocketsList = function (sockets) {
@ -31,19 +36,37 @@ const processSocketsList = function (sockets) {
const doFetchAllSockets = async function () {
if (useRedis) {
try {
let cachedResult = await redisClient.get('fetchSocketsResult');
let cachedResult = await redisClient.get(fetchAllSocketsResultKey);
if (cachedResult) {
return JSON.parse(cachedResult);
}
let result = await io.fetchSockets();
let cachedString = JSON.stringify(processSocketsList(result));
await redisClient.set('fetchSocketsResult', cachedString, {EX: 5});
return result;
return await fetchMutex.runExclusive(async () => {
try {
cachedResult = await redisClient.get(fetchAllSocketsResultKey);
if (cachedResult) {
return JSON.parse(cachedResult);
}
let result = await io.fetchSockets();
let cachedString = JSON.stringify(processSocketsList(result));
lastKnownResult = result;
await redisClient.set(fetchAllSocketsResultKey, cachedString, {EX: cacheExpiration});
return result;
} catch (err) {
logger.error('Error fetching new sockets:', err);
return lastKnownResult;
}
}, mutexTimeout);
} catch (error) {
logger.error('Error setting value with expiration:', error);
logger.error('Error fetching cached sockets:', error);
return lastKnownResult;
}
}
return await io.fetchSockets();
try {
return await io.fetchSockets();
} catch (error) {
logger.error('Error fetching sockets:', error);
return lastKnownResult;
}
}
const fetchSockets = async function (roomID) {