feat(backend): added multitab support to assits

This commit is contained in:
Alexander Zavorotynskiy 2023-06-02 09:16:49 +02:00 committed by nick-delirium
parent 90b05ef82e
commit cb9d713a1f
2 changed files with 102 additions and 63 deletions

View file

@ -1,6 +1,8 @@
const _io = require('socket.io');
const express = require('express');
const {
extractRoomId,
extractTabId,
extractPeerId,
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
@ -47,25 +49,30 @@ const respond = function (res, data) {
const socketsList = async function (req, res) {
debug && console.log("[WS]looking for all available sessions");
let filters = await extractPayloadFromRequest(req);
let liveSessions = {};
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
let rooms = await getAvailableRooms(io);
for (let peerId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(peerId);
for (let roomId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(roomId);
if (projectKey !== undefined) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
const connected_sockets = await io.in(peerId).fetchSockets();
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
const connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions[projectKey].push(sessionId);
liveSessionsPerProject[projectKey].add(sessionId);
}
}
} else {
liveSessions[projectKey].push(sessionId);
liveSessionsPerProject[projectKey].add(sessionId);
}
}
}
let liveSessions = {};
liveSessionsPerProject.forEach((sessions, projectId) => {
liveSessions[projectId] = Array.from(sessions);
});
respond(res, liveSessions);
}
@ -74,35 +81,36 @@ const socketsListByProject = async function (req, res) {
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractPayloadFromRequest(req);
let liveSessions = {};
let withFilters = hasFilters(filters);
let liveSessions = new Set();
let rooms = await getAvailableRooms(io);
for (let peerId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(peerId);
for (let roomId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(roomId);
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
const connected_sockets = await io.in(peerId).fetchSockets();
if (withFilters) {
const connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions[projectKey].push(sessionId);
liveSessions.add(sessionId);
}
}
} else {
liveSessions[projectKey].push(sessionId);
liveSessions.add(sessionId);
}
}
}
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
let sessions = Array.from(liveSessions);
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters)
: sessions.length > 0 ? sessions[0]
: null);
}
const socketsLive = async function (req, res) {
debug && console.log("[WS]looking for all available LIVE sessions");
let filters = await extractPayloadFromRequest(req);
let liveSessions = {};
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
let rooms = await getAvailableRooms(io);
for (let peerId of rooms.keys()) {
let {projectKey} = extractPeerId(peerId);
@ -110,18 +118,22 @@ const socketsLive = async function (req, res) {
let connected_sockets = await io.in(peerId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
} else {
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
}
}
}
}
let liveSessions = {};
liveSessionsPerProject.forEach((sessions, projectId) => {
liveSessions[projectId] = Array.from(sessions);
});
respond(res, sortPaginate(liveSessions, filters));
}
@ -130,29 +142,29 @@ const socketsLiveByProject = async function (req, res) {
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractPayloadFromRequest(req);
let liveSessions = {};
let withFilters = hasFilters(filters);
let liveSessions = new Set();
let rooms = await getAvailableRooms(io);
for (let peerId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(peerId);
for (let roomId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(roomId);
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
let connected_sockets = await io.in(peerId).fetchSockets();
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
liveSessions[projectKey] = liveSessions[projectKey] || [];
if (hasFilters(filters)) {
if (withFilters) {
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
liveSessions.add(item.handshake.query.sessionInfo);
}
} else {
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
liveSessions.add(item.handshake.query.sessionInfo);
}
}
}
}
}
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
let sessions = Array.from(liveSessions);
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters)
: sessions.length > 0 ? sessions[0]
: null);
}
@ -178,10 +190,10 @@ const autocomplete = async function (req, res) {
respond(res, uniqueAutocomplete(results));
}
const findSessionSocketId = async (io, peerId) => {
const connected_sockets = await io.in(peerId).fetchSockets();
const findSessionSocketId = async (io, roomId, tabId) => {
const connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) {
return item.id;
}
}
@ -191,8 +203,8 @@ const findSessionSocketId = async (io, peerId) => {
async function sessions_agents_count(io, socket) {
let c_sessions = 0, c_agents = 0;
const rooms = await getAvailableRooms(io);
if (rooms.get(socket.peerId)) {
const connected_sockets = await io.in(socket.peerId).fetchSockets();
if (rooms.get(socket.roomId)) {
const connected_sockets = await io.in(socket.roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
@ -211,8 +223,8 @@ async function sessions_agents_count(io, socket) {
async function get_all_agents_ids(io, socket) {
let agents = [];
const rooms = await getAvailableRooms(io);
if (rooms.get(socket.peerId)) {
const connected_sockets = await io.in(socket.peerId).fetchSockets();
if (rooms.get(socket.roomId)) {
const connected_sockets = await io.in(socket.roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.agent) {
agents.push(item.id);
@ -246,6 +258,8 @@ module.exports = {
debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
socket._connectedAt = new Date();
socket.peerId = socket.handshake.query.peerId;
socket.roomId = extractRoomId(socket.peerId);
socket.tabId = extractTabId(socket.peerId);
socket.identity = socket.handshake.query.identity;
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
if (socket.identity === IDENTITIES.session) {
@ -259,42 +273,42 @@ module.exports = {
debug && console.log(`notifying new session about agent-existence`);
let agents_ids = await get_all_agents_ids(io, socket);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
}
} else if (c_sessions <= 0) {
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
await socket.join(socket.peerId);
await socket.join(socket.roomId);
const rooms = await getAvailableRooms(io);
if (rooms.get(socket.peerId)) {
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`);
if (rooms.get(socket.roomId)) {
debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`);
}
if (socket.identity === IDENTITIES.agent) {
if (socket.handshake.query.agentInfo !== undefined) {
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
}
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
socket.on('disconnect', async () => {
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
debug && console.log(`${socket.id} disconnected from ${socket.roomId}`);
if (socket.identity === IDENTITIES.agent) {
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
}
debug && console.log("checking for number of connected agents and sessions");
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
if (c_sessions === -1 && c_agents === -1) {
debug && console.log(`room not found: ${socket.peerId}`);
debug && console.log(`room not found: ${socket.roomId}`);
}
if (c_sessions === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
if (c_agents === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
}
});
@ -305,7 +319,7 @@ module.exports = {
return
}
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
});
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
@ -317,17 +331,23 @@ module.exports = {
return
}
if (socket.identity === IDENTITIES.session) {
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`);
// TODO: emit message to all agents in the room (except tabs)
socket.to(socket.peerId).emit(eventName, args[0]);
} else {
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`);
let socketId = await findSessionSocketId(io, socket.peerId);
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`);
// TODO: new message structure: {meta: {tabId: string::id}, data: xxx -> args[0]}
if (args[0].meta === undefined && args[0].data === undefined) {
debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`);
return
}
let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId);
if (socketId === null) {
debug && console.log(`session not found for:${socket.peerId}`);
debug && console.log(`session not found for:${socket.roomId}`);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
} else {
debug && console.log("message sent");
io.to(socketId).emit(eventName, socket.id, args[0]);
io.to(socketId).emit(eventName, socket.id, args[0].data);
}
}
});
@ -342,7 +362,7 @@ module.exports = {
const arr = Array.from(rooms);
const filtered = arr.filter(room => !room[1].has(room[0]));
for (let i of filtered) {
let {projectKey, sessionId} = extractPeerId(i[0]);
let {projectKey, sessionId, tabId} = extractPeerId(i[0]);
if (projectKey !== null && sessionId !== null) {
count++;
}

View file

@ -1,8 +1,22 @@
let PROJECT_KEY_LENGTH = parseInt(process.env.PROJECT_KEY_LENGTH) || 20;
let debug = process.env.debug === "1" || false;
const extractRoomId = (peerId) => {
let {projectKey, sessionId, tabId} = extractPeerId(peerId);
if (projectKey && sessionId) {
return `${projectKey}-${sessionId}`;
}
return null;
}
const extractTabId = (peerId) => {
let {projectKey, sessionId, tabId} = extractPeerId(peerId);
if (tabId) {
return tabId;
}
return null;
}
const extractPeerId = (peerId) => {
let splited = peerId.split("-");
if (splited.length !== 2) {
if (splited.length < 2 || splited.length > 3) {
debug && console.error(`cannot split peerId: ${peerId}`);
return {};
}
@ -10,7 +24,10 @@ const extractPeerId = (peerId) => {
debug && console.error(`wrong project key length for peerId: ${peerId}`);
return {};
}
return {projectKey: splited[0], sessionId: splited[1]};
if (splited.length === 2) {
return {projectKey: splited[0], sessionId: splited[1], tabId: null};
}
return {projectKey: splited[0], sessionId: splited[1], tabId: splited[2]};
};
const request_logger = (identity) => {
return (req, res, next) => {
@ -246,6 +263,8 @@ const getCompressionConfig = function () {
}
module.exports = {
transformFilters,
extractRoomId,
extractTabId,
extractPeerId,
request_logger,
getValidAttributes,