Assist optimisation (#1802)

* feat(assist): removed legacy endpoints

* feat(assist): added in-memory cache

* feat(assist): fixed 'string, not object' issue

* feat(assist): reafactored socketsLiveBySession method

* feat(assist): rewrote autocomplete

* feat(assist): fixed issue with empty session

* feat(assist): removed copy/paste code

* feat(assist): added ee solo mode
This commit is contained in:
Alexander 2024-01-02 14:22:11 +01:00 committed by GitHub
parent aea53f75fb
commit 716dc6a2ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 375 additions and 365 deletions

View file

@ -1,4 +1,7 @@
FROM node:20-alpine #ARCH can be amd64 or arm64
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:20-alpine
LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>" LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
RUN apk add --no-cache tini RUN apk add --no-cache tini

View file

@ -1,14 +1,5 @@
const express = require('express'); const express = require('express');
const { const {
extractPeerId,
hasFilters,
isValidSession,
extractPayloadFromRequest,
sortPaginate,
getAvailableRooms,
} = require('../utils/helper');
const {
IDENTITIES,
socketConnexionTimeout, socketConnexionTimeout,
authorizer authorizer
} = require('../utils/assistHelper'); } = require('../utils/assistHelper');
@ -19,66 +10,21 @@ const {
createSocketIOServer createSocketIOServer
} = require('../utils/wsServer'); } = require('../utils/wsServer');
const { const {
respond,
socketsList,
socketsListByProject, socketsListByProject,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,
autocomplete autocomplete
} = require('../utils/httpHandlers'); } = require('../utils/httpHandlers');
let io;
const wsRouter = express.Router(); const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
const debug_log = process.env.debug === "1"; wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
const socketsLive = async function (req, res) {
res.handlerName = 'socketsLive';
debug_log && console.log("[WS]looking for all available LIVE sessions");
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey} = extractPeerId(roomId);
if (projectKey !== undefined) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
} else {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
}
}
}
}
let liveSessions = {};
liveSessionsPerProject.forEach((sessions, projectId) => {
liveSessions[projectId] = Array.from(sessions);
});
respond(req, res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-list`, socketsList);
wsRouter.post(`/sockets-list`, socketsList);
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io;
module.exports = { module.exports = {
wsRouter, wsRouter,
start: (server, prefix) => { start: (server, prefix) => {
@ -87,13 +33,10 @@ module.exports = {
io.on('connection', (socket) => onConnect(socket)); io.on('connection', (socket) => onConnect(socket));
console.log("WS server started"); console.log("WS server started");
socketConnexionTimeout(io); socketConnexionTimeout(io);
}, },
handlers: { handlers: {
socketsList,
socketsListByProject, socketsListByProject,
socketsLive,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession socketsLiveBySession
} }

View file

@ -120,6 +120,10 @@ const hasFilters = function (filters) {
return filters && filters.filter && Object.keys(filters.filter).length > 0; return filters && filters.filter && Object.keys(filters.filter).length > 0;
} }
const hasQuery = function (filters) {
return filters && filters.query && Object.keys(filters.query).length > 0;
}
const objectToObjectOfArrays = function (obj) { const objectToObjectOfArrays = function (obj) {
let _obj = {} let _obj = {}
if (obj) { if (obj) {
@ -161,7 +165,7 @@ const extractPayloadFromRequest = async function (req, res) {
"filter": {}, // for sessions search "filter": {}, // for sessions search
"sort": { "sort": {
"key": req.body.sort && req.body.sort.key ? req.body.sort.key : undefined, "key": req.body.sort && req.body.sort.key ? req.body.sort.key : undefined,
"order": req.body.sort && req.body.sort.order === "DESC" "order": req.body.sort && req.body.sort?.order.toLowerCase() === "desc"
}, },
"pagination": { "pagination": {
"limit": req.body.pagination && req.body.pagination.limit ? req.body.pagination.limit : undefined, "limit": req.body.pagination && req.body.pagination.limit ? req.body.pagination.limit : undefined,
@ -214,20 +218,18 @@ const sortPaginate = function (list, filters) {
} }
const total = list.length; const total = list.length;
list.sort((a, b) => { if (filters.sort.key && filters.sort.key !== "timestamp") {
const tA = getValue(a, "timestamp");
const tB = getValue(b, "timestamp");
return tA < tB ? 1 : tA > tB ? -1 : 0; // b - a
});
if (filters.sort.order) {
list.reverse();
}
if ((filters.sort.key || "timestamp") !== "timestamp") {
list.sort((a, b) => { list.sort((a, b) => {
const vA = getValue(a, filters.sort.key); const vA = getValue(a, filters.sort.key);
const vB = getValue(b, filters.sort.key); const vB = getValue(b, filters.sort.key);
return vA > vB ? 1 : vA < vB ? -1 : 0; return vA > vB ? 1 : vA < vB ? -1 : 0;
}); });
} else {
list.sort((a, b) => {
const tA = getValue(a, "timestamp");
const tB = getValue(b, "timestamp");
return tB - tA
});
} }
if (filters.sort.order) { if (filters.sort.order) {
list.reverse(); list.reverse();
@ -291,6 +293,7 @@ module.exports = {
extractSessionIdFromRequest, extractSessionIdFromRequest,
isValidSession, isValidSession,
hasFilters, hasFilters,
hasQuery,
objectToObjectOfArrays, objectToObjectOfArrays,
extractPayloadFromRequest, extractPayloadFromRequest,
sortPaginate, sortPaginate,

View file

@ -1,6 +1,6 @@
const { const {
hasFilters, hasFilters,
extractPeerId, hasQuery,
isValidSession, isValidSession,
sortPaginate, sortPaginate,
getValidAttributes, getValidAttributes,
@ -10,22 +10,21 @@ const {
extractProjectKeyFromRequest, extractProjectKeyFromRequest,
extractSessionIdFromRequest, extractSessionIdFromRequest,
extractPayloadFromRequest, extractPayloadFromRequest,
getAvailableRooms
} = require("./extractors"); } = require("./extractors");
const {
IDENTITIES
} = require("./assistHelper");
const {
getServer
} = require('../utils/wsServer');
const { const {
RecordRequestDuration, RecordRequestDuration,
IncreaseTotalRequests IncreaseTotalRequests
} = require('../utils/metrics'); } = require('../utils/metrics');
const {
GetRoomInfo,
GetRooms,
GetSessions,
} = require('../utils/rooms');
const debug_log = process.env.debug === "1"; const debug_log = process.env.debug === "1";
const respond = function (req, res, data) { const respond = function (req, res, data) {
console.log("responding with data: ", data)
let result = {data} let result = {data}
if (process.env.uws !== "true") { if (process.env.uws !== "true") {
res.statusCode = 200; res.statusCode = 200;
@ -39,174 +38,121 @@ const respond = function (req, res, data) {
RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0); RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0);
} }
const socketsList = async function (req, res) { const getParticularSession = function (sessionId, filters) {
res.handlerName = 'socketsList'; const sessInfo = GetRoomInfo(sessionId);
let io = getServer(); if (!sessInfo) {
debug_log && console.log("[WS]looking for all available sessions"); return null;
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(roomId);
if (projectKey !== undefined) {
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
const connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessionsPerProject[projectKey].add(sessionId);
}
}
} else {
liveSessionsPerProject[projectKey].add(sessionId);
}
}
} }
let liveSessions = {}; if (!hasFilters(filters)) {
liveSessionsPerProject.forEach((sessions, projectId) => { return sessInfo;
liveSessions[projectId] = Array.from(sessions); }
}); if (isValidSession(sessInfo, filters.filter)) {
respond(req, res, liveSessions); return sessInfo;
}
return null;
} }
const getAllSessions = function (projectKey, filters, onlineOnly= false) {
const sessions = [];
const allRooms = onlineOnly ? GetSessions(projectKey) : GetRooms(projectKey);
for (let sessionId of allRooms) {
let sessInfo = GetRoomInfo(sessionId);
if (!sessInfo) {
continue;
}
if (!hasFilters(filters)) {
sessions.push(sessInfo);
continue;
}
if (isValidSession(sessInfo, filters.filter)) {
sessions.push(sessInfo);
}
}
return sessions
}
// Sort by projectKey
const socketsListByProject = async function (req, res) { const socketsListByProject = async function (req, res) {
res.handlerName = 'socketsListByProject';
let io = getServer();
debug_log && console.log("[WS]looking for available sessions"); debug_log && console.log("[WS]looking for available sessions");
let _projectKey = extractProjectKeyFromRequest(req); res.handlerName = 'socketsListByProject';
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractPayloadFromRequest(req, res); const _projectKey = extractProjectKeyFromRequest(req);
let withFilters = hasFilters(filters); const _sessionId = extractSessionIdFromRequest(req);
let liveSessions = new Set(); const filters = await extractPayloadFromRequest(req, res);
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) { // find a particular session
let {projectKey, sessionId} = extractPeerId(roomId); if (_sessionId) {
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { return respond(req, res, getParticularSession(_sessionId, filters));
if (withFilters) {
const connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessions.add(sessionId);
}
}
} else {
liveSessions.add(sessionId);
}
}
} }
let sessions = Array.from(liveSessions);
respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) // find all sessions for a project
: sessions.length > 0 ? sessions[0] const sessions = getAllSessions(_projectKey, filters);
: null);
// send response
respond(req, res, sortPaginate(sessions, filters));
} }
// Sort by projectKey
const socketsLiveByProject = async function (req, res) { const socketsLiveByProject = async function (req, res) {
res.handlerName = 'socketsLiveByProject';
let io = getServer();
debug_log && console.log("[WS]looking for available LIVE sessions"); debug_log && console.log("[WS]looking for available LIVE sessions");
let _projectKey = extractProjectKeyFromRequest(req); res.handlerName = 'socketsLiveByProject';
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractPayloadFromRequest(req, res); const _projectKey = extractProjectKeyFromRequest(req);
let withFilters = hasFilters(filters); const _sessionId = extractSessionIdFromRequest(req);
let liveSessions = new Set(); const filters = await extractPayloadFromRequest(req, res);
const sessIDs = new Set();
let rooms = await getAvailableRooms(io); // find a particular session
for (let roomId of rooms.keys()) { if (_sessionId) {
let {projectKey, sessionId} = extractPeerId(roomId); return respond(req, res, getParticularSession(_sessionId, filters));
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
if (withFilters) {
if (item.handshake.query.sessionInfo &&
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
} else {
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
}
}
}
}
} }
let sessions = Array.from(liveSessions);
respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); // find all sessions for a project
const sessions = getAllSessions(_projectKey, filters, true);
// send response
respond(req, res, sortPaginate(sessions, filters));
} }
// Sort by roomID (projectKey+sessionId)
const socketsLiveBySession = async function (req, res) { const socketsLiveBySession = async function (req, res) {
res.handlerName = 'socketsLiveBySession';
let io = getServer();
debug_log && console.log("[WS]looking for LIVE session"); debug_log && console.log("[WS]looking for LIVE session");
let _projectKey = extractProjectKeyFromRequest(req); res.handlerName = 'socketsLiveBySession';
let _sessionId = extractSessionIdFromRequest(req);
if (_sessionId === undefined) {
return respond(req, res, null);
}
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessions = new Set();
const sessIDs = new Set();
let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets(); const _sessionId = extractSessionIdFromRequest(req);
for (let item of connected_sockets) { const filters = await extractPayloadFromRequest(req, res);
if (item.handshake.query.identity === IDENTITIES.session) {
if (withFilters) {
if (item.handshake.query.sessionInfo &&
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
} else {
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
}
}
// find a particular session
if (_sessionId) {
return respond(req, res, getParticularSession(_sessionId, filters));
} }
let sessions = Array.from(liveSessions); return respond(req, res, null);
respond(req, res, sessions.length > 0 ? sessions[0] : null);
} }
// Sort by projectKey
const autocomplete = async function (req, res) { const autocomplete = async function (req, res) {
res.handlerName = 'autocomplete';
let io = getServer();
debug_log && console.log("[WS]autocomplete"); debug_log && console.log("[WS]autocomplete");
let _projectKey = extractProjectKeyFromRequest(req); res.handlerName = 'autocomplete';
let filters = await extractPayloadFromRequest(req);
const _projectKey = extractProjectKeyFromRequest(req);
const filters = await extractPayloadFromRequest(req);
let results = []; let results = [];
if (filters.query && Object.keys(filters.query).length > 0) { if (!hasQuery(filters)) {
let rooms = await getAvailableRooms(io); return respond(req, res, results);
for (let roomId of rooms.keys()) { }
let {projectKey} = extractPeerId(roomId); let allSessions = GetSessions(_projectKey);
if (projectKey === _projectKey) { for (let sessionId of allSessions) {
let connected_sockets = await io.in(roomId).fetchSockets(); let sessInfo = GetRoomInfo(sessionId);
for (let item of connected_sockets) { if (!sessInfo) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { continue;
results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)];
}
}
}
} }
results = [...results, ...getValidAttributes(sessInfo, filters.query)];
} }
respond(req, res, uniqueAutocomplete(results)); respond(req, res, uniqueAutocomplete(results));
} }
module.exports = { module.exports = {
respond, respond,
socketsList,
socketsListByProject, socketsListByProject,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,

49
assist/utils/rooms.js Normal file
View file

@ -0,0 +1,49 @@
const roomsInfo = new Map(); // sessionID -> sessionInfo
const projectSessions = new Map(); // projectKey -> Set(sessionIDs) // all rooms (even with agent only)
const projectRooms = new Map(); // projectKey -> Set(roomIDs) // online rooms
function AddRoom(projKey, sessID, sessInfo) {
roomsInfo.set(sessID, sessInfo);
if (!projectRooms.has(projKey)) {
projectRooms.set(projKey, new Set());
}
projectRooms.get(projKey).add(sessID);
if (!projectSessions.has(projKey)) {
projectSessions.set(projKey, new Set());
}
projectSessions.get(projKey).add(sessID);
}
function UpdateRoom(sessID, sessInfo) {
roomsInfo.set(sessID, sessInfo);
}
function DeleteSession(projKey, sessID) {
projectSessions.get(projKey)?.delete(sessID);
}
function DeleteRoom(projKey, sessID) {
projectRooms.get(projKey)?.delete(sessID);
}
function GetRoomInfo(sessID) {
return roomsInfo.get(sessID);
}
function GetRooms(projectKey) {
return projectRooms.get(projectKey) || new Set();
}
function GetSessions(projectKey) {
return projectSessions.get(projectKey) || new Set();
}
module.exports = {
AddRoom,
UpdateRoom,
DeleteRoom,
DeleteSession,
GetRoomInfo,
GetRooms,
GetSessions,
}

View file

@ -23,6 +23,12 @@ const {
IncreaseOnlineRooms, IncreaseOnlineRooms,
DecreaseOnlineRooms, DecreaseOnlineRooms,
} = require('../utils/metrics'); } = require('../utils/metrics');
const {
AddRoom,
UpdateRoom,
DeleteRoom,
DeleteSession,
} = require('../utils/rooms');
const debug_log = process.env.debug === "1"; const debug_log = process.env.debug === "1";
const error_log = process.env.ERROR === "1"; const error_log = process.env.ERROR === "1";
@ -95,12 +101,13 @@ async function onConnect(socket) {
} }
} }
} }
extractSessionInfo(socket);
if (tabsCount < 0) { if (tabsCount < 0) {
// New session creates new room // New session creates new room
IncreaseTotalRooms(); IncreaseTotalRooms();
IncreaseOnlineRooms(); IncreaseOnlineRooms();
AddRoom(socket.projectKey, socket.sessId, socket.handshake.query.sessionInfo);
} }
extractSessionInfo(socket);
// Inform all connected agents about reconnected session // Inform all connected agents about reconnected session
if (agentsCount > 0) { if (agentsCount > 0) {
debug_log && console.log(`notifying new session about agent-existence`); debug_log && console.log(`notifying new session about agent-existence`);
@ -161,11 +168,14 @@ async function onDisconnect(socket) {
if (tabsCount === -1 && agentsCount === -1) { if (tabsCount === -1 && agentsCount === -1) {
DecreaseOnlineRooms(); DecreaseOnlineRooms();
debug_log && console.log(`room not found: ${socket.roomId}`); debug_log && console.log(`room not found: ${socket.roomId}`);
DeleteSession(socket.projectKey, socket.sessId);
DeleteRoom(socket.projectKey, socket.sessId);
return; return;
} }
if (tabsCount === 0) { if (tabsCount === 0) {
debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`);
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
DeleteSession(socket.projectKey, socket.sessId);
} }
if (agentsCount === 0) { if (agentsCount === 0) {
debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`);
@ -182,6 +192,7 @@ async function onUpdateEvent(socket, ...args) {
args[0] = updateSessionData(socket, args[0]) args[0] = updateSessionData(socket, args[0])
Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId}); Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId});
UpdateRoom(socket.sessId, socket.handshake.query.sessionInfo);
// Update sessionInfo for all agents in the room // Update sessionInfo for all agents in the room
const io = getServer(); const io = getServer();

View file

@ -1,6 +1,10 @@
FROM node:18-alpine #ARCH can be amd64 or arm64
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:20-alpine
LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>" LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
RUN apk add --no-cache tini git libc6-compat && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2 RUN apk add --no-cache tini git libc6-compat
# && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2
ARG envarg ARG envarg
ENV ENTERPRISE_BUILD=${envarg} \ ENV ENTERPRISE_BUILD=${envarg} \

View file

@ -92,15 +92,9 @@ if (process.env.uws !== "true") {
return fn(req, res); return fn(req, res);
} }
} }
uapp.get(`${PREFIX}/${P_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList));
uapp.post(`${PREFIX}/${P_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList));
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject));
uapp.post(`${PREFIX}/${P_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.post(`${PREFIX}/${P_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete)); uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.post(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); uapp.post(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));

View file

@ -1,19 +1,8 @@
const express = require('express'); const express = require('express');
const { const {
extractPeerId,
hasFilters,
isValidSession,
sortPaginate,
} = require('../utils/helper');
const {
IDENTITIES,
socketConnexionTimeout, socketConnexionTimeout,
authorizer authorizer
} = require('../utils/assistHelper'); } = require('../utils/assistHelper');
const {
extractPayloadFromRequest,
getAvailableRooms
} = require('../utils/helper-ee');
const { const {
createSocketIOServer createSocketIOServer
} = require('../utils/wsServer'); } = require('../utils/wsServer');
@ -21,78 +10,30 @@ const {
onConnect onConnect
} = require('../utils/socketHandlers'); } = require('../utils/socketHandlers');
const { const {
respond,
socketsList,
socketsListByProject, socketsListByProject,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,
autocomplete autocomplete
} = require('../utils/httpHandlers'); } = require('../utils/httpHandlers-cluster');
const {createAdapter} = require("@socket.io/redis-adapter"); const {createAdapter} = require("@socket.io/redis-adapter");
const {createClient} = require("redis"); const {createClient} = require("redis");
const wsRouter = express.Router();
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://'); const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
const pubClient = createClient({url: REDIS_URL}); const pubClient = createClient({url: REDIS_URL});
const subClient = pubClient.duplicate(); const subClient = pubClient.duplicate();
console.log(`Using Redis: ${REDIS_URL}`); console.log(`Using Redis: ${REDIS_URL}`);
let io;
const debug_log = process.env.debug === "1"; const debug_log = process.env.debug === "1";
const socketsLive = async function (req, res) { const wsRouter = express.Router();
res.handlerName = 'socketsLive'; wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
debug_log && console.log("[WS]looking for all available LIVE sessions"); wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
let filters = await extractPayloadFromRequest(req, res); wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
const sessIDs = new Set();
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey} = extractPeerId(roomId);
if (projectKey !== undefined) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
if (item.handshake.query.sessionInfo &&
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
) {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
} else {
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
}
}
}
}
}
let liveSessions = {};
liveSessionsPerProject.forEach((sessions, projectId) => {
liveSessions[projectId] = Array.from(sessions);
});
respond(req, res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-list`, socketsList);
wsRouter.post(`/sockets-list`, socketsList);
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io;
module.exports = { module.exports = {
wsRouter, wsRouter,
start: (server, prefix) => { start: (server, prefix) => {
@ -117,9 +58,7 @@ module.exports = {
}); });
}, },
handlers: { handlers: {
socketsList,
socketsListByProject, socketsListByProject,
socketsLive,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,
autocomplete autocomplete

View file

@ -1,16 +1,5 @@
const express = require('express'); const express = require('express');
const { const {
extractPeerId,
hasFilters,
isValidSession,
sortPaginate,
} = require('../utils/helper');
const {
extractPayloadFromRequest,
getAvailableRooms
} = require('../utils/helper-ee');
const {
IDENTITIES,
socketConnexionTimeout, socketConnexionTimeout,
authorizer authorizer
} = require('../utils/assistHelper'); } = require('../utils/assistHelper');
@ -21,8 +10,6 @@ const {
onConnect onConnect
} = require('../utils/socketHandlers'); } = require('../utils/socketHandlers');
const { const {
respond,
socketsList,
socketsListByProject, socketsListByProject,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,
@ -30,56 +17,14 @@ const {
} = require('../utils/httpHandlers'); } = require('../utils/httpHandlers');
const wsRouter = express.Router(); const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io; let io;
const debug_log = process.env.debug === "1";
const socketsLive = async function (req, res) {
res.handlerName = 'socketsLive';
debug_log && console.log("[WS]looking for all available LIVE sessions");
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessionsPerProject = {};
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey} = extractPeerId(roomId);
if (projectKey !== undefined) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
if (withFilters) {
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
} else {
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
}
}
}
}
}
let liveSessions = {};
liveSessionsPerProject.forEach((sessions, projectId) => {
liveSessions[projectId] = Array.from(sessions);
});
respond(req, res, sortPaginate(liveSessions, filters));
}
wsRouter.get(`/sockets-list`, socketsList);
wsRouter.post(`/sockets-list`, socketsList);
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject);
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject);
wsRouter.get(`/sockets-live`, socketsLive);
wsRouter.post(`/sockets-live`, socketsLive);
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete);
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession);
module.exports = { module.exports = {
wsRouter, wsRouter,
start: (server, prefix) => { start: (server, prefix) => {
@ -88,13 +33,10 @@ module.exports = {
io.on('connection', (socket) => onConnect(socket)); io.on('connection', (socket) => onConnect(socket));
console.log("WS server started"); console.log("WS server started");
socketConnexionTimeout(io); socketConnexionTimeout(io);
}, },
handlers: { handlers: {
socketsList,
socketsListByProject, socketsListByProject,
socketsLive,
socketsLiveByProject, socketsLiveByProject,
socketsLiveBySession, socketsLiveBySession,
autocomplete autocomplete

View file

@ -0,0 +1,176 @@
const {
hasFilters,
extractPeerId,
isValidSession,
sortPaginate,
getValidAttributes,
uniqueAutocomplete
} = require("./helper");
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
} = require("./extractors");
const {
IDENTITIES
} = require("./assistHelper");
const {
getServer
} = require('../utils/wsServer');
const {
RecordRequestDuration,
IncreaseTotalRequests
} = require('../utils/metrics');
const debug_log = process.env.debug === "1";
const respond = function (req, res, data) {
let result = {data}
if (process.env.uws !== "true") {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify(result));
} else {
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
}
const duration = performance.now() - req.startTs;
IncreaseTotalRequests();
RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0);
}
const socketsListByProject = async function (req, res) {
debug_log && console.log("[WS]looking for available sessions");
res.handlerName = 'socketsListByProject';
let io = getServer();
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
if (_sessionId === undefined) {
return respond(req, res, null);
}
let filters = await extractPayloadFromRequest(req, res);
let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
return respond(req, res, _sessionId);
}
}
respond(req, res, null);
}
const socketsLiveByProject = async function (req, res) {
debug_log && console.log("[WS]looking for available LIVE sessions");
res.handlerName = 'socketsLiveByProject';
let io = getServer();
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessions = new Set();
const sessIDs = new Set();
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey, sessionId} = extractPeerId(roomId);
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
if (withFilters) {
if (item.handshake.query.sessionInfo &&
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
} else {
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
}
}
}
}
}
let sessions = Array.from(liveSessions);
respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null);
}
const socketsLiveBySession = async function (req, res) {
debug_log && console.log("[WS]looking for LIVE session");
res.handlerName = 'socketsLiveBySession';
let io = getServer();
let _projectKey = extractProjectKeyFromRequest(req);
let _sessionId = extractSessionIdFromRequest(req);
if (_sessionId === undefined) {
return respond(req, res, null);
}
let filters = await extractPayloadFromRequest(req, res);
let withFilters = hasFilters(filters);
let liveSessions = new Set();
const sessIDs = new Set();
let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session) {
if (withFilters) {
if (item.handshake.query.sessionInfo &&
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
} else {
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
liveSessions.add(item.handshake.query.sessionInfo);
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
}
}
}
}
let sessions = Array.from(liveSessions);
respond(req, res, sessions.length > 0 ? sessions[0] : null);
}
const autocomplete = async function (req, res) {
debug_log && console.log("[WS]autocomplete");
res.handlerName = 'autocomplete';
let io = getServer();
let _projectKey = extractProjectKeyFromRequest(req);
let filters = await extractPayloadFromRequest(req);
let results = [];
if (filters.query && Object.keys(filters.query).length > 0) {
let rooms = await getAvailableRooms(io);
for (let roomId of rooms.keys()) {
let {projectKey} = extractPeerId(roomId);
if (projectKey === _projectKey) {
let connected_sockets = await io.in(roomId).fetchSockets();
for (let item of connected_sockets) {
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)];
}
}
}
}
}
respond(req, res, uniqueAutocomplete(results));
}
module.exports = {
respond,
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
}