From 0de0dd4cbfa175b476acf6582bf73497522191be Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 6 Feb 2024 16:30:43 +0100 Subject: [PATCH] feat(assist): improved assist performance (for one-node mode and cluster mode) (#1880) --- assist/servers/websocket.js | 2 +- assist/utils/helper.js | 2 +- assist/utils/httpHandlers.js | 94 +++++++++---- assist/utils/rooms.js | 49 ------- assist/utils/socketHandlers.js | 96 ++++++------- assist/utils/wsServer.js | 11 ++ ee/assist/servers/websocket-cluster.js | 2 +- ee/assist/utils/httpHandlers-cluster.js | 176 ------------------------ ee/assist/utils/wsServer.js | 49 +++++++ 9 files changed, 172 insertions(+), 309 deletions(-) delete mode 100644 assist/utils/rooms.js delete mode 100644 ee/assist/utils/httpHandlers-cluster.js diff --git a/assist/servers/websocket.js b/assist/servers/websocket.js index 4ea23b67d..382498b56 100644 --- a/assist/servers/websocket.js +++ b/assist/servers/websocket.js @@ -40,4 +40,4 @@ module.exports = { socketsLiveByProject, socketsLiveBySession } -}; +}; \ No newline at end of file diff --git a/assist/utils/helper.js b/assist/utils/helper.js index 2a03c58b3..79860fa21 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -20,7 +20,7 @@ const extractTabId = (peerId) => { const extractPeerId = (peerId) => { let splited = peerId.split("-"); if (splited.length < 2 || splited.length > 3) { - debug && console.error(`cannot split peerId: ${peerId}`); + // debug && console.error(`cannot split peerId: ${peerId}`); return {}; } if (PROJECT_KEY_LENGTH > 0 && splited[0].length !== PROJECT_KEY_LENGTH) { diff --git a/assist/utils/httpHandlers.js b/assist/utils/httpHandlers.js index 092e9102e..381cdc98a 100644 --- a/assist/utils/httpHandlers.js +++ b/assist/utils/httpHandlers.js @@ -15,16 +15,13 @@ const { RecordRequestDuration, IncreaseTotalRequests } = require('../utils/metrics'); -const { - GetRoomInfo, - GetRooms, - GetSessions, -} = require('../utils/rooms'); +const {fetchSockets} = require("./wsServer"); +const {IDENTITIES} = require("./assistHelper"); const debug_log = process.env.debug === "1"; const respond = function (req, res, data) { - console.log("responding with data: ", data) + console.log("responding with data: ", JSON.stringify(data)) let result = {data} if (process.env.uws !== "true") { res.statusCode = 200; @@ -38,8 +35,18 @@ const respond = function (req, res, data) { RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); } -const getParticularSession = function (sessionId, filters) { - const sessInfo = GetRoomInfo(sessionId); +const getParticularSession = async function (roomId, filters) { + let connected_sockets = await fetchSockets(roomId); + if (connected_sockets.length === 0) { + return null; + } + let sessInfo; + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + sessInfo = item.handshake.query.sessionInfo; + break; + } + } if (!sessInfo) { return null; } @@ -52,23 +59,41 @@ const getParticularSession = function (sessionId, filters) { return null; } -const getAllSessions = function (projectKey, filters, onlineOnly= false) { +const getAllSessions = async function (projectKey, filters, onlineOnly= false) { const sessions = []; - const allRooms = onlineOnly ? GetSessions(projectKey) : GetRooms(projectKey); + const connected_sockets = await fetchSockets(); + if (connected_sockets.length === 0) { + return sessions; + } - for (let sessionId of allRooms) { - let sessInfo = GetRoomInfo(sessionId); - if (!sessInfo) { + const rooms = new Map(); + for (let item of connected_sockets) { + // Prefilter checks + if (rooms.has(item.handshake.query.roomId)) { continue; } + if (item.handshake.query.projectKey !== projectKey || !item.handshake.query.sessionInfo) { + continue; + } + if (onlineOnly && item.handshake.query.identity !== IDENTITIES.session) { + continue; + } + + // Mark this room as visited + rooms.set(item.handshake.query.roomId, true); + + // Add session to the list without filtering if (!hasFilters(filters)) { - sessions.push(sessInfo); + sessions.push(item.handshake.query.sessionInfo); continue; } - if (isValidSession(sessInfo, filters.filter)) { - sessions.push(sessInfo); + + // Add session to the list if it passes the filter + if (isValidSession(item.handshake.query.sessionInfo, filters.filter)) { + sessions.push(item.handshake.query.sessionInfo); } } + return sessions } @@ -83,11 +108,12 @@ const socketsListByProject = async function (req, res) { // find a particular session if (_sessionId) { - return respond(req, res, getParticularSession(_sessionId, filters)); + const sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters); + return respond(req, res, sessInfo); } // find all sessions for a project - const sessions = getAllSessions(_projectKey, filters); + const sessions = await getAllSessions(_projectKey, filters); // send response respond(req, res, sortPaginate(sessions, filters)); @@ -104,11 +130,12 @@ const socketsLiveByProject = async function (req, res) { // find a particular session if (_sessionId) { - return respond(req, res, getParticularSession(_sessionId, filters)); + let sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters); + return respond(req, res, sessInfo); } // find all sessions for a project - const sessions = getAllSessions(_projectKey, filters, true); + const sessions = await getAllSessions(_projectKey, filters, true); // send response respond(req, res, sortPaginate(sessions, filters)); @@ -119,12 +146,14 @@ const socketsLiveBySession = async function (req, res) { debug_log && console.log("[WS]looking for LIVE session"); res.handlerName = 'socketsLiveBySession'; + const _projectKey = extractProjectKeyFromRequest(req); const _sessionId = extractSessionIdFromRequest(req); const filters = await extractPayloadFromRequest(req, res); // find a particular session if (_sessionId) { - return respond(req, res, getParticularSession(_sessionId, filters)); + let sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters); + return respond(req, res, sessInfo); } return respond(req, res, null); } @@ -140,14 +169,27 @@ const autocomplete = async function (req, res) { if (!hasQuery(filters)) { return respond(req, res, results); } - let allSessions = GetSessions(_projectKey); - for (let sessionId of allSessions) { - let sessInfo = GetRoomInfo(sessionId); - if (!sessInfo) { + + let connected_sockets = await fetchSockets(); + if (connected_sockets.length === 0) { + return results; + } + + const rooms = new Map(); + for (let item of connected_sockets) { + if (rooms.has(item.handshake.query.roomId)) { continue; } - results = [...results, ...getValidAttributes(sessInfo, filters.query)]; + if (item.handshake.query.sessionInfo) { + if ((item.handshake.query.projectKey !== _projectKey) || (item.handshake.query.identity !== IDENTITIES.session)) { + continue; + } + // Mark this room as visited + rooms.set(item.handshake.query.roomId, true); + results.push(...getValidAttributes(item.handshake.query.sessionInfo, filters.query)) + } } + respond(req, res, uniqueAutocomplete(results)); } diff --git a/assist/utils/rooms.js b/assist/utils/rooms.js deleted file mode 100644 index 4e4504253..000000000 --- a/assist/utils/rooms.js +++ /dev/null @@ -1,49 +0,0 @@ -const roomsInfo = new Map(); // sessionID -> sessionInfo -const projectSessions = new Map(); // projectKey -> Set(sessionIDs) // all rooms (even with agent only) -const projectRooms = new Map(); // projectKey -> Set(roomIDs) // online rooms - -function AddRoom(projKey, sessID, sessInfo) { - roomsInfo.set(sessID, sessInfo); - if (!projectRooms.has(projKey)) { - projectRooms.set(projKey, new Set()); - } - projectRooms.get(projKey).add(sessID); - if (!projectSessions.has(projKey)) { - projectSessions.set(projKey, new Set()); - } - projectSessions.get(projKey).add(sessID); -} - -function UpdateRoom(sessID, sessInfo) { - roomsInfo.set(sessID, sessInfo); -} - -function DeleteSession(projKey, sessID) { - projectSessions.get(projKey)?.delete(sessID); -} - -function DeleteRoom(projKey, sessID) { - projectRooms.get(projKey)?.delete(sessID); -} - -function GetRoomInfo(sessID) { - return roomsInfo.get(sessID); -} - -function GetRooms(projectKey) { - return projectRooms.get(projectKey) || new Set(); -} - -function GetSessions(projectKey) { - return projectSessions.get(projectKey) || new Set(); -} - -module.exports = { - AddRoom, - UpdateRoom, - DeleteRoom, - DeleteSession, - GetRoomInfo, - GetRooms, - GetSessions, -} \ No newline at end of file diff --git a/assist/utils/socketHandlers.js b/assist/utils/socketHandlers.js index 9aceafaf4..0638fca37 100644 --- a/assist/utils/socketHandlers.js +++ b/assist/utils/socketHandlers.js @@ -23,12 +23,6 @@ const { IncreaseOnlineRooms, DecreaseOnlineRooms, } = require('../utils/metrics'); -const { - AddRoom, - UpdateRoom, - DeleteRoom, - DeleteSession, -} = require('../utils/rooms'); const debug_log = process.env.debug === "1"; const error_log = process.env.ERROR === "1"; @@ -70,32 +64,29 @@ async function getRoomData(io, roomID) { function processNewSocket(socket) { socket._connectedAt = new Date(); - socket.identity = socket.handshake.query.identity; - socket.peerId = socket.handshake.query.peerId; - let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.peerId); - socket.roomId = `${connProjectKey}-${connSessionId}`; - socket.projectId = socket.handshake.query.projectId; - socket.projectKey = connProjectKey; - socket.sessId = connSessionId; - socket.tabId = connTabId; - debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); + let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.handshake.query.peerId); + socket.handshake.query.roomId = `${connProjectKey}-${connSessionId}`; + socket.handshake.query.projectKey = connProjectKey; + socket.handshake.query.sessId = connSessionId; + socket.handshake.query.tabId = connTabId; + debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.handshake.query.roomId}`); } async function onConnect(socket) { debug_log && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); processNewSocket(socket); - IncreaseTotalWSConnections(socket.identity); - IncreaseOnlineConnections(socket.identity); + IncreaseTotalWSConnections(socket.handshake.query.identity); + IncreaseOnlineConnections(socket.handshake.query.identity); const io = getServer(); - const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); + const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId); - if (socket.identity === IDENTITIES.session) { + if (socket.handshake.query.identity === IDENTITIES.session) { // Check if session with the same tabID already connected, if so, refuse new connexion if (tabsCount > 0) { for (let tab of tabIDs) { - if (tab === socket.tabId) { - error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.peerId}`); + if (tab === socket.handshake.query.tabId) { + error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); return socket.disconnect(); } @@ -106,35 +97,34 @@ async function onConnect(socket) { // New session creates new room IncreaseTotalRooms(); IncreaseOnlineRooms(); - AddRoom(socket.projectKey, socket.sessId, socket.handshake.query.sessionInfo); } // Inform all connected agents about reconnected session if (agentsCount > 0) { debug_log && console.log(`notifying new session about agent-existence`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (tabsCount <= 0) { - debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); + debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } - await socket.join(socket.roomId); + await socket.join(socket.handshake.query.roomId); if (debug_log) { - let connectedSockets = await io.in(socket.roomId).fetchSockets(); + let connectedSockets = await io.in(socket.handshake.query.roomId).fetchSockets(); if (connectedSockets.length > 0) { - console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`); + console.log(`${socket.id} joined room:${socket.handshake.query.roomId}, as:${socket.handshake.query.identity}, members:${connectedSockets.length}`); } } - if (socket.identity === IDENTITIES.agent) { + if (socket.handshake.query.identity === IDENTITIES.agent) { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); - socket.agentID = socket.handshake.query.agentInfo.id; + socket.handshake.query.agentID = socket.handshake.query.agentInfo.id; // Stats - startAssist(socket, socket.agentID); + startAssist(socket, socket.handshake.query.agentID); } - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } // Set disconnect handler @@ -153,50 +143,46 @@ async function onConnect(socket) { } async function onDisconnect(socket) { - DecreaseOnlineConnections(socket.identity); - debug_log && console.log(`${socket.id} disconnected from ${socket.roomId}`); + DecreaseOnlineConnections(socket.handshake.query.identity); + debug_log && console.log(`${socket.id} disconnected from ${socket.handshake.query.roomId}`); - if (socket.identity === IDENTITIES.agent) { - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + if (socket.handshake.query.identity === IDENTITIES.agent) { + socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); // Stats - endAssist(socket, socket.agentID); + endAssist(socket, socket.handshake.query.agentID); } debug_log && console.log("checking for number of connected agents and sessions"); const io = getServer(); - let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); + let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId); if (tabsCount === -1 && agentsCount === -1) { DecreaseOnlineRooms(); - debug_log && console.log(`room not found: ${socket.roomId}`); - DeleteSession(socket.projectKey, socket.sessId); - DeleteRoom(socket.projectKey, socket.sessId); + debug_log && console.log(`room not found: ${socket.handshake.query.roomId}`); return; } if (tabsCount === 0) { - debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); - DeleteSession(socket.projectKey, socket.sessId); + debug_log && console.log(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`); + socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (agentsCount === 0) { - debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); - socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + debug_log && console.log(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`); + socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } } async function onUpdateEvent(socket, ...args) { debug_log && console.log(`${socket.id} sent update event.`); - if (socket.identity !== IDENTITIES.session) { + if (socket.handshake.query.identity !== IDENTITIES.session) { debug_log && console.log('Ignoring update event.'); return } args[0] = updateSessionData(socket, args[0]) Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); - UpdateRoom(socket.sessId, socket.handshake.query.sessionInfo); // Update sessionInfo for all agents in the room const io = getServer(); - const connected_sockets = await io.in(socket.roomId).fetchSockets(); + const connected_sockets = await io.in(socket.handshake.query.roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); @@ -212,17 +198,17 @@ async function onAny(socket, eventName, ...args) { return } args[0] = updateSessionData(socket, args[0]) - if (socket.identity === IDENTITIES.session) { - debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); - socket.to(socket.roomId).emit(eventName, args[0]); + if (socket.handshake.query.identity === IDENTITIES.session) { + debug_log && console.log(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`); + socket.to(socket.handshake.query.roomId).emit(eventName, args[0]); } else { // Stats handleEvent(eventName, socket, args[0]); - debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); + debug_log && console.log(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`); const io = getServer(); - let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId); + let socketId = await findSessionSocketId(io, socket.handshake.query.roomId, args[0]?.meta?.tabId); if (socketId === null) { - debug_log && console.log(`session not found for:${socket.roomId}`); + debug_log && console.log(`session not found for:${socket.handshake.query.roomId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug_log && console.log("message sent"); @@ -233,7 +219,7 @@ async function onAny(socket, eventName, ...args) { // Back compatibility (add top layer with meta information) function updateSessionData(socket, sessionData) { - if (sessionData?.meta === undefined && socket.identity === IDENTITIES.session) { + if (sessionData?.meta === undefined && socket.handshake.query.identity === IDENTITIES.session) { sessionData = {meta: {tabId: socket.tabId, version: 1}, data: sessionData}; } return sessionData diff --git a/assist/utils/wsServer.js b/assist/utils/wsServer.js index 00165eafd..bac62e750 100644 --- a/assist/utils/wsServer.js +++ b/assist/utils/wsServer.js @@ -7,6 +7,16 @@ const getServer = function () { return io; } +const fetchSockets = async function (roomID) { + if (!io) { + return []; + } + if (!roomID) { + return await io.fetchSockets(); + } + return await io.in(roomID).fetchSockets(); +} + const createSocketIOServer = function (server, prefix) { if (io) { return io; @@ -26,4 +36,5 @@ const createSocketIOServer = function (server, prefix) { module.exports = { createSocketIOServer, getServer, + fetchSockets, } \ No newline at end of file diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index 95e0c357c..42bd61bdc 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -14,7 +14,7 @@ const { socketsLiveByProject, socketsLiveBySession, autocomplete -} = require('../utils/httpHandlers-cluster'); +} = require('../utils/httpHandlers'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); diff --git a/ee/assist/utils/httpHandlers-cluster.js b/ee/assist/utils/httpHandlers-cluster.js deleted file mode 100644 index c24135340..000000000 --- a/ee/assist/utils/httpHandlers-cluster.js +++ /dev/null @@ -1,176 +0,0 @@ -const { - hasFilters, - extractPeerId, - isValidSession, - sortPaginate, - getValidAttributes, - uniqueAutocomplete -} = require("./helper"); -const { - extractProjectKeyFromRequest, - extractSessionIdFromRequest, - extractPayloadFromRequest, - getAvailableRooms -} = require("./extractors"); -const { - IDENTITIES -} = require("./assistHelper"); -const { - getServer -} = require('../utils/wsServer'); -const { - RecordRequestDuration, - IncreaseTotalRequests -} = require('../utils/metrics'); - -const debug_log = process.env.debug === "1"; - -const respond = function (req, res, data) { - let result = {data} - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } - const duration = performance.now() - req.startTs; - IncreaseTotalRequests(); - RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); -} - -const socketsListByProject = async function (req, res) { - debug_log && console.log("[WS]looking for available sessions"); - res.handlerName = 'socketsListByProject'; - - let io = getServer(); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - if (_sessionId === undefined) { - return respond(req, res, null); - } - let filters = await extractPayloadFromRequest(req, res); - - let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - return respond(req, res, _sessionId); - } - } - respond(req, res, null); -} - -const socketsLiveByProject = async function (req, res) { - debug_log && console.log("[WS]looking for available LIVE sessions"); - res.handlerName = 'socketsLiveByProject'; - - let io = getServer(); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - } - } - let sessions = Array.from(liveSessions); - respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); -} - -const socketsLiveBySession = async function (req, res) { - debug_log && console.log("[WS]looking for LIVE session"); - res.handlerName = 'socketsLiveBySession'; - - let io = getServer(); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - if (_sessionId === undefined) { - return respond(req, res, null); - } - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - - let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - - } - let sessions = Array.from(liveSessions); - respond(req, res, sessions.length > 0 ? sessions[0] : null); -} - -const autocomplete = async function (req, res) { - debug_log && console.log("[WS]autocomplete"); - res.handlerName = 'autocomplete'; - - let io = getServer(); - let _projectKey = extractProjectKeyFromRequest(req); - let filters = await extractPayloadFromRequest(req); - let results = []; - - if (filters.query && Object.keys(filters.query).length > 0) { - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey === _projectKey) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; - } - } - } - } - } - respond(req, res, uniqueAutocomplete(results)); -} - -module.exports = { - respond, - socketsListByProject, - socketsLiveByProject, - socketsLiveBySession, - autocomplete -} \ No newline at end of file diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js index e20f07a7f..34afc098b 100644 --- a/ee/assist/utils/wsServer.js +++ b/ee/assist/utils/wsServer.js @@ -7,6 +7,54 @@ const getServer = function () { return io; } +let redisClient; +const useRedis = process.env.redis === "true"; + +if (useRedis) { + const {createClient} = require("redis"); + const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); + redisClient = createClient({url: REDIS_URL}); + redisClient.on("error", (error) => console.error(`Redis error : ${error}`)); + void redisClient.connect(); +} + +const processSocketsList = function (sockets) { + let res = [] + for (let socket of sockets) { + let {handshake} = socket; + res.push({handshake}); + } + return res +} + +const doFetchAllSockets = async function () { + if (useRedis) { + try { + let cachedResult = await redisClient.get('fetchSocketsResult'); + if (cachedResult) { + return JSON.parse(cachedResult); + } + let result = await io.fetchSockets(); + let cachedString = JSON.stringify(processSocketsList(result)); + await redisClient.set('fetchSocketsResult', cachedString, {EX: 5}); + return result; + } catch (error) { + console.error('Error setting value with expiration:', error); + } + } + return await io.fetchSockets(); +} + +const fetchSockets = async function (roomID) { + if (!io) { + return []; + } + if (!roomID) { + return await doFetchAllSockets(); + } + return await io.in(roomID).fetchSockets(); +} + const createSocketIOServer = function (server, prefix) { if (io) { return io; @@ -41,4 +89,5 @@ const createSocketIOServer = function (server, prefix) { module.exports = { createSocketIOServer, getServer, + fetchSockets, } \ No newline at end of file