diff --git a/ee/utilities/.gitignore b/ee/utilities/.gitignore index 0aaf625c9..f54e439ba 100644 --- a/ee/utilities/.gitignore +++ b/ee/utilities/.gitignore @@ -10,6 +10,7 @@ build.sh servers/peerjs-server.js servers/sourcemaps-handler.js servers/sourcemaps-server.js -#servers/websocket.js -/utils /Dockerfile +/utils/geoIP.js +/utils/HeapSnapshot.js +/utils/helper.js diff --git a/ee/utilities/server.js b/ee/utilities/server.js index 429b37c25..fc319d79c 100644 --- a/ee/utilities/server.js +++ b/ee/utilities/server.js @@ -16,8 +16,9 @@ const PREFIX = process.env.prefix || `/assist` if (process.env.uws !== "true") { let wsapp = express(); + wsapp.use(express.json()); + wsapp.use(express.urlencoded({extended: true})); wsapp.use(request_logger("[wsapp]")); - wsapp.use(request_logger("[app]")); wsapp.get([PREFIX, `${PREFIX}/`], (req, res) => { res.statusCode = 200; res.end("ok!"); @@ -73,10 +74,14 @@ if (process.env.uws !== "true") { } } uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList)); + uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList)); uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject)); + uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject)); uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); + uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); + uapp.post(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); socket.start(uapp); diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index 0b8a56699..4b3cb0a42 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -1,7 +1,8 @@ const _io = require('socket.io'); const express = require('express'); const uaParser = require('ua-parser-js'); -const {extractPeerId} = require('../utils/helper'); +const {extractPeerId, hasFilters, isValidSession} = require('../utils/helper'); +const {extractFiltersFromRequest} = require('../utils/helper-ee'); const {geoip} = require('../utils/geoIP'); const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); @@ -59,19 +60,6 @@ const uniqueSessions = function (data) { return resArr; } -const extractUserIdFromRequest = function (req) { - if (process.env.uws === "true") { - if (req.getQuery("userId")) { - debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); - return req.getQuery("userId"); - } - } else if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - return req.query.userId; - } - return undefined; -} - const extractProjectKeyFromRequest = function (req) { if (process.env.uws === "true") { if (req.getParameter(0)) { @@ -103,7 +91,7 @@ const respond = function (res, data) { const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); @@ -111,10 +99,11 @@ const socketsList = async function (req, res) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { + if (hasFilters(filters)) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(sessionId); } } @@ -126,21 +115,23 @@ const socketsList = async function (req, res) { respond(res, liveSessions); } wsRouter.get(`/sockets-list`, socketsList); +wsRouter.post(`/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { debug && console.log("[WS]looking for available sessions"); let _projectKey = extractProjectKeyFromRequest(req); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === _projectKey) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { + if (hasFilters(filters)) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(sessionId); } } @@ -152,10 +143,11 @@ const socketsListByProject = async function (req, res) { respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject); +wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject); const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { @@ -165,8 +157,8 @@ const socketsLive = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { - if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (hasFilters(filters)) { + if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { @@ -180,11 +172,12 @@ const socketsLive = async function (req, res) { respond(res, liveSessions); } wsRouter.get(`/sockets-live`, socketsLive); +wsRouter.post(`/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { debug && console.log("[WS]looking for available LIVE sessions"); let _projectKey = extractProjectKeyFromRequest(req); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { @@ -194,8 +187,8 @@ const socketsLiveByProject = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { - if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (hasFilters(filters)) { + if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { @@ -209,6 +202,7 @@ const socketsLiveByProject = async function (req, res) { respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); +wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); const findSessionSocketId = async (io, peerId) => { const connected_sockets = await io.in(peerId).fetchSockets(); diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 51fa4cc41..63f38b94e 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -1,7 +1,8 @@ const _io = require('socket.io'); const express = require('express'); const uaParser = require('ua-parser-js'); -const {extractPeerId} = require('../utils/helper'); +const {extractPeerId, hasFilters, isValidSession} = require('../utils/helper'); +const {extractFiltersFromRequest} = require('../utils/helper-ee'); const {geoip} = require('../utils/geoIP'); const wsRouter = express.Router(); const UPDATE_EVENT = "UPDATE_SESSION"; @@ -42,19 +43,6 @@ const createSocketIOServer = function (server, prefix) { } } -const extractUserIdFromRequest = function (req) { - if (process.env.uws === "true") { - if (req.getQuery("userId")) { - debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); - return req.getQuery("userId"); - } - } else if (req.query.userId) { - debug && console.log(`[WS]where userId=${req.query.userId}`); - return req.query.userId; - } - return undefined; -} - const extractProjectKeyFromRequest = function (req) { if (process.env.uws === "true") { if (req.getParameter(0)) { @@ -86,18 +74,18 @@ const respond = function (res, data) { const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); - let userId = extractUserIdFromRequest(req); - + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { + if (hasFilters(filters)) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(sessionId); } } @@ -109,21 +97,23 @@ const socketsList = async function (req, res) { respond(res, liveSessions); } wsRouter.get(`/sockets-list`, socketsList); +wsRouter.post(`/sockets-list`, socketsList); const socketsListByProject = async function (req, res) { debug && console.log("[WS]looking for available sessions"); let _projectKey = extractProjectKeyFromRequest(req); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === _projectKey) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { + if (hasFilters(filters)) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo + && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(sessionId); } } @@ -135,10 +125,11 @@ const socketsListByProject = async function (req, res) { respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/sockets-list/:projectKey`, socketsListByProject); +wsRouter.post(`/sockets-list/:projectKey`, socketsListByProject); const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { @@ -148,8 +139,8 @@ const socketsLive = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { - if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (hasFilters(filters)) { + if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { @@ -162,11 +153,12 @@ const socketsLive = async function (req, res) { respond(res, liveSessions); } wsRouter.get(`/sockets-live`, socketsLive); +wsRouter.post(`/sockets-live`, socketsLive); const socketsLiveByProject = async function (req, res) { debug && console.log("[WS]looking for available LIVE sessions"); let _projectKey = extractProjectKeyFromRequest(req); - let userId = extractUserIdFromRequest(req); + let filters = await extractFiltersFromRequest(req, res); let liveSessions = {}; let rooms = await getAvailableRooms(); for (let peerId of rooms) { @@ -176,8 +168,8 @@ const socketsLiveByProject = async function (req, res) { for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { liveSessions[projectKey] = liveSessions[projectKey] || []; - if (userId) { - if (item.handshake.query.sessionInfo && item.handshake.query.sessionInfo.userID === userId) { + if (hasFilters(filters)) { + if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters)) { liveSessions[projectKey].push(item.handshake.query.sessionInfo); } } else { @@ -190,6 +182,7 @@ const socketsLiveByProject = async function (req, res) { respond(res, liveSessions[_projectKey] || []); } wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject); +wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); const findSessionSocketId = async (io, peerId) => { const connected_sockets = await io.in(peerId).fetchSockets(); diff --git a/ee/utilities/utils/helper-ee.js b/ee/utilities/utils/helper-ee.js new file mode 100644 index 000000000..522158a01 --- /dev/null +++ b/ee/utilities/utils/helper-ee.js @@ -0,0 +1,53 @@ +const {objectToObjectOfArrays} = require('./helper'); +const getBodyFromUWSResponse = async function (res) { + return new Promise(((resolve, reject) => { + let buffer; + res.onData((ab, isLast) => { + let chunk = Buffer.from(ab); + if (buffer) { + buffer = Buffer.concat([buffer, chunk]); + } else { + buffer = Buffer.concat([chunk]); + } + if (isLast) { + let json; + try { + json = JSON.parse(buffer); + } catch (e) { + console.error(e); + /* res.close calls onAborted */ + try { + res.close(); + } catch (e2) { + console.error(e2); + } + json = {}; + } + resolve(json); + } + }); + })); +} +const extractFiltersFromRequest = async function (req, res) { + let filters = {}; + if (process.env.uws === "true") { + if (req.getQuery("userId")) { + debug && console.log(`[WS]where userId=${req.getQuery("userId")}`); + filters.userID = [req.getQuery("userId")]; + } + + let body = await getBodyFromUWSResponse(res); + filters = {...filters, ...body}; + } else { + if (req.query.userId) { + debug && console.log(`[WS]where userId=${req.query.userId}`); + filters.userID = [req.query.userId]; + } + filters = {...filters, ...req.body}; + } + filters = objectToObjectOfArrays({...filters, ...req.body}); + return Object.keys(filters).length > 0 ? filters : undefined; +} +module.exports = { + extractFiltersFromRequest +}; \ No newline at end of file