diff --git a/ee/assist/.gitignore b/ee/assist/.gitignore index 085b1a57e..95efe934d 100644 --- a/ee/assist/.gitignore +++ b/ee/assist/.gitignore @@ -15,7 +15,7 @@ servers/sourcemaps-server.js /utils/HeapSnapshot.js /utils/helper.js /utils/assistHelper.js -/utils/httpHandlers.js -/utils/socketHandlers.js +#/utils/httpHandlers.js +#/utils/socketHandlers.js .local *.mmdb \ No newline at end of file diff --git a/ee/assist/utils/httpHandlers.js b/ee/assist/utils/httpHandlers.js new file mode 100644 index 000000000..b92eddcb2 --- /dev/null +++ b/ee/assist/utils/httpHandlers.js @@ -0,0 +1,223 @@ +const { + hasFilters, + hasQuery, + isValidSession, + sortPaginate, + getValidAttributes, + uniqueAutocomplete +} = require("./helper"); +const { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, +} = require("./extractors"); +const { + RecordRequestDuration, + IncreaseTotalRequests +} = require('../utils/metrics'); +const {fetchSockets, getSessionFromCache} = require("./wsServer"); +const {IDENTITIES} = require("./assistHelper"); +const {logger} = require('./logger'); + +const respond = function (req, res, data) { + logger.debug("responding with data: ", JSON.stringify(data)) + let result = {data} + if (process.env.uws !== "true") { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify(result)); + } else { + if (!res.aborted) { + res.cork(() => { + res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); + }); + } else { + logger.debug("response aborted"); + return; + } + } + const duration = performance.now() - req.startTs; + IncreaseTotalRequests(); + RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); +} + +const getParticularSession = async function (roomId, filters, all=false) { + // let connected_sockets = await fetchSockets(roomId, all); + // if (connected_sockets.length === 0) { + // return null; + // } + // let sessInfo; + // for (let item of connected_sockets) { + // if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + // sessInfo = item.handshake.query.sessionInfo; + // break; + // } + // } + let sessInfo = await getSessionFromCache(); + if (!sessInfo) { + return null; + } + if (!hasFilters(filters)) { + return sessInfo; + } + const result = isValidSession(sessInfo, filters.filter) + if (result.matched) { + return sessInfo; + } + return null; +} + +const getAllSessions = async function (projectKey, filters, counters, onlineOnly= false) { + const sessions = []; + const connected_sockets = await fetchSockets(); + if (connected_sockets.length === 0) { + return sessions; + } + + const rooms = new Map(); + for (let item of connected_sockets) { + // Prefilter checks + if (rooms.has(item.handshake.query.roomId)) { + continue; + } + if (item.handshake.query.projectKey !== projectKey || !item.handshake.query.sessionInfo) { + continue; + } + if (onlineOnly && item.handshake.query.identity !== IDENTITIES.session) { + continue; + } + + // Mark this room as visited + rooms.set(item.handshake.query.roomId, true); + + // Add session to the list without filtering + if (!hasFilters(filters)) { + sessions.push(item.handshake.query.sessionInfo); + continue; + } + + // Add session to the list if it passes the filter + const result = isValidSession(item.handshake.query.sessionInfo, filters.filter) + if (result.matched) { + sessions.push(item.handshake.query.sessionInfo); + // Add filter name/value to counter + for (const [filterName, filterValue] of Object.entries(result.filters)) { + if (counters[filterName] === undefined) { + counters[filterName] = {}; + } + if (counters[filterName][filterValue] === undefined) { + counters[filterName][filterValue] = 0; + } + counters[filterName][filterValue] += 1; + } + } + } + + return sessions +} + +// Sort by projectKey +const socketsListByProject = async function (req, res) { + logger.debug("[WS]looking for available sessions"); + res.handlerName = 'socketsListByProject'; + + const _projectKey = extractProjectKeyFromRequest(req); + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + + // find a particular session + if (_sessionId) { + const sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters); + return respond(req, res, sessInfo); + } + + // find all sessions for a project + const counters = {}; + const sessions = await getAllSessions(_projectKey, filters, counters); + + // send response + respond(req, res, sortPaginate(sessions, filters, counters)); +} + +// Sort by projectKey +const socketsLiveByProject = async function (req, res) { + logger.debug("[WS]looking for available LIVE sessions"); + res.handlerName = 'socketsLiveByProject'; + + const _projectKey = extractProjectKeyFromRequest(req); + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + + // find a particular session + if (_sessionId) { + let sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters); + return respond(req, res, sessInfo); + } + + // find all sessions for a project + const counters = {}; + const sessions = await getAllSessions(_projectKey, filters, counters, true); + + // send response + respond(req, res, sortPaginate(sessions, filters, counters)); +} + +// Sort by roomID (projectKey+sessionId) +const socketsLiveBySession = async function (req, res) { + logger.debug("[WS]looking for LIVE session"); + res.handlerName = 'socketsLiveBySession'; + + const _projectKey = extractProjectKeyFromRequest(req); + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + + // find a particular session + if (_sessionId) { + let sessInfo = await getParticularSession(_sessionId, filters);//(`${_projectKey}-${_sessionId}`, filters, true); + return respond(req, res, sessInfo); + } + return respond(req, res, null); +} + +// Sort by projectKey +const autocomplete = async function (req, res) { + logger.debug("[WS]autocomplete"); + res.handlerName = 'autocomplete'; + + const _projectKey = extractProjectKeyFromRequest(req); + const filters = await extractPayloadFromRequest(req); + let results = []; + if (!hasQuery(filters)) { + return respond(req, res, results); + } + + let connected_sockets = await fetchSockets(); + if (connected_sockets.length === 0) { + return results; + } + + const rooms = new Map(); + for (let item of connected_sockets) { + if (rooms.has(item.handshake.query.roomId)) { + continue; + } + if (item.handshake.query.sessionInfo) { + if ((item.handshake.query.projectKey !== _projectKey) || (item.handshake.query.identity !== IDENTITIES.session)) { + continue; + } + // Mark this room as visited + rooms.set(item.handshake.query.roomId, true); + results.push(...getValidAttributes(item.handshake.query.sessionInfo, filters.query)) + } + } + + respond(req, res, uniqueAutocomplete(results)); +} + +module.exports = { + respond, + socketsListByProject, + socketsLiveByProject, + socketsLiveBySession, + autocomplete +} \ No newline at end of file diff --git a/ee/assist/utils/socketHandlers.js b/ee/assist/utils/socketHandlers.js new file mode 100644 index 000000000..e1b349b64 --- /dev/null +++ b/ee/assist/utils/socketHandlers.js @@ -0,0 +1,247 @@ +const { + extractPeerId, +} = require("./helper"); +const { + IDENTITIES, + EVENTS_DEFINITION, + extractSessionInfo, + errorHandler +} = require("./assistHelper"); +const { + startAssist, + endAssist, + handleEvent +} = require("./stats"); +const { + sendTo, + sendFrom, + fetchSockets, + addSessionToCache, + getSessionFromCache, + removeSessionFromCache +} = require('../utils/wsServer'); +const { + IncreaseTotalWSConnections, + IncreaseOnlineConnections, + DecreaseOnlineConnections, + IncreaseTotalRooms, + IncreaseOnlineRooms, + DecreaseOnlineRooms, +} = require('../utils/metrics'); +const {logger} = require('./logger'); +const deepMerge = require('@fastify/deepmerge')({all: true}); + +const findSessionSocketId = async (roomId, tabId) => { + let pickFirstSession = tabId === undefined; + const connected_sockets = await fetchSockets(roomId); + for (let socket of connected_sockets) { + if (socket.handshake.query.identity === IDENTITIES.session) { + if (pickFirstSession) { + return socket.id; + } else if (socket.handshake.query.tabId === tabId) { + return socket.id; + } + } + } + return null; +}; + +async function getRoomData(roomID) { + let tabsCount = 0, agentsCount = 0, tabIDs = [], agentIDs = []; + const connected_sockets = await fetchSockets(roomID); + if (connected_sockets.length > 0) { + for (let socket of connected_sockets) { + if (socket.handshake.query.identity === IDENTITIES.session) { + tabsCount++; + tabIDs.push(socket.handshake.query.tabId); + } else { + agentsCount++; + agentIDs.push(socket.id); + } + } + } else { + tabsCount = -1; + agentsCount = -1; + } + return {tabsCount, agentsCount, tabIDs, agentIDs}; +} + +function processNewSocket(socket) { + socket._connectedAt = new Date(); + let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.handshake.query.peerId); + socket.handshake.query.roomId = `${connProjectKey}-${connSessionId}`; + socket.handshake.query.projectKey = connProjectKey; + socket.handshake.query.sessId = connSessionId; + socket.handshake.query.tabId = connTabId; + logger.debug(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.handshake.query.roomId}`); +} + +async function onConnect(socket) { + logger.debug(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); + processNewSocket(socket); + IncreaseTotalWSConnections(socket.handshake.query.identity); + IncreaseOnlineConnections(socket.handshake.query.identity); + + const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); + + if (socket.handshake.query.identity === IDENTITIES.session) { + // Check if session with the same tabID already connected, if so, refuse new connexion + if (tabsCount > 0) { + for (let tab of tabIDs) { + if (tab === socket.handshake.query.tabId) { + logger.debug(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`); + sendTo(socket.id, EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); + return socket.disconnect(); + } + } + } + extractSessionInfo(socket); + if (tabsCount < 0) { + // New session creates new room + IncreaseTotalRooms(); + IncreaseOnlineRooms(); + } + // Inform all connected agents about reconnected session + if (agentsCount > 0) { + logger.debug(`notifying new session about agent-existence`); + sendTo(socket.id, EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + } + } else if (tabsCount <= 0) { + logger.debug(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`); + sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS); + } + await socket.join(socket.handshake.query.roomId); + + // Add session to cache + if (socket.handshake.query.identity === IDENTITIES.session) { + await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo); + } + + logger.debug(`${socket.id} joined room:${socket.handshake.query.roomId}, as:${socket.handshake.query.identity}, connections:${agentsCount + tabsCount + 1}`) + + if (socket.handshake.query.identity === IDENTITIES.agent) { + if (socket.handshake.query.agentInfo !== undefined) { + socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); + socket.handshake.query.agentID = socket.handshake.query.agentInfo.id; + // Stats + startAssist(socket, socket.handshake.query.agentID); + } + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + } + + // Set disconnect handler + socket.on('disconnect', () => onDisconnect(socket)); + + // Handle update event + socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, (...args) => onUpdateEvent(socket, ...args)); + + // Handle webrtc events + socket.on(EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, (...args) => onWebrtcAgentHandler(socket, ...args)); + + // Handle errors + socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); + socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); + socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err)); + + // Handle all other events + socket.onAny((eventName, ...args) => onAny(socket, eventName, ...args)); +} + +async function onDisconnect(socket) { + DecreaseOnlineConnections(socket.handshake.query.identity); + logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`); + + if (socket.handshake.query.identity === IDENTITIES.agent) { + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + // Stats + endAssist(socket, socket.handshake.query.agentID); + } + logger.debug("checking for number of connected agents and sessions"); + let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId); + + if (tabsCount <= 0) { + await removeSessionFromCache(socket.handshake.query.sessId); + } + + if (tabsCount === -1 && agentsCount === -1) { + DecreaseOnlineRooms(); + logger.debug(`room not found: ${socket.handshake.query.roomId}`); + return; + } + if (tabsCount === 0) { + logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_SESSIONS); + } + if (agentsCount === 0) { + logger.debug(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`); + sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NO_AGENTS); + } +} + +async function onUpdateEvent(socket, ...args) { + logger.debug(`${socket.id} sent update event.`); + if (socket.handshake.query.identity !== IDENTITIES.session) { + logger.debug('Ignoring update event.'); + return + } + + args[0] = updateSessionData(socket, args[0]) + socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); + + // Update sessionInfo for all agents in the room + const connected_sockets = await fetchSockets(socket.handshake.query.roomId); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + item.handshake.query.sessionInfo = deepMerge(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId}); + } else if (item.handshake.query.identity === IDENTITIES.agent) { + sendFrom(socket, item.id, EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + } + } +} + +async function onWebrtcAgentHandler(socket, ...args) { + if (socket.handshake.query.identity === IDENTITIES.agent) { + const agentIdToConnect = args[0]?.data?.toAgentId; + logger.debug(`${socket.id} sent webrtc event to agent:${agentIdToConnect}`); + if (agentIdToConnect && socket.handshake.sessionData.AGENTS_CONNECTED.includes(agentIdToConnect)) { + sendFrom(socket, agentIdToConnect, EVENTS_DEFINITION.listen.WEBRTC_AGENT_CALL, args[0]); + } + } +} + +async function onAny(socket, eventName, ...args) { + if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { + logger.debug(`received event:${eventName}, should be handled by another listener, stopping onAny.`); + return + } + args[0] = updateSessionData(socket, args[0]) + if (socket.handshake.query.identity === IDENTITIES.session) { + logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`); + sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]); + } else { + // Stats + handleEvent(eventName, socket, args[0]); + logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`); + let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId); + if (socketId === null) { + logger.debug(`session not found for:${socket.handshake.query.roomId}`); + sendTo(socket.id, EVENTS_DEFINITION.emit.NO_SESSIONS); + } else { + logger.debug("message sent"); + sendTo(socket.id, eventName, socket.id, args[0]); + } + } +} + +// Back compatibility (add top layer with meta information) +function updateSessionData(socket, sessionData) { + if (sessionData?.meta === undefined && socket.handshake.query.identity === IDENTITIES.session) { + sessionData = {meta: {tabId: socket.handshake.query.tabId, version: 1}, data: sessionData}; + } + return sessionData +} + +module.exports = { + onConnect, +} diff --git a/ee/assist/utils/wsServer.js b/ee/assist/utils/wsServer.js index 2b7345868..6c9e258ab 100644 --- a/ee/assist/utils/wsServer.js +++ b/ee/assist/utils/wsServer.js @@ -10,6 +10,47 @@ let inMemorySocketsCache = []; let lastCacheUpdateTime = 0; const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000; +let redisClient; +if (useRedis) { + const {createClient} = require("redis"); + const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); + redisClient = createClient({url: REDIS_URL}); + redisClient.on("error", (error) => logger.error(`Redis error : ${error}`)); + void redisClient.connect(); +} + +const addSessionToCache = async function (sessionID, sessionData) { + try { + await redisClient.set(`active_sessions:${sessionID}`, JSON.stringify(sessionData), 'EX', 3600); // 60 minutes + console.log(`Session ${sessionID} stored in Redis`); + } catch (error) { + console.log(error); + } +} + +const getSessionFromCache = async function (sessionID) { + try { + const sessionData = await redisClient.get(`active_sessions:${sessionID}`); + if (sessionData) { + console.log(`Session ${sessionID} retrieved from Redis`); + return JSON.parse(sessionData); + } + return null; + } catch (error) { + console.log(error); + return null; + } +} + +const removeSessionFromCache = async function (sessionID) { + try { + await redisClient.del(`active_sessions:${sessionID}`); + console.log(`Session ${sessionID} removed from Redis`); + } catch (error) { + console.log(error); + } +} + const doFetchAllSockets = async function () { if (useRedis) { const now = Date.now(); @@ -119,4 +160,7 @@ module.exports = { sendTo, sendFrom, fetchSockets, + addSessionToCache, + getSessionFromCache, + removeSessionFromCache, } \ No newline at end of file