diff --git a/assist/Dockerfile b/assist/Dockerfile index f8e5ac748..b316f3103 100644 --- a/assist/Dockerfile +++ b/assist/Dockerfile @@ -1,4 +1,7 @@ -FROM node:20-alpine +#ARCH can be amd64 or arm64 +ARG ARCH=amd64 + +FROM --platform=linux/$ARCH node:20-alpine LABEL Maintainer="KRAIEM Taha Yassine" RUN apk add --no-cache tini diff --git a/assist/servers/websocket.js b/assist/servers/websocket.js index 350e79953..4ea23b67d 100644 --- a/assist/servers/websocket.js +++ b/assist/servers/websocket.js @@ -1,14 +1,5 @@ const express = require('express'); const { - extractPeerId, - hasFilters, - isValidSession, - extractPayloadFromRequest, - sortPaginate, - getAvailableRooms, -} = require('../utils/helper'); -const { - IDENTITIES, socketConnexionTimeout, authorizer } = require('../utils/assistHelper'); @@ -19,66 +10,21 @@ const { createSocketIOServer } = require('../utils/wsServer'); const { - respond, - socketsList, socketsListByProject, socketsLiveByProject, socketsLiveBySession, autocomplete } = require('../utils/httpHandlers'); -let io; - const wsRouter = express.Router(); - -const debug_log = process.env.debug === "1"; - -const socketsLive = async function (req, res) { - res.handlerName = 'socketsLive'; - debug_log && console.log("[WS]looking for all available LIVE sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey !== undefined) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - } - } else { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - } - } - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(req, res, sortPaginate(liveSessions, filters)); -} - -wsRouter.get(`/sockets-list`, socketsList); -wsRouter.post(`/sockets-list`, socketsList); -wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); -wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); - -wsRouter.get(`/sockets-live`, socketsLive); -wsRouter.post(`/sockets-live`, socketsLive); -wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); +wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete +wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live +wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); +wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search +wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id +let io; module.exports = { wsRouter, start: (server, prefix) => { @@ -87,13 +33,10 @@ module.exports = { io.on('connection', (socket) => onConnect(socket)); console.log("WS server started"); - socketConnexionTimeout(io); }, handlers: { - socketsList, socketsListByProject, - socketsLive, socketsLiveByProject, socketsLiveBySession } diff --git a/assist/utils/helper.js b/assist/utils/helper.js index 9b0ffba8c..b3ee52dbe 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -120,6 +120,10 @@ const hasFilters = function (filters) { return filters && filters.filter && Object.keys(filters.filter).length > 0; } +const hasQuery = function (filters) { + return filters && filters.query && Object.keys(filters.query).length > 0; +} + const objectToObjectOfArrays = function (obj) { let _obj = {} if (obj) { @@ -161,7 +165,7 @@ const extractPayloadFromRequest = async function (req, res) { "filter": {}, // for sessions search "sort": { "key": req.body.sort && req.body.sort.key ? req.body.sort.key : undefined, - "order": req.body.sort && req.body.sort.order === "DESC" + "order": req.body.sort && req.body.sort?.order.toLowerCase() === "desc" }, "pagination": { "limit": req.body.pagination && req.body.pagination.limit ? req.body.pagination.limit : undefined, @@ -214,20 +218,18 @@ const sortPaginate = function (list, filters) { } const total = list.length; - list.sort((a, b) => { - const tA = getValue(a, "timestamp"); - const tB = getValue(b, "timestamp"); - return tA < tB ? 1 : tA > tB ? -1 : 0; // b - a - }); - if (filters.sort.order) { - list.reverse(); - } - if ((filters.sort.key || "timestamp") !== "timestamp") { + if (filters.sort.key && filters.sort.key !== "timestamp") { list.sort((a, b) => { const vA = getValue(a, filters.sort.key); const vB = getValue(b, filters.sort.key); return vA > vB ? 1 : vA < vB ? -1 : 0; }); + } else { + list.sort((a, b) => { + const tA = getValue(a, "timestamp"); + const tB = getValue(b, "timestamp"); + return tB - tA + }); } if (filters.sort.order) { list.reverse(); @@ -291,6 +293,7 @@ module.exports = { extractSessionIdFromRequest, isValidSession, hasFilters, + hasQuery, objectToObjectOfArrays, extractPayloadFromRequest, sortPaginate, diff --git a/assist/utils/httpHandlers.js b/assist/utils/httpHandlers.js index d1a21e995..092e9102e 100644 --- a/assist/utils/httpHandlers.js +++ b/assist/utils/httpHandlers.js @@ -1,6 +1,6 @@ const { hasFilters, - extractPeerId, + hasQuery, isValidSession, sortPaginate, getValidAttributes, @@ -10,22 +10,21 @@ const { extractProjectKeyFromRequest, extractSessionIdFromRequest, extractPayloadFromRequest, - getAvailableRooms } = require("./extractors"); -const { - IDENTITIES -} = require("./assistHelper"); -const { - getServer -} = require('../utils/wsServer'); const { RecordRequestDuration, IncreaseTotalRequests } = require('../utils/metrics'); +const { + GetRoomInfo, + GetRooms, + GetSessions, +} = require('../utils/rooms'); const debug_log = process.env.debug === "1"; const respond = function (req, res, data) { + console.log("responding with data: ", data) let result = {data} if (process.env.uws !== "true") { res.statusCode = 200; @@ -39,174 +38,121 @@ const respond = function (req, res, data) { RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); } -const socketsList = async function (req, res) { - res.handlerName = 'socketsList'; - let io = getServer(); - debug_log && console.log("[WS]looking for all available sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey !== undefined) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(sessionId); - } - } - } else { - liveSessionsPerProject[projectKey].add(sessionId); - } - } +const getParticularSession = function (sessionId, filters) { + const sessInfo = GetRoomInfo(sessionId); + if (!sessInfo) { + return null; } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(req, res, liveSessions); + if (!hasFilters(filters)) { + return sessInfo; + } + if (isValidSession(sessInfo, filters.filter)) { + return sessInfo; + } + return null; } +const getAllSessions = function (projectKey, filters, onlineOnly= false) { + const sessions = []; + const allRooms = onlineOnly ? GetSessions(projectKey) : GetRooms(projectKey); + + for (let sessionId of allRooms) { + let sessInfo = GetRoomInfo(sessionId); + if (!sessInfo) { + continue; + } + if (!hasFilters(filters)) { + sessions.push(sessInfo); + continue; + } + if (isValidSession(sessInfo, filters.filter)) { + sessions.push(sessInfo); + } + } + return sessions +} + +// Sort by projectKey const socketsListByProject = async function (req, res) { - res.handlerName = 'socketsListByProject'; - let io = getServer(); debug_log && console.log("[WS]looking for available sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - if (withFilters) { - const connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo - && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions.add(sessionId); - } - } - } else { - liveSessions.add(sessionId); - } - } + res.handlerName = 'socketsListByProject'; + + const _projectKey = extractProjectKeyFromRequest(req); + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + + // find a particular session + if (_sessionId) { + return respond(req, res, getParticularSession(_sessionId, filters)); } - let sessions = Array.from(liveSessions); - respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) - : sessions.length > 0 ? sessions[0] - : null); + + // find all sessions for a project + const sessions = getAllSessions(_projectKey, filters); + + // send response + respond(req, res, sortPaginate(sessions, filters)); } +// Sort by projectKey const socketsLiveByProject = async function (req, res) { - res.handlerName = 'socketsLiveByProject'; - let io = getServer(); debug_log && console.log("[WS]looking for available LIVE sessions"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(roomId); - if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - } + res.handlerName = 'socketsLiveByProject'; + + const _projectKey = extractProjectKeyFromRequest(req); + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + + // find a particular session + if (_sessionId) { + return respond(req, res, getParticularSession(_sessionId, filters)); } - let sessions = Array.from(liveSessions); - respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); + + // find all sessions for a project + const sessions = getAllSessions(_projectKey, filters, true); + + // send response + respond(req, res, sortPaginate(sessions, filters)); } +// Sort by roomID (projectKey+sessionId) const socketsLiveBySession = async function (req, res) { - res.handlerName = 'socketsLiveBySession'; - let io = getServer(); debug_log && console.log("[WS]looking for LIVE session"); - let _projectKey = extractProjectKeyFromRequest(req); - let _sessionId = extractSessionIdFromRequest(req); - if (_sessionId === undefined) { - return respond(req, res, null); - } - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessions = new Set(); - const sessIDs = new Set(); + res.handlerName = 'socketsLiveBySession'; - let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessions.add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } + const _sessionId = extractSessionIdFromRequest(req); + const filters = await extractPayloadFromRequest(req, res); + // find a particular session + if (_sessionId) { + return respond(req, res, getParticularSession(_sessionId, filters)); } - let sessions = Array.from(liveSessions); - respond(req, res, sessions.length > 0 ? sessions[0] : null); + return respond(req, res, null); } +// Sort by projectKey const autocomplete = async function (req, res) { - res.handlerName = 'autocomplete'; - let io = getServer(); debug_log && console.log("[WS]autocomplete"); - let _projectKey = extractProjectKeyFromRequest(req); - let filters = await extractPayloadFromRequest(req); + res.handlerName = 'autocomplete'; + + const _projectKey = extractProjectKeyFromRequest(req); + const filters = await extractPayloadFromRequest(req); let results = []; - if (filters.query && Object.keys(filters.query).length > 0) { - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey === _projectKey) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { - results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; - } - } - } + if (!hasQuery(filters)) { + return respond(req, res, results); + } + let allSessions = GetSessions(_projectKey); + for (let sessionId of allSessions) { + let sessInfo = GetRoomInfo(sessionId); + if (!sessInfo) { + continue; } + results = [...results, ...getValidAttributes(sessInfo, filters.query)]; } respond(req, res, uniqueAutocomplete(results)); } module.exports = { respond, - socketsList, socketsListByProject, socketsLiveByProject, socketsLiveBySession, diff --git a/assist/utils/rooms.js b/assist/utils/rooms.js new file mode 100644 index 000000000..4e4504253 --- /dev/null +++ b/assist/utils/rooms.js @@ -0,0 +1,49 @@ +const roomsInfo = new Map(); // sessionID -> sessionInfo +const projectSessions = new Map(); // projectKey -> Set(sessionIDs) // all rooms (even with agent only) +const projectRooms = new Map(); // projectKey -> Set(roomIDs) // online rooms + +function AddRoom(projKey, sessID, sessInfo) { + roomsInfo.set(sessID, sessInfo); + if (!projectRooms.has(projKey)) { + projectRooms.set(projKey, new Set()); + } + projectRooms.get(projKey).add(sessID); + if (!projectSessions.has(projKey)) { + projectSessions.set(projKey, new Set()); + } + projectSessions.get(projKey).add(sessID); +} + +function UpdateRoom(sessID, sessInfo) { + roomsInfo.set(sessID, sessInfo); +} + +function DeleteSession(projKey, sessID) { + projectSessions.get(projKey)?.delete(sessID); +} + +function DeleteRoom(projKey, sessID) { + projectRooms.get(projKey)?.delete(sessID); +} + +function GetRoomInfo(sessID) { + return roomsInfo.get(sessID); +} + +function GetRooms(projectKey) { + return projectRooms.get(projectKey) || new Set(); +} + +function GetSessions(projectKey) { + return projectSessions.get(projectKey) || new Set(); +} + +module.exports = { + AddRoom, + UpdateRoom, + DeleteRoom, + DeleteSession, + GetRoomInfo, + GetRooms, + GetSessions, +} \ No newline at end of file diff --git a/assist/utils/socketHandlers.js b/assist/utils/socketHandlers.js index f2ebcb863..9aceafaf4 100644 --- a/assist/utils/socketHandlers.js +++ b/assist/utils/socketHandlers.js @@ -23,6 +23,12 @@ const { IncreaseOnlineRooms, DecreaseOnlineRooms, } = require('../utils/metrics'); +const { + AddRoom, + UpdateRoom, + DeleteRoom, + DeleteSession, +} = require('../utils/rooms'); const debug_log = process.env.debug === "1"; const error_log = process.env.ERROR === "1"; @@ -95,12 +101,13 @@ async function onConnect(socket) { } } } + extractSessionInfo(socket); if (tabsCount < 0) { // New session creates new room IncreaseTotalRooms(); IncreaseOnlineRooms(); + AddRoom(socket.projectKey, socket.sessId, socket.handshake.query.sessionInfo); } - extractSessionInfo(socket); // Inform all connected agents about reconnected session if (agentsCount > 0) { debug_log && console.log(`notifying new session about agent-existence`); @@ -161,11 +168,14 @@ async function onDisconnect(socket) { if (tabsCount === -1 && agentsCount === -1) { DecreaseOnlineRooms(); debug_log && console.log(`room not found: ${socket.roomId}`); + DeleteSession(socket.projectKey, socket.sessId); + DeleteRoom(socket.projectKey, socket.sessId); return; } if (tabsCount === 0) { debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + DeleteSession(socket.projectKey, socket.sessId); } if (agentsCount === 0) { debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); @@ -182,6 +192,7 @@ async function onUpdateEvent(socket, ...args) { args[0] = updateSessionData(socket, args[0]) Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); + UpdateRoom(socket.sessId, socket.handshake.query.sessionInfo); // Update sessionInfo for all agents in the room const io = getServer(); diff --git a/ee/assist/Dockerfile b/ee/assist/Dockerfile index fe01eaf19..c53d519bf 100644 --- a/ee/assist/Dockerfile +++ b/ee/assist/Dockerfile @@ -1,6 +1,10 @@ -FROM node:18-alpine +#ARCH can be amd64 or arm64 +ARG ARCH=amd64 + +FROM --platform=linux/$ARCH node:20-alpine LABEL Maintainer="KRAIEM Taha Yassine" -RUN apk add --no-cache tini git libc6-compat && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2 +RUN apk add --no-cache tini git libc6-compat +# && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2 ARG envarg ENV ENTERPRISE_BUILD=${envarg} \ diff --git a/ee/assist/server.js b/ee/assist/server.js index 685a77239..fed10e38c 100644 --- a/ee/assist/server.js +++ b/ee/assist/server.js @@ -92,15 +92,9 @@ if (process.env.uws !== "true") { return fn(req, res); } } - uapp.get(`${PREFIX}/${P_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList)); - uapp.post(`${PREFIX}/${P_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList)); - uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete)); - uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject)); - uapp.post(`${PREFIX}/${P_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject)); - uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject)); - uapp.get(`${PREFIX}/${P_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); - uapp.post(`${PREFIX}/${P_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); + uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete)); + uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject)); uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete)); uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); uapp.post(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index 6cb2d9613..95e0c357c 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -1,19 +1,8 @@ const express = require('express'); const { - extractPeerId, - hasFilters, - isValidSession, - sortPaginate, -} = require('../utils/helper'); -const { - IDENTITIES, socketConnexionTimeout, authorizer } = require('../utils/assistHelper'); -const { - extractPayloadFromRequest, - getAvailableRooms -} = require('../utils/helper-ee'); const { createSocketIOServer } = require('../utils/wsServer'); @@ -21,78 +10,30 @@ const { onConnect } = require('../utils/socketHandlers'); const { - respond, - socketsList, socketsListByProject, socketsLiveByProject, socketsLiveBySession, autocomplete -} = require('../utils/httpHandlers'); +} = require('../utils/httpHandlers-cluster'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); -const wsRouter = express.Router(); const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); const pubClient = createClient({url: REDIS_URL}); const subClient = pubClient.duplicate(); console.log(`Using Redis: ${REDIS_URL}`); -let io; + const debug_log = process.env.debug === "1"; -const socketsLive = async function (req, res) { - res.handlerName = 'socketsLive'; - debug_log && console.log("[WS]looking for all available LIVE sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - const sessIDs = new Set(); - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey !== undefined) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - if (item.handshake.query.sessionInfo && - isValidSession(item.handshake.query.sessionInfo, filters.filter) && - !sessIDs.has(item.handshake.query.sessionInfo.sessionID) - ) { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } else { - if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - sessIDs.add(item.handshake.query.sessionInfo.sessionID); - } - } - } - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(req, res, sortPaginate(liveSessions, filters)); -} - -wsRouter.get(`/sockets-list`, socketsList); -wsRouter.post(`/sockets-list`, socketsList); -wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); -wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); - -wsRouter.get(`/sockets-live`, socketsLive); -wsRouter.post(`/sockets-live`, socketsLive); -wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); +const wsRouter = express.Router(); +wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete +wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live +wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); +wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search +wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id +let io; module.exports = { wsRouter, start: (server, prefix) => { @@ -117,9 +58,7 @@ module.exports = { }); }, handlers: { - socketsList, socketsListByProject, - socketsLive, socketsLiveByProject, socketsLiveBySession, autocomplete diff --git a/ee/assist/servers/websocket.js b/ee/assist/servers/websocket.js index 855894272..0bc40c035 100644 --- a/ee/assist/servers/websocket.js +++ b/ee/assist/servers/websocket.js @@ -1,16 +1,5 @@ const express = require('express'); const { - extractPeerId, - hasFilters, - isValidSession, - sortPaginate, -} = require('../utils/helper'); -const { - extractPayloadFromRequest, - getAvailableRooms -} = require('../utils/helper-ee'); -const { - IDENTITIES, socketConnexionTimeout, authorizer } = require('../utils/assistHelper'); @@ -21,8 +10,6 @@ const { onConnect } = require('../utils/socketHandlers'); const { - respond, - socketsList, socketsListByProject, socketsLiveByProject, socketsLiveBySession, @@ -30,56 +17,14 @@ const { } = require('../utils/httpHandlers'); const wsRouter = express.Router(); +wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete +wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live +wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using +wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); +wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search +wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id let io; -const debug_log = process.env.debug === "1"; - -const socketsLive = async function (req, res) { - res.handlerName = 'socketsLive'; - debug_log && console.log("[WS]looking for all available LIVE sessions"); - let filters = await extractPayloadFromRequest(req, res); - let withFilters = hasFilters(filters); - let liveSessionsPerProject = {}; - let rooms = await getAvailableRooms(io); - for (let roomId of rooms.keys()) { - let {projectKey} = extractPeerId(roomId); - if (projectKey !== undefined) { - let connected_sockets = await io.in(roomId).fetchSockets(); - for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { - liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); - if (withFilters) { - if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - } - } else { - liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); - } - } - } - } - } - let liveSessions = {}; - liveSessionsPerProject.forEach((sessions, projectId) => { - liveSessions[projectId] = Array.from(sessions); - }); - respond(req, res, sortPaginate(liveSessions, filters)); -} - -wsRouter.get(`/sockets-list`, socketsList); -wsRouter.post(`/sockets-list`, socketsList); -wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); -wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject); -wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); - -wsRouter.get(`/sockets-live`, socketsLive); -wsRouter.post(`/sockets-live`, socketsLive); -wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); -wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); -wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); - module.exports = { wsRouter, start: (server, prefix) => { @@ -88,13 +33,10 @@ module.exports = { io.on('connection', (socket) => onConnect(socket)); console.log("WS server started"); - socketConnexionTimeout(io); }, handlers: { - socketsList, socketsListByProject, - socketsLive, socketsLiveByProject, socketsLiveBySession, autocomplete diff --git a/ee/assist/utils/httpHandlers-cluster.js b/ee/assist/utils/httpHandlers-cluster.js new file mode 100644 index 000000000..c24135340 --- /dev/null +++ b/ee/assist/utils/httpHandlers-cluster.js @@ -0,0 +1,176 @@ +const { + hasFilters, + extractPeerId, + isValidSession, + sortPaginate, + getValidAttributes, + uniqueAutocomplete +} = require("./helper"); +const { + extractProjectKeyFromRequest, + extractSessionIdFromRequest, + extractPayloadFromRequest, + getAvailableRooms +} = require("./extractors"); +const { + IDENTITIES +} = require("./assistHelper"); +const { + getServer +} = require('../utils/wsServer'); +const { + RecordRequestDuration, + IncreaseTotalRequests +} = require('../utils/metrics'); + +const debug_log = process.env.debug === "1"; + +const respond = function (req, res, data) { + let result = {data} + if (process.env.uws !== "true") { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify(result)); + } else { + res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); + } + const duration = performance.now() - req.startTs; + IncreaseTotalRequests(); + RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); +} + +const socketsListByProject = async function (req, res) { + debug_log && console.log("[WS]looking for available sessions"); + res.handlerName = 'socketsListByProject'; + + let io = getServer(); + let _projectKey = extractProjectKeyFromRequest(req); + let _sessionId = extractSessionIdFromRequest(req); + if (_sessionId === undefined) { + return respond(req, res, null); + } + let filters = await extractPayloadFromRequest(req, res); + + let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { + return respond(req, res, _sessionId); + } + } + respond(req, res, null); +} + +const socketsLiveByProject = async function (req, res) { + debug_log && console.log("[WS]looking for available LIVE sessions"); + res.handlerName = 'socketsLiveByProject'; + + let io = getServer(); + let _projectKey = extractProjectKeyFromRequest(req); + let _sessionId = extractSessionIdFromRequest(req); + let filters = await extractPayloadFromRequest(req, res); + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + const sessIDs = new Set(); + + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); + if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { + let connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session) { + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } else { + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } + } + } + } + } + let sessions = Array.from(liveSessions); + respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); +} + +const socketsLiveBySession = async function (req, res) { + debug_log && console.log("[WS]looking for LIVE session"); + res.handlerName = 'socketsLiveBySession'; + + let io = getServer(); + let _projectKey = extractProjectKeyFromRequest(req); + let _sessionId = extractSessionIdFromRequest(req); + if (_sessionId === undefined) { + return respond(req, res, null); + } + let filters = await extractPayloadFromRequest(req, res); + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + const sessIDs = new Set(); + + let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session) { + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } else { + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } + } + } + + } + let sessions = Array.from(liveSessions); + respond(req, res, sessions.length > 0 ? sessions[0] : null); +} + +const autocomplete = async function (req, res) { + debug_log && console.log("[WS]autocomplete"); + res.handlerName = 'autocomplete'; + + let io = getServer(); + let _projectKey = extractProjectKeyFromRequest(req); + let filters = await extractPayloadFromRequest(req); + let results = []; + + if (filters.query && Object.keys(filters.query).length > 0) { + let rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === _projectKey) { + let connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)]; + } + } + } + } + } + respond(req, res, uniqueAutocomplete(results)); +} + +module.exports = { + respond, + socketsListByProject, + socketsLiveByProject, + socketsLiveBySession, + autocomplete +} \ No newline at end of file