From cb9d713a1fb2239ccbd818ef7363b9bb7414b7d9 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Fri, 2 Jun 2023 09:16:49 +0200 Subject: [PATCH] feat(backend): added multitab support to assits --- assist/servers/websocket.js | 142 ++++++++++++++++++++---------------- assist/utils/helper.js | 23 +++++- 2 files changed, 102 insertions(+), 63 deletions(-) diff --git a/assist/servers/websocket.js b/assist/servers/websocket.js index 44aed4d09..65d5b5a0a 100644 --- a/assist/servers/websocket.js +++ b/assist/servers/websocket.js @@ -1,6 +1,8 @@ const _io = require('socket.io'); const express = require('express'); const { + extractRoomId, + extractTabId, extractPeerId, extractProjectKeyFromRequest, extractSessionIdFromRequest, @@ -47,25 +49,30 @@ const respond = function (res, data) { const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); let filters = await extractPayloadFromRequest(req); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { - const connected_sockets = await io.in(peerId).fetchSockets(); + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { + const connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, liveSessions); } @@ -74,35 +81,36 @@ const socketsListByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { - const connected_sockets = await io.in(peerId).fetchSockets(); + if (withFilters) { + const connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) + : sessions.length > 0 ? sessions[0] : null); } const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); let filters = await extractPayloadFromRequest(req); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey} = extractPeerId(peerId); @@ -110,18 +118,22 @@ const socketsLive = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); } } } } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, sortPaginate(liveSessions, filters)); } @@ -130,29 +142,29 @@ const socketsLiveByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); let rooms = await getAvailableRooms(io); - for (let peerId of rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); + for (let roomId of rooms.keys()) { + let {projectKey, sessionId} = extractPeerId(roomId); if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - let connected_sockets = await io.in(peerId).fetchSockets(); + let connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + if (withFilters) { if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessions.add(item.handshake.query.sessionInfo); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessions.add(item.handshake.query.sessionInfo); } } } } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) + : sessions.length > 0 ? sessions[0] : null); } @@ -178,10 +190,10 @@ const autocomplete = async function (req, res) { respond(res, uniqueAutocomplete(results)); } -const findSessionSocketId = async (io, peerId) => { - const connected_sockets = await io.in(peerId).fetchSockets(); +const findSessionSocketId = async (io, roomId, tabId) => { + const connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { + if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) { return item.id; } } @@ -191,8 +203,8 @@ const findSessionSocketId = async (io, peerId) => { async function sessions_agents_count(io, socket) { let c_sessions = 0, c_agents = 0; const rooms = await getAvailableRooms(io); - if (rooms.get(socket.peerId)) { - const connected_sockets = await io.in(socket.peerId).fetchSockets(); + if (rooms.get(socket.roomId)) { + const connected_sockets = await io.in(socket.roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { @@ -211,8 +223,8 @@ async function sessions_agents_count(io, socket) { async function get_all_agents_ids(io, socket) { let agents = []; const rooms = await getAvailableRooms(io); - if (rooms.get(socket.peerId)) { - const connected_sockets = await io.in(socket.peerId).fetchSockets(); + if (rooms.get(socket.roomId)) { + const connected_sockets = await io.in(socket.roomId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.agent) { agents.push(item.id); @@ -246,6 +258,8 @@ module.exports = { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket._connectedAt = new Date(); socket.peerId = socket.handshake.query.peerId; + socket.roomId = extractRoomId(socket.peerId); + socket.tabId = extractTabId(socket.peerId); socket.identity = socket.handshake.query.identity; let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (socket.identity === IDENTITIES.session) { @@ -259,42 +273,42 @@ module.exports = { debug && console.log(`notifying new session about agent-existence`); let agents_ids = await get_all_agents_ids(io, socket); io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (c_sessions <= 0) { debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } - await socket.join(socket.peerId); + await socket.join(socket.roomId); const rooms = await getAvailableRooms(io); - if (rooms.get(socket.peerId)) { - debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`); + if (rooms.get(socket.roomId)) { + debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`); } if (socket.identity === IDENTITIES.agent) { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { - debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); + debug && console.log(`${socket.id} disconnected from ${socket.roomId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (c_sessions === -1 && c_agents === -1) { - debug && console.log(`room not found: ${socket.peerId}`); + debug && console.log(`room not found: ${socket.roomId}`); } if (c_sessions === 0) { - debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { - debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); @@ -305,7 +319,7 @@ module.exports = { return } socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); }); socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); @@ -317,17 +331,23 @@ module.exports = { return } if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); + // TODO: emit message to all agents in the room (except tabs) socket.to(socket.peerId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); - let socketId = await findSessionSocketId(io, socket.peerId); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); + // TODO: new message structure: {meta: {tabId: string::id}, data: xxx -> args[0]} + if (args[0].meta === undefined && args[0].data === undefined) { + debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`); + return + } + let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId); if (socketId === null) { - debug && console.log(`session not found for:${socket.peerId}`); + debug && console.log(`session not found for:${socket.roomId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent"); - io.to(socketId).emit(eventName, socket.id, args[0]); + io.to(socketId).emit(eventName, socket.id, args[0].data); } } }); @@ -342,7 +362,7 @@ module.exports = { const arr = Array.from(rooms); const filtered = arr.filter(room => !room[1].has(room[0])); for (let i of filtered) { - let {projectKey, sessionId} = extractPeerId(i[0]); + let {projectKey, sessionId, tabId} = extractPeerId(i[0]); if (projectKey !== null && sessionId !== null) { count++; } diff --git a/assist/utils/helper.js b/assist/utils/helper.js index 216518777..9f92d2834 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -1,8 +1,22 @@ let PROJECT_KEY_LENGTH = parseInt(process.env.PROJECT_KEY_LENGTH) || 20; let debug = process.env.debug === "1" || false; +const extractRoomId = (peerId) => { + let {projectKey, sessionId, tabId} = extractPeerId(peerId); + if (projectKey && sessionId) { + return `${projectKey}-${sessionId}`; + } + return null; +} +const extractTabId = (peerId) => { + let {projectKey, sessionId, tabId} = extractPeerId(peerId); + if (tabId) { + return tabId; + } + return null; +} const extractPeerId = (peerId) => { let splited = peerId.split("-"); - if (splited.length !== 2) { + if (splited.length < 2 || splited.length > 3) { debug && console.error(`cannot split peerId: ${peerId}`); return {}; } @@ -10,7 +24,10 @@ const extractPeerId = (peerId) => { debug && console.error(`wrong project key length for peerId: ${peerId}`); return {}; } - return {projectKey: splited[0], sessionId: splited[1]}; + if (splited.length === 2) { + return {projectKey: splited[0], sessionId: splited[1], tabId: null}; + } + return {projectKey: splited[0], sessionId: splited[1], tabId: splited[2]}; }; const request_logger = (identity) => { return (req, res, next) => { @@ -246,6 +263,8 @@ const getCompressionConfig = function () { } module.exports = { transformFilters, + extractRoomId, + extractTabId, extractPeerId, request_logger, getValidAttributes,