From 83325af69fcaefd59e8b9a4d928f559c1457c659 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 15:49:54 +0100 Subject: [PATCH] feat(utilities): FOSS&EE assist-standalone refactored feat(utilities): EE assist-redis refactored feat(utilities): EE assist-redis search by userId --- ee/utilities/servers/websocket-cluster.js | 210 +++++++++++++--------- ee/utilities/servers/websocket.js | 6 +- utilities/servers/websocket.js | 6 +- 3 files changed, 134 insertions(+), 88 deletions(-) diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index 940f83879..f1cba3014 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -5,8 +5,7 @@ const geoip2Reader = require('@maxmind/geoip2-node').Reader; const {extractPeerId} = require('./peerjs-server'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); - -var wsRouter = express.Router(); +const wsRouter = express.Router(); const UPDATE_EVENT = "UPDATE_SESSION"; const IDENTITIES = {agent: 'agent', session: 'session'}; const NEW_AGENT = "NEW_AGENT"; @@ -15,14 +14,37 @@ const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; const AGENTS_CONNECTED = "AGENTS_CONNECTED"; const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -// const wsReconnectionTimeout = process.env.wsReconnectionTimeout | 10 * 1000; +const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379"; +const pubClient = createClient({url: REDIS_URL}); +const subClient = pubClient.duplicate(); let io; const debug = process.env.debug === "1" || false; -const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379"; -const pubClient = createClient({url: REDIS_URL}); -const subClient = pubClient.duplicate(); +const createSocketIOServer = function (server) { + if (process.env.uws !== "true") { + io = _io(server, { + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket' + }); + } else { + io = new _io.Server({ + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket', + // transports: ['websocket'], + // upgrade: false + }); + io.attachApp(server); + } +} const uniqueSessions = function (data) { let resArr = []; @@ -36,18 +58,40 @@ const uniqueSessions = function (data) { return resArr; } -const socketsList = async function (req, res) { - debug && console.log("[WS]looking for all available sessions"); - let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); - for (let peerId of rooms) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); +const extractUserIdFromRequest = function (req) { + if (process.env.uws === "true") { + if (req.getQuery("userId")) { + debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); + return req.getQuery("userId"); } + } else if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + return req.query.userId; } - let result = {"data": liveSessions}; + return undefined; +} + +const extractProjectKeyFromRequest = function (req) { + if (process.env.uws === "true") { + if (req.getParameter(0)) { + debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`); + return req.getParameter(0); + } + } else if (req.params.projectKey) { + debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); + return req.params.projectKey; + } + return undefined; +} + + +const getAvailableRooms = async function () { + let rooms = await io.of('/').adapter.allRooms(); + return rooms; +} + +const respond = function (res, data) { + let result = {data} if (process.env.uws !== "true") { res.statusCode = 200; res.setHeader('Content-Type', 'application/json'); @@ -56,37 +100,64 @@ const socketsList = async function (req, res) { res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); } } + +const socketsList = async function (req, res) { + debug && console.log("[WS]looking for all available sessions"); + let userId = extractUserIdFromRequest(req); + + let liveSessions = {}; + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { + let {projectKey, sessionId} = extractPeerId(peerId); + if (projectKey !== undefined) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } + } + } + respond(res, liveSessions); +} wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByProject); const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { @@ -94,51 +165,48 @@ const socketsLive = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]); + liveSessions[projectKey] = uniqueSessions(liveSessions[_projectKey]); } } - let result = {"data": liveSessions}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available LIVE sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]); + liveSessions[projectKey] = uniqueSessions(liveSessions[_projectKey]); } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live/:projectKey`, socketsLiveByProject); @@ -220,34 +288,12 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, start: (server) => { - if (process.env.uws !== "true") { - io = _io(server, { - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket' - }); - } else { - io = new _io.Server({ - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket', - // transports: ['websocket'], - // upgrade: false - }); - io.attachApp(server); - } - + createSocketIOServer(server); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; socket.identity = socket.handshake.query.identity; - let {projectKey, sessionId} = extractPeerId(socket.peerId); + const {projectKey, sessionId} = extractPeerId(socket.peerId); socket.sessionId = sessionId; socket.projectKey = projectKey; socket.lastMessageReceivedAt = Date.now(); diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 7e5f062fa..63559e11b 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -337,10 +337,10 @@ module.exports = { socket.onAny(async (eventName, ...args) => { socket.lastMessageReceivedAt = Date.now(); if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}, members: ${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); @@ -354,7 +354,7 @@ module.exports = { }); console.log("WS server started") - setInterval((io) => { + setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `); diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index 66d0fcde0..dd4c012dc 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -308,10 +308,10 @@ module.exports = { socket.onAny(async (eventName, ...args) => { socket.lastMessageReceivedAt = Date.now(); if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}, members: ${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); @@ -325,7 +325,7 @@ module.exports = { }); console.log("WS server started") - setInterval((io) => { + setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `);