From b72839f89f0035cbf90397f7cf6cde1fe1a09d6b Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 20 Mar 2025 16:56:48 +0100 Subject: [PATCH] feat(assist): added support for the cluster mode with sticky sessions --- assist/utils/extractors.js | 6 +-- assist/utils/helper.js | 5 --- assist/utils/socketHandlers.js | 52 +++++++++++++------------- assist/utils/wsServer.js | 11 ++++-- ee/assist/servers/websocket-cluster.js | 2 +- ee/assist/utils/extractors.js | 6 +-- ee/assist/utils/helper-ee.js | 11 +----- ee/assist/utils/wsServer.js | 35 ++++++++++++----- 8 files changed, 65 insertions(+), 63 deletions(-) diff --git a/assist/utils/extractors.js b/assist/utils/extractors.js index 9908bc960..34882d399 100644 --- a/assist/utils/extractors.js +++ b/assist/utils/extractors.js @@ -1,13 +1,11 @@ const { extractProjectKeyFromRequest, extractSessionIdFromRequest, - extractPayloadFromRequest, - getAvailableRooms + extractPayloadFromRequest } = require("./helper"); module.exports = { extractProjectKeyFromRequest, extractSessionIdFromRequest, - extractPayloadFromRequest, - getAvailableRooms + extractPayloadFromRequest } \ No newline at end of file diff --git a/assist/utils/helper.js b/assist/utils/helper.js index f91c88380..fd6133f4f 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -261,10 +261,6 @@ const uniqueAutocomplete = function (list) { return _list; } -const getAvailableRooms = async function (io) { - return io.sockets.adapter.rooms; -} - const getCompressionConfig = function () { // WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor) let perMessageDeflate = false; @@ -305,6 +301,5 @@ module.exports = { extractPayloadFromRequest, sortPaginate, uniqueAutocomplete, - getAvailableRooms, getCompressionConfig }; \ No newline at end of file diff --git a/assist/utils/socketHandlers.js b/assist/utils/socketHandlers.js index 503969c08..67e47cdab 100644 --- a/assist/utils/socketHandlers.js +++ b/assist/utils/socketHandlers.js @@ -13,7 +13,9 @@ const { handleEvent } = require("./stats"); const { - getServer + sendTo, + sendFrom, + fetchSockets } = require('../utils/wsServer'); const { IncreaseTotalWSConnections, @@ -26,9 +28,9 @@ const { const {logger} = require('./logger'); const deepMerge = require('@fastify/deepmerge')({all: true}); -const findSessionSocketId = async (io, roomId, tabId) => { +const findSessionSocketId = async (roomId, tabId) => { let pickFirstSession = tabId === undefined; - const connected_sockets = await io.in(roomId).fetchSockets(); + const connected_sockets = await fetchSockets(roomId); for (let socket of connected_sockets) { if (socket.handshake.query.identity === IDENTITIES.session) { if (pickFirstSession) { @@ -41,9 +43,9 @@ const findSessionSocketId = async (io, roomId, tabId) => { return null; }; -async function getRoomData(io, roomID) { +async function getRoomData(roomID) { let tabsCount = 0, agentsCount = 0, tabIDs = [], agentIDs = []; - const connected_sockets = await io.in(roomID).fetchSockets(); + const connected_sockets = await fetchSockets(roomID); if (connected_sockets.length > 0) { for (let socket of connected_sockets) { if (socket.handshake.query.identity === IDENTITIES.session) { @@ -77,8 +79,7 @@ async function onConnect(socket) { IncreaseTotalWSConnections(socket.handshake.query.identity); IncreaseOnlineConnections(socket.handshake.query.identity); - const io = getServer(); - const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId); + const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); if (socket.handshake.query.identity === IDENTITIES.session) { // Check if session with the same tabID already connected, if so, refuse new connexion @@ -86,7 +87,7 @@ async function onConnect(socket) { for (let tab of tabIDs) { if (tab === socket.handshake.query.tabId) { logger.debug(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); + sendTo(socket.id, EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); return socket.disconnect(); } } @@ -100,12 +101,12 @@ async function onConnect(socket) { // Inform all connected agents about reconnected session if (agentsCount > 0) { logger.debug(`notifying new session about agent-existence`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs); - socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + sendTo(socket.id, EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (tabsCount <= 0) { logger.debug(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS); } await socket.join(socket.handshake.query.roomId); @@ -118,7 +119,7 @@ async function onConnect(socket) { // Stats startAssist(socket, socket.handshake.query.agentID); } - socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } // Set disconnect handler @@ -144,13 +145,12 @@ async function onDisconnect(socket) { logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`); if (socket.handshake.query.identity === IDENTITIES.agent) { - socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); // Stats endAssist(socket, socket.handshake.query.agentID); } logger.debug("checking for number of connected agents and sessions"); - const io = getServer(); - let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId); + let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); if (tabsCount === -1 && agentsCount === -1) { DecreaseOnlineRooms(); @@ -159,11 +159,11 @@ async function onDisconnect(socket) { } if (tabsCount === 0) { logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`); - socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_SESSIONS); } if (agentsCount === 0) { logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`); - socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_AGENTS); } } @@ -178,13 +178,12 @@ async function onUpdateEvent(socket, ...args) { socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); // Update sessionInfo for all agents in the room - const io = getServer(); - const connected_sockets = await io.in(socket.handshake.query.roomId).fetchSockets(); + const connected_sockets = await fetchSockets(socket.handshake.query.roomId); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { item.handshake.query.sessionInfo = deepMerge(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); } else if (item.handshake.query.identity === IDENTITIES.agent) { - socket.to(item.id).emit(EVENTS_DEFINITION.listen.UPDATE_EVENT, args[0]); + sendFrom(socket, item.id, EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); } } } @@ -194,7 +193,7 @@ async function onWebrtcAgentHandler(socket, ...args) { const agentIdToConnect = args[0]?.data?.toAgentId; logger.debug(`${socket.id} sent webrtc event to agent:${agentIdToConnect}`); if (agentIdToConnect && socket.handshake.sessionData.AGENTS_CONNECTED.includes(agentIdToConnect)) { - socket.to(agentIdToConnect).emit(EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]); + sendFrom(socket, agentIdToConnect, EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]); } } } @@ -207,19 +206,18 @@ async function onAny(socket, eventName, ...args) { args[0] = updateSessionData(socket, args[0]) if (socket.handshake.query.identity === IDENTITIES.session) { logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`); - socket.to(socket.handshake.query.roomId).emit(eventName, args[0]); + sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]); } else { // Stats handleEvent(eventName, socket, args[0]); logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`); - const io = getServer(); - let socketId = await findSessionSocketId(io, socket.handshake.query.roomId, args[0]?.meta?.tabId); + let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId); if (socketId === null) { logger.debug(`session not found for:${socket.handshake.query.roomId}`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS); } else { logger.debug("message sent"); - io.to(socketId).emit(eventName, socket.id, args[0]); + sendTo(socket.id, eventName, socket.id, args[0]); } } } @@ -234,4 +232,4 @@ function updateSessionData(socket, sessionData) { module.exports = { onConnect, -} \ No newline at end of file +} diff --git a/assist/utils/wsServer.js b/assist/utils/wsServer.js index bac62e750..e672a9f7e 100644 --- a/assist/utils/wsServer.js +++ b/assist/utils/wsServer.js @@ -3,8 +3,12 @@ const {getCompressionConfig} = require("./helper"); let io; -const getServer = function () { - return io; +function sendFrom(from, to, eventName, ...data) { + from.to(to).emit(eventName, ...data); +} + +function sendTo(to, eventName, ...data) { + sendFrom(io, to, eventName, ...data); } const fetchSockets = async function (roomID) { @@ -35,6 +39,7 @@ const createSocketIOServer = function (server, prefix) { module.exports = { createSocketIOServer, - getServer, + sendTo, + sendFrom, fetchSockets, } \ No newline at end of file diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index 6967bd518..abbde07fd 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -47,7 +47,7 @@ module.exports = { Promise.all([pubClient.connect(), subClient.connect()]) .then(() => { io.adapter(createAdapter(pubClient, subClient, - {requestsTimeout: process.env.REDIS_REQUESTS_TIMEOUT || 5000})); + {requestsTimeout: parseInt(process.env.REDIS_REQUESTS_TIMEOUT) || 10000})); logger.info("> redis connected."); }) .catch((err) => { diff --git a/ee/assist/utils/extractors.js b/ee/assist/utils/extractors.js index 8a19cf2e6..9a7e5e904 100644 --- a/ee/assist/utils/extractors.js +++ b/ee/assist/utils/extractors.js @@ -1,13 +1,11 @@ const { extractProjectKeyFromRequest, extractSessionIdFromRequest, - extractPayloadFromRequest, - getAvailableRooms + extractPayloadFromRequest } = require('../utils/helper-ee'); module.exports = { extractProjectKeyFromRequest, extractSessionIdFromRequest, - extractPayloadFromRequest, - getAvailableRooms + extractPayloadFromRequest } \ No newline at end of file diff --git a/ee/assist/utils/helper-ee.js b/ee/assist/utils/helper-ee.js index 4abe98b2a..6e6d01e81 100644 --- a/ee/assist/utils/helper-ee.js +++ b/ee/assist/utils/helper-ee.js @@ -91,13 +91,7 @@ const extractPayloadFromRequest = async function (req, res) { logger.debug("payload/filters:" + JSON.stringify(filters)) return Object.keys(filters).length > 0 ? filters : undefined; } -const getAvailableRooms = async function (io) { - if (process.env.redis === "true") { - return io.of('/').adapter.allRooms(); - } else { - return helper.getAvailableRooms(io); - } -} + const getCompressionConfig = function () { if (process.env.uws !== "true") { return helper.getCompressionConfig(); @@ -121,6 +115,5 @@ module.exports = { extractProjectKeyFromRequest, extractSessionIdFromRequest, extractPayloadFromRequest, - getCompressionConfig, - getAvailableRooms + getCompressionConfig }; \ No newline at end of file diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js index e774eb8bf..b9edfa854 100644 --- a/ee/assist/utils/wsServer.js +++ b/ee/assist/utils/wsServer.js @@ -3,9 +3,9 @@ const {getCompressionConfig} = require("./helper"); const {logger} = require('./logger'); let io; -const getServer = function () {return io;} const useRedis = process.env.redis === "true"; +const useStickySessions = process.env.stickySessions === "true"; let inMemorySocketsCache = []; let lastCacheUpdateTime = 0; const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000; @@ -49,13 +49,16 @@ function startCacheRefresher() { }, CACHE_REFRESH_INTERVAL / 2); } -const processSocketsList = function (sockets) { - let res = [] - for (let socket of sockets) { - let {handshake} = socket; - res.push({handshake}); +function sendFrom(from, to, eventName, ...data) { + if (useStickySessions) { + from.to(to).local().emit(eventName, ...data); + } else { + from.to(to).emit(eventName, ...data); } - return res +} + +function sendTo(to, eventName, ...data) { + sendFrom(io, to, eventName, ...data); } const fetchSockets = async function (roomID) { @@ -65,7 +68,16 @@ const fetchSockets = async function (roomID) { if (!roomID) { return await doFetchAllSockets(); } - return await io.in(roomID).fetchSockets(); + try { + if (useStickySessions) { + return await io.in(roomID).local().fetchSockets(); + } else { + return await io.in(roomID).fetchSockets(); + } + } catch (error) { + logger.error('Error fetching sockets:', error); + return []; + } } const createSocketIOServer = function (server, prefix) { @@ -96,12 +108,15 @@ const createSocketIOServer = function (server, prefix) { }); io.attachApp(server); } - startCacheRefresher(); + if (useRedis) { + startCacheRefresher(); + } return io; } module.exports = { createSocketIOServer, - getServer, + sendTo, + sendFrom, fetchSockets, } \ No newline at end of file