Boost performance for one node assist mode (#1869)
* feat(assist): removed cache for solo-node assist mode * feat(assist): try to fetch from global io object * feat(assist): small changes * feat(assist): added cache per request instead of cache per room * feat(assist): fixed await call in global scope * feat(assist): try to fix circular structure issue * feat(assist): use preprocessor for sockets list * feat(assist): check the theory about fetchSockets data set * feat(assist): try to keep everything inside handshake.query object
This commit is contained in:
parent
e14cbe41e5
commit
a26b603945
7 changed files with 170 additions and 307 deletions
|
|
@ -15,16 +15,13 @@ const {
|
|||
RecordRequestDuration,
|
||||
IncreaseTotalRequests
|
||||
} = require('../utils/metrics');
|
||||
const {
|
||||
GetRoomInfo,
|
||||
GetRooms,
|
||||
GetSessions,
|
||||
} = require('../utils/rooms');
|
||||
const {fetchSockets} = require("./wsServer");
|
||||
const {IDENTITIES} = require("./assistHelper");
|
||||
|
||||
const debug_log = process.env.debug === "1";
|
||||
|
||||
const respond = function (req, res, data) {
|
||||
console.log("responding with data: ", data)
|
||||
debug_log && console.log("responding with data: ", JSON.stringify(data))
|
||||
let result = {data}
|
||||
if (process.env.uws !== "true") {
|
||||
res.statusCode = 200;
|
||||
|
|
@ -38,8 +35,18 @@ const respond = function (req, res, data) {
|
|||
RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0);
|
||||
}
|
||||
|
||||
const getParticularSession = function (sessionId, filters) {
|
||||
const sessInfo = GetRoomInfo(sessionId);
|
||||
const getParticularSession = async function (roomId, filters) {
|
||||
let connected_sockets = await fetchSockets(roomId);
|
||||
if (connected_sockets.length === 0) {
|
||||
return null;
|
||||
}
|
||||
let sessInfo;
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
sessInfo = item.handshake.query.sessionInfo;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!sessInfo) {
|
||||
return null;
|
||||
}
|
||||
|
|
@ -52,23 +59,41 @@ const getParticularSession = function (sessionId, filters) {
|
|||
return null;
|
||||
}
|
||||
|
||||
const getAllSessions = function (projectKey, filters, onlineOnly= false) {
|
||||
const getAllSessions = async function (projectKey, filters, onlineOnly= false) {
|
||||
const sessions = [];
|
||||
const allRooms = onlineOnly ? GetSessions(projectKey) : GetRooms(projectKey);
|
||||
const connected_sockets = await fetchSockets();
|
||||
if (connected_sockets.length === 0) {
|
||||
return sessions;
|
||||
}
|
||||
|
||||
for (let sessionId of allRooms) {
|
||||
let sessInfo = GetRoomInfo(sessionId);
|
||||
if (!sessInfo) {
|
||||
const rooms = new Map();
|
||||
for (let item of connected_sockets) {
|
||||
// Prefilter checks
|
||||
if (rooms.has(item.handshake.query.roomId)) {
|
||||
continue;
|
||||
}
|
||||
if (item.handshake.query.projectKey !== projectKey || !item.handshake.query.sessionInfo) {
|
||||
continue;
|
||||
}
|
||||
if (onlineOnly && item.handshake.query.identity !== IDENTITIES.session) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Mark this room as visited
|
||||
rooms.set(item.handshake.query.roomId, true);
|
||||
|
||||
// Add session to the list without filtering
|
||||
if (!hasFilters(filters)) {
|
||||
sessions.push(sessInfo);
|
||||
sessions.push(item.handshake.query.sessionInfo);
|
||||
continue;
|
||||
}
|
||||
if (isValidSession(sessInfo, filters.filter)) {
|
||||
sessions.push(sessInfo);
|
||||
|
||||
// Add session to the list if it passes the filter
|
||||
if (isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
sessions.push(item.handshake.query.sessionInfo);
|
||||
}
|
||||
}
|
||||
|
||||
return sessions
|
||||
}
|
||||
|
||||
|
|
@ -83,11 +108,12 @@ const socketsListByProject = async function (req, res) {
|
|||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
return respond(req, res, getParticularSession(_sessionId, filters));
|
||||
const sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
|
||||
// find all sessions for a project
|
||||
const sessions = getAllSessions(_projectKey, filters);
|
||||
const sessions = await getAllSessions(_projectKey, filters);
|
||||
|
||||
// send response
|
||||
respond(req, res, sortPaginate(sessions, filters));
|
||||
|
|
@ -104,11 +130,12 @@ const socketsLiveByProject = async function (req, res) {
|
|||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
return respond(req, res, getParticularSession(_sessionId, filters));
|
||||
let sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
|
||||
// find all sessions for a project
|
||||
const sessions = getAllSessions(_projectKey, filters, true);
|
||||
const sessions = await getAllSessions(_projectKey, filters, true);
|
||||
|
||||
// send response
|
||||
respond(req, res, sortPaginate(sessions, filters));
|
||||
|
|
@ -119,12 +146,14 @@ const socketsLiveBySession = async function (req, res) {
|
|||
debug_log && console.log("[WS]looking for LIVE session");
|
||||
res.handlerName = 'socketsLiveBySession';
|
||||
|
||||
const _projectKey = extractProjectKeyFromRequest(req);
|
||||
const _sessionId = extractSessionIdFromRequest(req);
|
||||
const filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
// find a particular session
|
||||
if (_sessionId) {
|
||||
return respond(req, res, getParticularSession(_sessionId, filters));
|
||||
let sessInfo = await getParticularSession(`${_projectKey}-${_sessionId}`, filters);
|
||||
return respond(req, res, sessInfo);
|
||||
}
|
||||
return respond(req, res, null);
|
||||
}
|
||||
|
|
@ -140,14 +169,27 @@ const autocomplete = async function (req, res) {
|
|||
if (!hasQuery(filters)) {
|
||||
return respond(req, res, results);
|
||||
}
|
||||
let allSessions = GetSessions(_projectKey);
|
||||
for (let sessionId of allSessions) {
|
||||
let sessInfo = GetRoomInfo(sessionId);
|
||||
if (!sessInfo) {
|
||||
|
||||
let connected_sockets = await fetchSockets();
|
||||
if (connected_sockets.length === 0) {
|
||||
return results;
|
||||
}
|
||||
|
||||
const rooms = new Map();
|
||||
for (let item of connected_sockets) {
|
||||
if (rooms.has(item.handshake.query.roomId)) {
|
||||
continue;
|
||||
}
|
||||
results = [...results, ...getValidAttributes(sessInfo, filters.query)];
|
||||
if (item.handshake.query.sessionInfo) {
|
||||
if ((item.handshake.query.projectKey !== _projectKey) || (item.handshake.query.identity !== IDENTITIES.session)) {
|
||||
continue;
|
||||
}
|
||||
// Mark this room as visited
|
||||
rooms.set(item.handshake.query.roomId, true);
|
||||
results.push(...getValidAttributes(item.handshake.query.sessionInfo, filters.query))
|
||||
}
|
||||
}
|
||||
|
||||
respond(req, res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,49 +0,0 @@
|
|||
const roomsInfo = new Map(); // sessionID -> sessionInfo
|
||||
const projectSessions = new Map(); // projectKey -> Set(sessionIDs) // all rooms (even with agent only)
|
||||
const projectRooms = new Map(); // projectKey -> Set(roomIDs) // online rooms
|
||||
|
||||
function AddRoom(projKey, sessID, sessInfo) {
|
||||
roomsInfo.set(sessID, sessInfo);
|
||||
if (!projectRooms.has(projKey)) {
|
||||
projectRooms.set(projKey, new Set());
|
||||
}
|
||||
projectRooms.get(projKey).add(sessID);
|
||||
if (!projectSessions.has(projKey)) {
|
||||
projectSessions.set(projKey, new Set());
|
||||
}
|
||||
projectSessions.get(projKey).add(sessID);
|
||||
}
|
||||
|
||||
function UpdateRoom(sessID, sessInfo) {
|
||||
roomsInfo.set(sessID, sessInfo);
|
||||
}
|
||||
|
||||
function DeleteSession(projKey, sessID) {
|
||||
projectSessions.get(projKey)?.delete(sessID);
|
||||
}
|
||||
|
||||
function DeleteRoom(projKey, sessID) {
|
||||
projectRooms.get(projKey)?.delete(sessID);
|
||||
}
|
||||
|
||||
function GetRoomInfo(sessID) {
|
||||
return roomsInfo.get(sessID);
|
||||
}
|
||||
|
||||
function GetRooms(projectKey) {
|
||||
return projectRooms.get(projectKey) || new Set();
|
||||
}
|
||||
|
||||
function GetSessions(projectKey) {
|
||||
return projectSessions.get(projectKey) || new Set();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
AddRoom,
|
||||
UpdateRoom,
|
||||
DeleteRoom,
|
||||
DeleteSession,
|
||||
GetRoomInfo,
|
||||
GetRooms,
|
||||
GetSessions,
|
||||
}
|
||||
|
|
@ -23,12 +23,6 @@ const {
|
|||
IncreaseOnlineRooms,
|
||||
DecreaseOnlineRooms,
|
||||
} = require('../utils/metrics');
|
||||
const {
|
||||
AddRoom,
|
||||
UpdateRoom,
|
||||
DeleteRoom,
|
||||
DeleteSession,
|
||||
} = require('../utils/rooms');
|
||||
|
||||
const debug_log = process.env.debug === "1";
|
||||
const error_log = process.env.ERROR === "1";
|
||||
|
|
@ -70,32 +64,29 @@ async function getRoomData(io, roomID) {
|
|||
|
||||
function processNewSocket(socket) {
|
||||
socket._connectedAt = new Date();
|
||||
socket.identity = socket.handshake.query.identity;
|
||||
socket.peerId = socket.handshake.query.peerId;
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.peerId);
|
||||
socket.roomId = `${connProjectKey}-${connSessionId}`;
|
||||
socket.projectId = socket.handshake.query.projectId;
|
||||
socket.projectKey = connProjectKey;
|
||||
socket.sessId = connSessionId;
|
||||
socket.tabId = connTabId;
|
||||
debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`);
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId: connTabId} = extractPeerId(socket.handshake.query.peerId);
|
||||
socket.handshake.query.roomId = `${connProjectKey}-${connSessionId}`;
|
||||
socket.handshake.query.projectKey = connProjectKey;
|
||||
socket.handshake.query.sessId = connSessionId;
|
||||
socket.handshake.query.tabId = connTabId;
|
||||
debug_log && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.handshake.query.roomId}`);
|
||||
}
|
||||
|
||||
async function onConnect(socket) {
|
||||
debug_log && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
processNewSocket(socket);
|
||||
IncreaseTotalWSConnections(socket.identity);
|
||||
IncreaseOnlineConnections(socket.identity);
|
||||
IncreaseTotalWSConnections(socket.handshake.query.identity);
|
||||
IncreaseOnlineConnections(socket.handshake.query.identity);
|
||||
|
||||
const io = getServer();
|
||||
const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId);
|
||||
const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId);
|
||||
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
// Check if session with the same tabID already connected, if so, refuse new connexion
|
||||
if (tabsCount > 0) {
|
||||
for (let tab of tabIDs) {
|
||||
if (tab === socket.tabId) {
|
||||
error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.peerId}`);
|
||||
if (tab === socket.handshake.query.tabId) {
|
||||
error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.handshake.query.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
}
|
||||
|
|
@ -106,35 +97,34 @@ async function onConnect(socket) {
|
|||
// New session creates new room
|
||||
IncreaseTotalRooms();
|
||||
IncreaseOnlineRooms();
|
||||
AddRoom(socket.projectKey, socket.sessId, socket.handshake.query.sessionInfo);
|
||||
}
|
||||
// Inform all connected agents about reconnected session
|
||||
if (agentsCount > 0) {
|
||||
debug_log && console.log(`notifying new session about agent-existence`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
|
||||
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
|
||||
}
|
||||
} else if (tabsCount <= 0) {
|
||||
debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.handshake.query.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.roomId);
|
||||
await socket.join(socket.handshake.query.roomId);
|
||||
|
||||
if (debug_log) {
|
||||
let connectedSockets = await io.in(socket.roomId).fetchSockets();
|
||||
let connectedSockets = await io.in(socket.handshake.query.roomId).fetchSockets();
|
||||
if (connectedSockets.length > 0) {
|
||||
console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`);
|
||||
console.log(`${socket.id} joined room:${socket.handshake.query.roomId}, as:${socket.handshake.query.identity}, members:${connectedSockets.length}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
|
||||
socket.agentID = socket.handshake.query.agentInfo.id;
|
||||
socket.handshake.query.agentID = socket.handshake.query.agentInfo.id;
|
||||
// Stats
|
||||
startAssist(socket, socket.agentID);
|
||||
startAssist(socket, socket.handshake.query.agentID);
|
||||
}
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
}
|
||||
|
||||
// Set disconnect handler
|
||||
|
|
@ -153,50 +143,46 @@ async function onConnect(socket) {
|
|||
}
|
||||
|
||||
async function onDisconnect(socket) {
|
||||
DecreaseOnlineConnections(socket.identity);
|
||||
debug_log && console.log(`${socket.id} disconnected from ${socket.roomId}`);
|
||||
DecreaseOnlineConnections(socket.handshake.query.identity);
|
||||
debug_log && console.log(`${socket.id} disconnected from ${socket.handshake.query.roomId}`);
|
||||
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
if (socket.handshake.query.identity === IDENTITIES.agent) {
|
||||
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
// Stats
|
||||
endAssist(socket, socket.agentID);
|
||||
endAssist(socket, socket.handshake.query.agentID);
|
||||
}
|
||||
debug_log && console.log("checking for number of connected agents and sessions");
|
||||
const io = getServer();
|
||||
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId);
|
||||
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.handshake.query.roomId);
|
||||
|
||||
if (tabsCount === -1 && agentsCount === -1) {
|
||||
DecreaseOnlineRooms();
|
||||
debug_log && console.log(`room not found: ${socket.roomId}`);
|
||||
DeleteSession(socket.projectKey, socket.sessId);
|
||||
DeleteRoom(socket.projectKey, socket.sessId);
|
||||
debug_log && console.log(`room not found: ${socket.handshake.query.roomId}`);
|
||||
return;
|
||||
}
|
||||
if (tabsCount === 0) {
|
||||
debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
DeleteSession(socket.projectKey, socket.sessId);
|
||||
debug_log && console.log(`notifying everyone in ${socket.handshake.query.roomId} about no SESSIONS`);
|
||||
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
if (agentsCount === 0) {
|
||||
debug_log && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
debug_log && console.log(`notifying everyone in ${socket.handshake.query.roomId} about no AGENTS`);
|
||||
socket.to(socket.handshake.query.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
}
|
||||
}
|
||||
|
||||
async function onUpdateEvent(socket, ...args) {
|
||||
debug_log && console.log(`${socket.id} sent update event.`);
|
||||
if (socket.identity !== IDENTITIES.session) {
|
||||
if (socket.handshake.query.identity !== IDENTITIES.session) {
|
||||
debug_log && console.log('Ignoring update event.');
|
||||
return
|
||||
}
|
||||
|
||||
args[0] = updateSessionData(socket, args[0])
|
||||
Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0]?.meta?.tabId});
|
||||
UpdateRoom(socket.sessId, socket.handshake.query.sessionInfo);
|
||||
|
||||
// Update sessionInfo for all agents in the room
|
||||
const io = getServer();
|
||||
const connected_sockets = await io.in(socket.roomId).fetchSockets();
|
||||
const connected_sockets = await io.in(socket.handshake.query.roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
Object.assign(item.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
|
||||
|
|
@ -212,17 +198,17 @@ async function onAny(socket, eventName, ...args) {
|
|||
return
|
||||
}
|
||||
args[0] = updateSessionData(socket, args[0])
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`);
|
||||
socket.to(socket.roomId).emit(eventName, args[0]);
|
||||
if (socket.handshake.query.identity === IDENTITIES.session) {
|
||||
debug_log && console.log(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`);
|
||||
socket.to(socket.handshake.query.roomId).emit(eventName, args[0]);
|
||||
} else {
|
||||
// Stats
|
||||
handleEvent(eventName, socket, args[0]);
|
||||
debug_log && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`);
|
||||
debug_log && console.log(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`);
|
||||
const io = getServer();
|
||||
let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId);
|
||||
let socketId = await findSessionSocketId(io, socket.handshake.query.roomId, args[0]?.meta?.tabId);
|
||||
if (socketId === null) {
|
||||
debug_log && console.log(`session not found for:${socket.roomId}`);
|
||||
debug_log && console.log(`session not found for:${socket.handshake.query.roomId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
} else {
|
||||
debug_log && console.log("message sent");
|
||||
|
|
@ -233,7 +219,7 @@ async function onAny(socket, eventName, ...args) {
|
|||
|
||||
// Back compatibility (add top layer with meta information)
|
||||
function updateSessionData(socket, sessionData) {
|
||||
if (sessionData?.meta === undefined && socket.identity === IDENTITIES.session) {
|
||||
if (sessionData?.meta === undefined && socket.handshake.query.identity === IDENTITIES.session) {
|
||||
sessionData = {meta: {tabId: socket.tabId, version: 1}, data: sessionData};
|
||||
}
|
||||
return sessionData
|
||||
|
|
|
|||
|
|
@ -7,6 +7,16 @@ const getServer = function () {
|
|||
return io;
|
||||
}
|
||||
|
||||
const fetchSockets = async function (roomID) {
|
||||
if (!io) {
|
||||
return [];
|
||||
}
|
||||
if (!roomID) {
|
||||
return await io.fetchSockets();
|
||||
}
|
||||
return await io.in(roomID).fetchSockets();
|
||||
}
|
||||
|
||||
const createSocketIOServer = function (server, prefix) {
|
||||
if (io) {
|
||||
return io;
|
||||
|
|
@ -26,4 +36,5 @@ const createSocketIOServer = function (server, prefix) {
|
|||
module.exports = {
|
||||
createSocketIOServer,
|
||||
getServer,
|
||||
fetchSockets,
|
||||
}
|
||||
|
|
@ -14,7 +14,7 @@ const {
|
|||
socketsLiveByProject,
|
||||
socketsLiveBySession,
|
||||
autocomplete
|
||||
} = require('../utils/httpHandlers-cluster');
|
||||
} = require('../utils/httpHandlers');
|
||||
|
||||
const {createAdapter} = require("@socket.io/redis-adapter");
|
||||
const {createClient} = require("redis");
|
||||
|
|
|
|||
|
|
@ -1,176 +0,0 @@
|
|||
const {
|
||||
hasFilters,
|
||||
extractPeerId,
|
||||
isValidSession,
|
||||
sortPaginate,
|
||||
getValidAttributes,
|
||||
uniqueAutocomplete
|
||||
} = require("./helper");
|
||||
const {
|
||||
extractProjectKeyFromRequest,
|
||||
extractSessionIdFromRequest,
|
||||
extractPayloadFromRequest,
|
||||
getAvailableRooms
|
||||
} = require("./extractors");
|
||||
const {
|
||||
IDENTITIES
|
||||
} = require("./assistHelper");
|
||||
const {
|
||||
getServer
|
||||
} = require('../utils/wsServer');
|
||||
const {
|
||||
RecordRequestDuration,
|
||||
IncreaseTotalRequests
|
||||
} = require('../utils/metrics');
|
||||
|
||||
const debug_log = process.env.debug === "1";
|
||||
|
||||
const respond = function (req, res, data) {
|
||||
let result = {data}
|
||||
if (process.env.uws !== "true") {
|
||||
res.statusCode = 200;
|
||||
res.setHeader('Content-Type', 'application/json');
|
||||
res.end(JSON.stringify(result));
|
||||
} else {
|
||||
res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result));
|
||||
}
|
||||
const duration = performance.now() - req.startTs;
|
||||
IncreaseTotalRequests();
|
||||
RecordRequestDuration(req.method.toLowerCase(), res.handlerName, 200, duration/1000.0);
|
||||
}
|
||||
|
||||
const socketsListByProject = async function (req, res) {
|
||||
debug_log && console.log("[WS]looking for available sessions");
|
||||
res.handlerName = 'socketsListByProject';
|
||||
|
||||
let io = getServer();
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
if (_sessionId === undefined) {
|
||||
return respond(req, res, null);
|
||||
}
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
|
||||
let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
|
||||
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
return respond(req, res, _sessionId);
|
||||
}
|
||||
}
|
||||
respond(req, res, null);
|
||||
}
|
||||
|
||||
const socketsLiveByProject = async function (req, res) {
|
||||
debug_log && console.log("[WS]looking for available LIVE sessions");
|
||||
res.handlerName = 'socketsLiveByProject';
|
||||
|
||||
let io = getServer();
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
const sessIDs = new Set();
|
||||
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(roomId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
let connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
if (withFilters) {
|
||||
if (item.handshake.query.sessionInfo &&
|
||||
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
|
||||
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
|
||||
) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
} else {
|
||||
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(req, res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null);
|
||||
}
|
||||
|
||||
const socketsLiveBySession = async function (req, res) {
|
||||
debug_log && console.log("[WS]looking for LIVE session");
|
||||
res.handlerName = 'socketsLiveBySession';
|
||||
|
||||
let io = getServer();
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
if (_sessionId === undefined) {
|
||||
return respond(req, res, null);
|
||||
}
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
const sessIDs = new Set();
|
||||
|
||||
let connected_sockets = await io.in(_projectKey + '-' + _sessionId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
if (withFilters) {
|
||||
if (item.handshake.query.sessionInfo &&
|
||||
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
|
||||
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
|
||||
) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
} else {
|
||||
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(req, res, sessions.length > 0 ? sessions[0] : null);
|
||||
}
|
||||
|
||||
const autocomplete = async function (req, res) {
|
||||
debug_log && console.log("[WS]autocomplete");
|
||||
res.handlerName = 'autocomplete';
|
||||
|
||||
let io = getServer();
|
||||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req);
|
||||
let results = [];
|
||||
|
||||
if (filters.query && Object.keys(filters.query).length > 0) {
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(roomId);
|
||||
if (projectKey === _projectKey) {
|
||||
let connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
results = [...results, ...getValidAttributes(item.handshake.query.sessionInfo, filters.query)];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
respond(req, res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
respond,
|
||||
socketsListByProject,
|
||||
socketsLiveByProject,
|
||||
socketsLiveBySession,
|
||||
autocomplete
|
||||
}
|
||||
|
|
@ -7,6 +7,54 @@ const getServer = function () {
|
|||
return io;
|
||||
}
|
||||
|
||||
let redisClient;
|
||||
const useRedis = process.env.redis === "true";
|
||||
|
||||
if (useRedis) {
|
||||
const {createClient} = require("redis");
|
||||
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
|
||||
redisClient = createClient({url: REDIS_URL});
|
||||
redisClient.on("error", (error) => console.error(`Redis error : ${error}`));
|
||||
void redisClient.connect();
|
||||
}
|
||||
|
||||
const processSocketsList = function (sockets) {
|
||||
let res = []
|
||||
for (let socket of sockets) {
|
||||
let {handshake} = socket;
|
||||
res.push({handshake});
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
const doFetchAllSockets = async function () {
|
||||
if (useRedis) {
|
||||
try {
|
||||
let cachedResult = await redisClient.get('fetchSocketsResult');
|
||||
if (cachedResult) {
|
||||
return JSON.parse(cachedResult);
|
||||
}
|
||||
let result = await io.fetchSockets();
|
||||
let cachedString = JSON.stringify(processSocketsList(result));
|
||||
await redisClient.set('fetchSocketsResult', cachedString, {EX: 5});
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error('Error setting value with expiration:', error);
|
||||
}
|
||||
}
|
||||
return await io.fetchSockets();
|
||||
}
|
||||
|
||||
const fetchSockets = async function (roomID) {
|
||||
if (!io) {
|
||||
return [];
|
||||
}
|
||||
if (!roomID) {
|
||||
return await doFetchAllSockets();
|
||||
}
|
||||
return await io.in(roomID).fetchSockets();
|
||||
}
|
||||
|
||||
const createSocketIOServer = function (server, prefix) {
|
||||
if (io) {
|
||||
return io;
|
||||
|
|
@ -41,4 +89,5 @@ const createSocketIOServer = function (server, prefix) {
|
|||
module.exports = {
|
||||
createSocketIOServer,
|
||||
getServer,
|
||||
fetchSockets,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue