From e043c400d5a5b0e5ea9489b0aaaa9586905f522f Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 13:36:22 +0100 Subject: [PATCH 1/7] feat(utilities): FOSS-WS get live sessions by userId --- utilities/servers/websocket.js | 74 +++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index b772228be..c902f1b9d 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -17,14 +17,29 @@ const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; let io; let debug = process.env.debug === "1" || false; -const socketsList = function (req, res) { +const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); + let userId; + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + userId = req.query.userId; + } let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(sessionId); + } } } res.statusCode = 200; @@ -33,14 +48,29 @@ const socketsList = function (req, res) { } wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); -const socketsListByProject = function (req, res) { +const socketsListByProject = async function (req, res) { debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); + let userId; + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + userId = req.query.userId; + } let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === req.params.projectKey) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(sessionId); + } } } res.statusCode = 200; @@ -51,6 +81,11 @@ wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByPro const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); + let userId; + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + userId = req.query.userId; + } let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -58,8 +93,15 @@ const socketsLive = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } } @@ -73,6 +115,11 @@ wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); + let userId; + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + userId = req.query.userId; + } let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -80,8 +127,15 @@ const socketsLiveByProject = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } } From d96dc8b6ade8ee6dc367c318e94993cf5c2d7582 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 13:47:42 +0100 Subject: [PATCH 2/7] feat(utilities): FOSS-WS refactored get live sessions by userId --- utilities/servers/websocket.js | 46 +++++++++++++--------------------- 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index c902f1b9d..e23002246 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -12,32 +12,35 @@ const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; const AGENTS_CONNECTED = "AGENTS_CONNECTED"; const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -// const wsReconnectionTimeout = process.env.wsReconnectionTimeout | 10 * 1000; let io; let debug = process.env.debug === "1" || false; +const extractUserIdFromRequest = function (req) { + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + return req.query.userId; + } + return undefined; +} + const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); - let userId; - if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - userId = req.query.userId; - } + let userId = extractUserIdFromRequest(req); + let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { + liveSessions[projectKey] = liveSessions[projectKey] || []; if (userId) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(sessionId); } } } else { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(sessionId); } } @@ -50,25 +53,20 @@ wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); - let userId; - if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - userId = req.query.userId; - } + let userId = extractUserIdFromRequest(req); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === req.params.projectKey) { + liveSessions[projectKey] = liveSessions[projectKey] || []; if (userId) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(sessionId); } } } else { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(sessionId); } } @@ -81,11 +79,7 @@ wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByPro const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); - let userId; - if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - userId = req.query.userId; - } + let userId = extractUserIdFromRequest(req); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -93,13 +87,12 @@ const socketsLive = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { + liveSessions[projectKey] = liveSessions[projectKey] || []; if (userId) { if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } @@ -115,11 +108,7 @@ wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); - let userId; - if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - userId = req.query.userId; - } + let userId = extractUserIdFromRequest(req); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -127,13 +116,12 @@ const socketsLiveByProject = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { + liveSessions[projectKey] = liveSessions[projectKey] || []; if (userId) { if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { - liveSessions[projectKey] = liveSessions[projectKey] || []; liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } From ca8e96354662c227d89200c337e10b83b076f429 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 15:16:42 +0100 Subject: [PATCH 3/7] feat(utilities): FOSS&EE assist-standalone refactor feat(utilities): FOSS&EE assist-standalone search by userId --- ee/utilities/server.js | 2 +- ee/utilities/servers/websocket.js | 199 ++++++++++++++++++------------ scripts/helmcharts/vars.yaml | 2 +- utilities/servers/websocket.js | 85 ++++++++----- 4 files changed, 176 insertions(+), 112 deletions(-) diff --git a/ee/utilities/server.js b/ee/utilities/server.js index f1209c9ff..d049faa19 100644 --- a/ee/utilities/server.js +++ b/ee/utilities/server.js @@ -3,7 +3,7 @@ var {peerRouter, peerConnection, peerDisconnect, peerError} = require('./servers var express = require('express'); const {ExpressPeerServer} = require('peer'); var socket; -if (process.env.cluster === "true") { +if (process.env.redis === "true") { console.log("Using Redis"); socket = require("./servers/websocket-cluster"); } else { diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index e087dba31..4aa048b1f 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -2,8 +2,8 @@ const _io = require('socket.io'); const express = require('express'); const uaParser = require('ua-parser-js'); const geoip2Reader = require('@maxmind/geoip2-node').Reader; -var {extractPeerId} = require('./peerjs-server'); -var wsRouter = express.Router(); +const {extractPeerId} = require('./peerjs-server'); +const wsRouter = express.Router(); const UPDATE_EVENT = "UPDATE_SESSION"; const IDENTITIES = {agent: 'agent', session: 'session'}; const NEW_AGENT = "NEW_AGENT"; @@ -12,22 +12,61 @@ const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; const AGENTS_CONNECTED = "AGENTS_CONNECTED"; const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -// const wsReconnectionTimeout = process.env.wsReconnectionTimeout | 10 * 1000; let io; -let debug = process.env.debug === "1" || false; +const debug = process.env.debug === "1" || false; -const socketsList = function (req, res) { - debug && console.log("[WS]looking for all available sessions"); - let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); - } +const createSocketIOServer = function (server) { + if (process.env.uws !== "true") { + io = _io(server, { + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket' + }); + } else { + io = new _io.Server({ + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket', + // transports: ['websocket'], + // upgrade: false + }); + io.attachApp(server); } - let result = {"data": liveSessions}; +} + +const extractUserIdFromRequest = function (req) { + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + return req.query.userId; + } + return undefined; +} + +const extractProjectKeyFromRequest = function (req) { + if (process.env.uws === "true") { + debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`); + return req.getParameter(0); + } else if (req.params.projectKey) { + debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); + return req.params.projectKey; + } + return undefined; +} + + +const getAvailableRooms = async function () { + return io.sockets.adapter.rooms.keys(); +} + +const respond = function (res, data) { + let result = {data} if (process.env.uws !== "true") { res.statusCode = 200; res.setHeader('Content-Type', 'application/json'); @@ -36,84 +75,111 @@ const socketsList = function (req, res) { res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); } } -wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); -const socketsListByProject = function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); +const socketsList = async function (req, res) { + debug && console.log("[WS]looking for all available sessions"); + let userId = extractUserIdFromRequest(req); + let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey !== undefined) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); + respond(res, liveSessions); +} +wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); + +const socketsListByProject = async function (req, res) { + debug && console.log("[WS]looking for available sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); + let liveSessions = {}; + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { + let {projectKey, sessionId} = extractPeerId(peerId); + if (projectKey === _projectKey) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } + } } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByProject); const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } } } - let result = {"data": liveSessions}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available LIVE sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live/:projectKey`, socketsLiveByProject); @@ -193,28 +259,7 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, start: (server) => { - if (process.env.uws !== "true") { - io = _io(server, { - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket' - }); - } else { - io = new _io.Server({ - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket', - // transports: ['websocket'], - // upgrade: false - }); - io.attachApp(server); - } + createSocketIOServer(server); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; diff --git a/scripts/helmcharts/vars.yaml b/scripts/helmcharts/vars.yaml index 25d6a67d6..f5cc11067 100644 --- a/scripts/helmcharts/vars.yaml +++ b/scripts/helmcharts/vars.yaml @@ -100,7 +100,7 @@ utilities: env: debug: 0 uws: false - cluster: false + redis: false # If you want to override something # chartname: diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index e23002246..66d0fcde0 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -2,8 +2,8 @@ const _io = require('socket.io'); const express = require('express'); const uaParser = require('ua-parser-js'); const geoip2Reader = require('@maxmind/geoip2-node').Reader; -var {extractPeerId} = require('./peerjs-server'); -var wsRouter = express.Router(); +const {extractPeerId} = require('./peerjs-server'); +const wsRouter = express.Router(); const UPDATE_EVENT = "UPDATE_SESSION"; const IDENTITIES = {agent: 'agent', session: 'session'}; const NEW_AGENT = "NEW_AGENT"; @@ -14,7 +14,19 @@ const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; let io; -let debug = process.env.debug === "1" || false; +const debug = process.env.debug === "1" || false; + +const createSocketIOServer = function (server) { + io = _io(server, { + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket' + }); +} + const extractUserIdFromRequest = function (req) { if (req.query.userId) { debug && console.log(`[WS]where userId=${req.query.userId}`); @@ -23,13 +35,32 @@ const extractUserIdFromRequest = function (req) { return undefined; } +const extractProjectKeyFromRequest = function (req) { + if (req.params.projectKey) { + debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); + return req.params.projectKey; + } + return undefined; +} + + +const getAvailableRooms = async function () { + return io.sockets.adapter.rooms.keys(); +} + +const respond = function (res, data) { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({"data": data})); +} const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { liveSessions[projectKey] = liveSessions[projectKey] || []; @@ -45,19 +76,19 @@ const socketsList = async function (req, res) { } } } - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify({"data": liveSessions})); + respond(res, liveSessions); } wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { - debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available sessions"); + let _projectKey = extractProjectKeyFromRequest(req); let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { liveSessions[projectKey] = liveSessions[projectKey] || []; if (userId) { const connected_sockets = await io.in(peerId).fetchSockets(); @@ -71,9 +102,7 @@ const socketsListByProject = async function (req, res) { } } } - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByProject); @@ -81,7 +110,8 @@ const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { let connected_sockets = await io.in(peerId).fetchSockets(); @@ -99,20 +129,19 @@ const socketsLive = async function (req, res) { } } } - - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify({"data": liveSessions})); + respond(res, liveSessions); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { - debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available LIVE sessions"); + let _projectKey = extractProjectKeyFromRequest(req); let userId = extractUserIdFromRequest(req); let liveSessions = {}; - for (let peerId of io.sockets.adapter.rooms.keys()) { + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { @@ -128,9 +157,7 @@ const socketsLiveByProject = async function (req, res) { } } } - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live/:projectKey`, socketsLiveByProject); @@ -210,15 +237,7 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, start: (server) => { - io = _io(server, { - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket' - }); - + createSocketIOServer(server); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; From 55bf3c703660044e35dcecc0f8245ee4743d8f7d Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 15:24:16 +0100 Subject: [PATCH 4/7] feat(utilities): EE assist-standalone fixed extract userId --- ee/utilities/servers/websocket.js | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 4aa048b1f..7e5f062fa 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -42,7 +42,12 @@ const createSocketIOServer = function (server) { } const extractUserIdFromRequest = function (req) { - if (req.query.userId) { + if (process.env.uws === "true") { + if (req.getQuery("userId")) { + debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); + return req.getQuery("userId"); + } + } else if (req.query.userId) { debug && console.log(`[WS]where userId=${req.query.userId}`); return req.query.userId; } @@ -51,8 +56,10 @@ const extractUserIdFromRequest = function (req) { const extractProjectKeyFromRequest = function (req) { if (process.env.uws === "true") { - debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`); - return req.getParameter(0); + if (req.getParameter(0)) { + debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`); + return req.getParameter(0); + } } else if (req.params.projectKey) { debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); return req.params.projectKey; From 83325af69fcaefd59e8b9a4d928f559c1457c659 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 15:49:54 +0100 Subject: [PATCH 5/7] feat(utilities): FOSS&EE assist-standalone refactored feat(utilities): EE assist-redis refactored feat(utilities): EE assist-redis search by userId --- ee/utilities/servers/websocket-cluster.js | 210 +++++++++++++--------- ee/utilities/servers/websocket.js | 6 +- utilities/servers/websocket.js | 6 +- 3 files changed, 134 insertions(+), 88 deletions(-) diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index 940f83879..f1cba3014 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -5,8 +5,7 @@ const geoip2Reader = require('@maxmind/geoip2-node').Reader; const {extractPeerId} = require('./peerjs-server'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); - -var wsRouter = express.Router(); +const wsRouter = express.Router(); const UPDATE_EVENT = "UPDATE_SESSION"; const IDENTITIES = {agent: 'agent', session: 'session'}; const NEW_AGENT = "NEW_AGENT"; @@ -15,14 +14,37 @@ const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; const AGENTS_CONNECTED = "AGENTS_CONNECTED"; const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -// const wsReconnectionTimeout = process.env.wsReconnectionTimeout | 10 * 1000; +const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379"; +const pubClient = createClient({url: REDIS_URL}); +const subClient = pubClient.duplicate(); let io; const debug = process.env.debug === "1" || false; -const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379"; -const pubClient = createClient({url: REDIS_URL}); -const subClient = pubClient.duplicate(); +const createSocketIOServer = function (server) { + if (process.env.uws !== "true") { + io = _io(server, { + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket' + }); + } else { + io = new _io.Server({ + maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, + cors: { + origin: "*", + methods: ["GET", "POST", "PUT"] + }, + path: '/socket', + // transports: ['websocket'], + // upgrade: false + }); + io.attachApp(server); + } +} const uniqueSessions = function (data) { let resArr = []; @@ -36,18 +58,40 @@ const uniqueSessions = function (data) { return resArr; } -const socketsList = async function (req, res) { - debug && console.log("[WS]looking for all available sessions"); - let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); - for (let peerId of rooms) { - let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); +const extractUserIdFromRequest = function (req) { + if (process.env.uws === "true") { + if (req.getQuery("userId")) { + debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); + return req.getQuery("userId"); } + } else if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + return req.query.userId; } - let result = {"data": liveSessions}; + return undefined; +} + +const extractProjectKeyFromRequest = function (req) { + if (process.env.uws === "true") { + if (req.getParameter(0)) { + debug && console.log(`[WS]where projectKey=${req.getParameter(0)}`); + return req.getParameter(0); + } + } else if (req.params.projectKey) { + debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); + return req.params.projectKey; + } + return undefined; +} + + +const getAvailableRooms = async function () { + let rooms = await io.of('/').adapter.allRooms(); + return rooms; +} + +const respond = function (res, data) { + let result = {data} if (process.env.uws !== "true") { res.statusCode = 200; res.setHeader('Content-Type', 'application/json'); @@ -56,37 +100,64 @@ const socketsList = async function (req, res) { res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); } } + +const socketsList = async function (req, res) { + debug && console.log("[WS]looking for all available sessions"); + let userId = extractUserIdFromRequest(req); + + let liveSessions = {}; + let rooms = await getAvailableRooms(); + for (let peerId of rooms) { + let {projectKey, sessionId} = extractPeerId(peerId); + if (projectKey !== undefined) { + liveSessions[projectKey] = liveSessions[projectKey] || []; + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } + } + } + respond(res, liveSessions); +} wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(sessionId); + if (userId) { + const connected_sockets = await io.in(peerId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(sessionId); + } + } + } else { + liveSessions[projectKey].push(sessionId); + } } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, socketsListByProject); const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { @@ -94,51 +165,48 @@ const socketsLive = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]); + liveSessions[projectKey] = uniqueSessions(liveSessions[_projectKey]); } } - let result = {"data": liveSessions}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { - if (process.env.uws === "true") { - req.params = {projectKey: req.getParameter(0)}; - } - debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); + debug && console.log("[WS]looking for available LIVE sessions"); + let _projectKey = extractProjectKeyFromRequest(req); + let userId = extractUserIdFromRequest(req); let liveSessions = {}; - let rooms = await io.of('/').adapter.allRooms(); + let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); - if (projectKey === req.params.projectKey) { + if (projectKey === _projectKey) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (userId) { + if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } + } else { + liveSessions[projectKey].push(item.handshake.query.sessionInfo); + } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]); + liveSessions[projectKey] = uniqueSessions(liveSessions[_projectKey]); } } - let result = {"data": liveSessions[req.params.projectKey] || []}; - if (process.env.uws !== "true") { - res.statusCode = 200; - res.setHeader('Content-Type', 'application/json'); - res.end(JSON.stringify(result)); - } else { - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify(result)); - } + respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/${process.env.S3_KEY}/sockets-live/:projectKey`, socketsLiveByProject); @@ -220,34 +288,12 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, start: (server) => { - if (process.env.uws !== "true") { - io = _io(server, { - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket' - }); - } else { - io = new _io.Server({ - maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, - cors: { - origin: "*", - methods: ["GET", "POST", "PUT"] - }, - path: '/socket', - // transports: ['websocket'], - // upgrade: false - }); - io.attachApp(server); - } - + createSocketIOServer(server); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; socket.identity = socket.handshake.query.identity; - let {projectKey, sessionId} = extractPeerId(socket.peerId); + const {projectKey, sessionId} = extractPeerId(socket.peerId); socket.sessionId = sessionId; socket.projectKey = projectKey; socket.lastMessageReceivedAt = Date.now(); diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 7e5f062fa..63559e11b 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -337,10 +337,10 @@ module.exports = { socket.onAny(async (eventName, ...args) => { socket.lastMessageReceivedAt = Date.now(); if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}, members: ${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); @@ -354,7 +354,7 @@ module.exports = { }); console.log("WS server started") - setInterval((io) => { + setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `); diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index 66d0fcde0..dd4c012dc 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -308,10 +308,10 @@ module.exports = { socket.onAny(async (eventName, ...args) => { socket.lastMessageReceivedAt = Date.now(); if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}, members: ${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}, members:${io.sockets.adapter.rooms.get(socket.peerId).size}`); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); @@ -325,7 +325,7 @@ module.exports = { }); console.log("WS server started") - setInterval((io) => { + setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `); From 729d5715b45beaab3adae6647fb7cf87d0787e64 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 16:02:47 +0100 Subject: [PATCH 6/7] feat(utilities): FOSS&EE assist socket creation customize path --- ee/utilities/servers/websocket-cluster.js | 10 +++++----- ee/utilities/servers/websocket.js | 10 +++++----- utilities/servers/websocket.js | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index f1cba3014..c044043a5 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -21,7 +21,7 @@ const subClient = pubClient.duplicate(); let io; const debug = process.env.debug === "1" || false; -const createSocketIOServer = function (server) { +const createSocketIOServer = function (server, prefix) { if (process.env.uws !== "true") { io = _io(server, { maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, @@ -29,7 +29,7 @@ const createSocketIOServer = function (server) { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket' + path: (prefix ? prefix : '') + '/socket' }); } else { io = new _io.Server({ @@ -38,7 +38,7 @@ const createSocketIOServer = function (server) { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket', + path: (prefix ? prefix : '') + '/socket' // transports: ['websocket'], // upgrade: false }); @@ -287,8 +287,8 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, - start: (server) => { - createSocketIOServer(server); + start: (server, prefix) => { + createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 63559e11b..0bd397d96 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -16,7 +16,7 @@ const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; let io; const debug = process.env.debug === "1" || false; -const createSocketIOServer = function (server) { +const createSocketIOServer = function (server, prefix) { if (process.env.uws !== "true") { io = _io(server, { maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, @@ -24,7 +24,7 @@ const createSocketIOServer = function (server) { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket' + path: (prefix ? prefix : '') + '/socket' }); } else { io = new _io.Server({ @@ -33,7 +33,7 @@ const createSocketIOServer = function (server) { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket', + path: (prefix ? prefix : '') + '/socket' // transports: ['websocket'], // upgrade: false }); @@ -265,8 +265,8 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, - start: (server) => { - createSocketIOServer(server); + start: (server, prefix) => { + createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index dd4c012dc..772bd7315 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -16,14 +16,14 @@ const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; let io; const debug = process.env.debug === "1" || false; -const createSocketIOServer = function (server) { +const createSocketIOServer = function (server, prefix) { io = _io(server, { maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6, cors: { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket' + path: (prefix ? prefix : '') + '/socket' }); } @@ -236,8 +236,8 @@ function extractSessionInfo(socket) { module.exports = { wsRouter, - start: (server) => { - createSocketIOServer(server); + start: (server, prefix) => { + createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket.peerId = socket.handshake.query.peerId; From c45d87d12ec10405bedbd6cc742c5a58a8b72388 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Mar 2022 16:25:38 +0100 Subject: [PATCH 7/7] feat(api): assist search by userId --- api/chalicelib/core/assist.py | 5 +++-- api/routers/core.py | 11 ++--------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/api/chalicelib/core/assist.py b/api/chalicelib/core/assist.py index 70f563ec8..44adfe2d1 100644 --- a/api/chalicelib/core/assist.py +++ b/api/chalicelib/core/assist.py @@ -64,9 +64,10 @@ def get_live_sessions(project_id, filters=None): return helper.list_to_camel_case(results) -def get_live_sessions_ws(project_id): +def get_live_sessions_ws(project_id, user_id=None): project_key = projects.get_project_key(project_id) - connected_peers = requests.get(config("peers") % config("S3_KEY") + f"/{project_key}") + connected_peers = requests.get(config("peers") % config("S3_KEY") \ + + f"/{project_key}" + (f"?userId={user_id}" if user_id else "")) if connected_peers.status_code != 200: print("!! issue with the peer-server") print(connected_peers.text) diff --git a/api/routers/core.py b/api/routers/core.py index df9ce0e8f..97a749429 100644 --- a/api/routers/core.py +++ b/api/routers/core.py @@ -820,15 +820,8 @@ def all_issue_types(context: schemas.CurrentContext = Depends(OR_context)): @app.get('/{projectId}/assist/sessions', tags=["assist"]) -def sessions_live(projectId: int, context: schemas.CurrentContext = Depends(OR_context)): - data = assist.get_live_sessions_ws(projectId) - return {'data': data} - - -@app.post('/{projectId}/assist/sessions', tags=["assist"]) -def sessions_live_search(projectId: int, data: schemas.AssistSearchPayloadSchema = Body(...), - context: schemas.CurrentContext = Depends(OR_context)): - data = assist.get_live_sessions_ws(projectId) +def sessions_live(projectId: int, userId: str = None, context: schemas.CurrentContext = Depends(OR_context)): + data = assist.get_live_sessions_ws(projectId, user_id=userId) return {'data': data}