feat(assist): added support for the cluster mode with sticky sessions

This commit is contained in:
Alexander 2025-03-20 16:56:48 +01:00
parent e04c2aa251
commit b72839f89f
8 changed files with 65 additions and 63 deletions

View file

@ -1,13 +1,11 @@
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
extractPayloadFromRequest
} = require("./helper");
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
extractPayloadFromRequest
}

View file

@ -261,10 +261,6 @@ const uniqueAutocomplete = function (list) {
return _list;
}
const getAvailableRooms = async function (io) {
return io.sockets.adapter.rooms;
}
const getCompressionConfig = function () {
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
let perMessageDeflate = false;
@ -305,6 +301,5 @@ module.exports = {
extractPayloadFromRequest,
sortPaginate,
uniqueAutocomplete,
getAvailableRooms,
getCompressionConfig
};

View file

@ -13,7 +13,9 @@ const {
handleEvent
} = require("./stats");
const {
getServer
sendTo,
sendFrom,
fetchSockets
} = require('../utils/wsServer');
const {
IncreaseTotalWSConnections,
@ -26,9 +28,9 @@ const {
const {logger} = require('./logger');
const deepMerge = require('@fastify/deepmerge')({all: true});
const findSessionSocketId = async (io, roomId, tabId) => {
const findSessionSocketId = async (roomId, tabId) => {
let pickFirstSession = tabId === undefined;
const connected_sockets = await io.in(roomId).fetchSockets();
const connected_sockets = await fetchSockets(roomId);
for (let socket of connected_sockets) {
if (socket.handshake.query.identity === IDENTITIES.session) {
if (pickFirstSession) {
@ -41,9 +43,9 @@ const findSessionSocketId = async (io, roomId, tabId) => {
return null;
};
async function getRoomData(io, roomID) {
async function getRoomData(roomID) {
let tabsCount = 0, agentsCount = 0, tabIDs = [], agentIDs = [];
const connected_sockets = await io.in(roomID).fetchSockets();
const connected_sockets = await fetchSockets(roomID);
if (connected_sockets.length > 0) {
for (let socket of connected_sockets) {
if (socket.handshake.query.identity === IDENTITIES.session) {
@ -77,8 +79,7 @@ async function onConnect(socket) {
IncreaseTotalWSConnections(socket.handshake.query.identity);
IncreaseOnlineConnections(socket.handshake.query.identity);
const io = getServer();
const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId);
const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
if (socket.handshake.query.identity === IDENTITIES.session) {
// Check if session with the same tabID already connected, if so, refuse new connexion
@ -86,7 +87,7 @@ async function onConnect(socket) {
for (let tab of tabIDs) {
if (tab === socket.handshake.query.tabId) {
logger.debug(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
sendTo(socket.id, EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
return socket.disconnect();
}
}
@ -100,12 +101,12 @@ async function onConnect(socket) {
// Inform all connected agents about reconnected session
if (agentsCount > 0) {
logger.debug(`notifying new session about agent-existence`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs);
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
sendTo(socket.id, EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
}
} else if (tabsCount <= 0) {
logger.debug(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS);
}
await socket.join(socket.handshake.query.roomId);
@ -118,7 +119,7 @@ async function onConnect(socket) {
// Stats
startAssist(socket, socket.handshake.query.agentID);
}
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
// Set disconnect handler
@ -144,13 +145,12 @@ async function onDisconnect(socket) {
logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`);
if (socket.handshake.query.identity === IDENTITIES.agent) {
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
// Stats
endAssist(socket, socket.handshake.query.agentID);
}
logger.debug("checking for number of connected agents and sessions");
const io = getServer();
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId);
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
if (tabsCount === -1 && agentsCount === -1) {
DecreaseOnlineRooms();
@ -159,11 +159,11 @@ async function onDisconnect(socket) {
}
if (tabsCount === 0) {
logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`);
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_SESSIONS);
}
if (agentsCount === 0) {
logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`);
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_AGENTS);
}
}
@ -178,13 +178,12 @@ async function onUpdateEvent(socket, ...args) {
socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
// Update sessionInfo for all agents in the room
const io = getServer();
const connected_sockets = await io.in(socket.handshake.query.roomId).fetchSockets();
const connected_sockets = await fetchSockets(socket.handshake.query.roomId);
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
item.handshake.query.sessionInfo = deepMerge(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
} else if (item.handshake.query.identity === IDENTITIES.agent) {
socket.to(item.id).emit(EVENTS_DEFINITION.listen.UPDATE_EVENT, args[0]);
sendFrom(socket, item.id, EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
}
}
}
@ -194,7 +193,7 @@ async function onWebrtcAgentHandler(socket, ...args) {
const agentIdToConnect = args[0]?.data?.toAgentId;
logger.debug(`${socket.id} sent webrtc event to agent:${agentIdToConnect}`);
if (agentIdToConnect && socket.handshake.sessionData.AGENTS_CONNECTED.includes(agentIdToConnect)) {
socket.to(agentIdToConnect).emit(EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]);
sendFrom(socket, agentIdToConnect, EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]);
}
}
}
@ -207,19 +206,18 @@ async function onAny(socket, eventName, ...args) {
args[0] = updateSessionData(socket, args[0])
if (socket.handshake.query.identity === IDENTITIES.session) {
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`);
socket.to(socket.handshake.query.roomId).emit(eventName, args[0]);
sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]);
} else {
// Stats
handleEvent(eventName, socket, args[0]);
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`);
const io = getServer();
let socketId = await findSessionSocketId(io, socket.handshake.query.roomId, args[0]?.meta?.tabId);
let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId);
if (socketId === null) {
logger.debug(`session not found for:${socket.handshake.query.roomId}`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS);
} else {
logger.debug("message sent");
io.to(socketId).emit(eventName, socket.id, args[0]);
sendTo(socket.id, eventName, socket.id, args[0]);
}
}
}
@ -234,4 +232,4 @@ function updateSessionData(socket, sessionData) {
module.exports = {
onConnect,
}
}

View file

@ -3,8 +3,12 @@ const {getCompressionConfig} = require("./helper");
let io;
const getServer = function () {
return io;
function sendFrom(from, to, eventName, ...data) {
from.to(to).emit(eventName, ...data);
}
function sendTo(to, eventName, ...data) {
sendFrom(io, to, eventName, ...data);
}
const fetchSockets = async function (roomID) {
@ -35,6 +39,7 @@ const createSocketIOServer = function (server, prefix) {
module.exports = {
createSocketIOServer,
getServer,
sendTo,
sendFrom,
fetchSockets,
}

View file

@ -47,7 +47,7 @@ module.exports = {
Promise.all([pubClient.connect(), subClient.connect()])
.then(() => {
io.adapter(createAdapter(pubClient, subClient,
{requestsTimeout: process.env.REDIS_REQUESTS_TIMEOUT || 5000}));
{requestsTimeout: parseInt(process.env.REDIS_REQUESTS_TIMEOUT) || 10000}));
logger.info("> redis connected.");
})
.catch((err) => {

View file

@ -1,13 +1,11 @@
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
extractPayloadFromRequest
} = require('../utils/helper-ee');
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
extractPayloadFromRequest
}

View file

@ -91,13 +91,7 @@ const extractPayloadFromRequest = async function (req, res) {
logger.debug("payload/filters:" + JSON.stringify(filters))
return Object.keys(filters).length > 0 ? filters : undefined;
}
const getAvailableRooms = async function (io) {
if (process.env.redis === "true") {
return io.of('/').adapter.allRooms();
} else {
return helper.getAvailableRooms(io);
}
}
const getCompressionConfig = function () {
if (process.env.uws !== "true") {
return helper.getCompressionConfig();
@ -121,6 +115,5 @@ module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getCompressionConfig,
getAvailableRooms
getCompressionConfig
};

View file

@ -3,9 +3,9 @@ const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger');
let io;
const getServer = function () {return io;}
const useRedis = process.env.redis === "true";
const useStickySessions = process.env.stickySessions === "true";
let inMemorySocketsCache = [];
let lastCacheUpdateTime = 0;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
@ -49,13 +49,16 @@ function startCacheRefresher() {
}, CACHE_REFRESH_INTERVAL / 2);
}
const processSocketsList = function (sockets) {
let res = []
for (let socket of sockets) {
let {handshake} = socket;
res.push({handshake});
function sendFrom(from, to, eventName, ...data) {
if (useStickySessions) {
from.to(to).local().emit(eventName, ...data);
} else {
from.to(to).emit(eventName, ...data);
}
return res
}
function sendTo(to, eventName, ...data) {
sendFrom(io, to, eventName, ...data);
}
const fetchSockets = async function (roomID) {
@ -65,7 +68,16 @@ const fetchSockets = async function (roomID) {
if (!roomID) {
return await doFetchAllSockets();
}
return await io.in(roomID).fetchSockets();
try {
if (useStickySessions) {
return await io.in(roomID).local().fetchSockets();
} else {
return await io.in(roomID).fetchSockets();
}
} catch (error) {
logger.error('Error fetching sockets:', error);
return [];
}
}
const createSocketIOServer = function (server, prefix) {
@ -96,12 +108,15 @@ const createSocketIOServer = function (server, prefix) {
});
io.attachApp(server);
}
startCacheRefresher();
if (useRedis) {
startCacheRefresher();
}
return io;
}
module.exports = {
createSocketIOServer,
getServer,
sendTo,
sendFrom,
fetchSockets,
}