diff --git a/ee/utilities/package-lock.json b/ee/utilities/package-lock.json index 6b9dbdf1c..1c14c5f25 100644 --- a/ee/utilities/package-lock.json +++ b/ee/utilities/package-lock.json @@ -10,9 +10,9 @@ "license": "Elastic License 2.0 (ELv2)", "dependencies": { "@maxmind/geoip2-node": "^3.4.0", - "@socket.io/redis-adapter": "^7.1.0", - "express": "^4.17.1", - "redis": "^4.0.3", + "@socket.io/redis-adapter": "^7.2.0", + "express": "^4.18.1", + "redis": "^4.2.0", "socket.io": "^4.5.1", "ua-parser-js": "^1.0.2", "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.10.0" diff --git a/ee/utilities/package.json b/ee/utilities/package.json index bd35ec6a6..ba3997a90 100644 --- a/ee/utilities/package.json +++ b/ee/utilities/package.json @@ -19,9 +19,9 @@ "homepage": "https://github.com/openreplay/openreplay#readme", "dependencies": { "@maxmind/geoip2-node": "^3.4.0", - "@socket.io/redis-adapter": "^7.1.0", - "express": "^4.17.1", - "redis": "^4.0.3", + "@socket.io/redis-adapter": "^7.2.0", + "express": "^4.18.1", + "redis": "^4.2.0", "socket.io": "^4.5.1", "ua-parser-js": "^1.0.2", "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.10.0" diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index b0649127c..cafaa4eaf 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -9,6 +9,8 @@ const { uniqueAutocomplete } = require('../utils/helper'); const { + IDENTITIES, + EVENTS_DEFINITION, extractSessionInfo } = require('../utils/assistHelper'); const { @@ -19,15 +21,6 @@ const { const {createAdapter} = require("@socket.io/redis-adapter"); const {createClient} = require("redis"); const wsRouter = express.Router(); -const UPDATE_EVENT = "UPDATE_SESSION"; -const IDENTITIES = {agent: 'agent', session: 'session'}; -const NEW_AGENT = "NEW_AGENT"; -const NO_AGENTS = "NO_AGENT"; -const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; -const AGENTS_CONNECTED = "AGENTS_CONNECTED"; -const NO_SESSIONS = "SESSION_DISCONNECTED"; -const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -const SESSION_RECONNECTED = "SESSION_RECONNECTED"; const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379"; const pubClient = createClient({url: REDIS_URL}); const subClient = pubClient.duplicate(); @@ -295,20 +288,20 @@ module.exports = { if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(SESSION_ALREADY_CONNECTED); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); return socket.disconnect(); } extractSessionInfo(socket); if (c_agents > 0) { debug && console.log(`notifying new session about agent-existence`); let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids); - socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (c_sessions <= 0) { debug && console.log(`notifying new agent about no SESSIONS`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } await io.of('/').adapter.remoteJoin(socket.id, socket.peerId); let rooms = await io.of('/').adapter.allRooms(); @@ -320,13 +313,13 @@ module.exports = { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -335,25 +328,29 @@ module.exports = { } if (c_sessions === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(NO_SESSIONS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(NO_AGENTS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); - socket.on(UPDATE_EVENT, async (...args) => { + socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => { debug && console.log(`${socket.id} sent update event.`); if (socket.identity !== IDENTITIES.session) { debug && console.log('Ignoring update event.'); return } socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); }); socket.onAny(async (eventName, ...args) => { + if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { + debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); + return + } if (socket.identity === IDENTITIES.session) { debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); @@ -362,7 +359,7 @@ module.exports = { let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent"); io.to(socketId).emit(eventName, socket.id, args[0]); diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 4fa61aa42..109408039 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -9,6 +9,8 @@ const { uniqueAutocomplete } = require('../utils/helper'); const { + IDENTITIES, + EVENTS_DEFINITION, extractSessionInfo } = require('../utils/assistHelper'); const { @@ -17,15 +19,6 @@ const { extractPayloadFromRequest, } = require('../utils/helper-ee'); const wsRouter = express.Router(); -const UPDATE_EVENT = "UPDATE_SESSION"; -const IDENTITIES = {agent: 'agent', session: 'session'}; -const NEW_AGENT = "NEW_AGENT"; -const NO_AGENTS = "NO_AGENT"; -const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; -const AGENTS_CONNECTED = "AGENTS_CONNECTED"; -const NO_SESSIONS = "SESSION_DISCONNECTED"; -const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -const SESSION_RECONNECTED = "SESSION_RECONNECTED"; let io; const debug = process.env.debug === "1" || false; @@ -273,20 +266,20 @@ module.exports = { if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(SESSION_ALREADY_CONNECTED); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); return socket.disconnect(); } extractSessionInfo(socket); if (c_agents > 0) { debug && console.log(`notifying new session about agent-existence`); let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids); - socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (c_sessions <= 0) { debug && console.log(`notifying new agent about no SESSIONS`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } socket.join(socket.peerId); if (io.sockets.adapter.rooms.get(socket.peerId)) { @@ -296,13 +289,13 @@ module.exports = { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -315,21 +308,25 @@ module.exports = { } if (c_agents === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(NO_AGENTS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); - socket.on(UPDATE_EVENT, async (...args) => { + socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => { debug && console.log(`${socket.id} sent update event.`); if (socket.identity !== IDENTITIES.session) { debug && console.log('Ignoring update event.'); return } socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); }); socket.onAny(async (eventName, ...args) => { + if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { + debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); + return + } if (socket.identity === IDENTITIES.session) { debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index 5636eafcc..d7eaab609 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -12,18 +12,11 @@ const { uniqueAutocomplete } = require('../utils/helper'); const { + IDENTITIES, + EVENTS_DEFINITION, extractSessionInfo } = require('../utils/assistHelper'); const wsRouter = express.Router(); -const UPDATE_EVENT = "UPDATE_SESSION"; -const IDENTITIES = {agent: 'agent', session: 'session'}; -const NEW_AGENT = "NEW_AGENT"; -const NO_AGENTS = "NO_AGENT"; -const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; -const AGENTS_CONNECTED = "AGENTS_CONNECTED"; -const NO_SESSIONS = "SESSION_DISCONNECTED"; -const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; -const SESSION_RECONNECTED = "SESSION_RECONNECTED"; let io; const debug = process.env.debug === "1" || false; @@ -254,20 +247,20 @@ module.exports = { if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(SESSION_ALREADY_CONNECTED); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); return socket.disconnect(); } extractSessionInfo(socket); if (c_agents > 0) { debug && console.log(`notifying new session about agent-existence`); let agents_ids = await get_all_agents_ids(io, socket); - io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids); - socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (c_sessions <= 0) { debug && console.log(`notifying new agent about no SESSIONS`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } socket.join(socket.peerId); if (io.sockets.adapter.rooms.get(socket.peerId)) { @@ -277,13 +270,13 @@ module.exports = { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -292,25 +285,29 @@ module.exports = { } if (c_sessions === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(NO_SESSIONS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(NO_AGENTS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); - socket.on(UPDATE_EVENT, async (...args) => { + socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => { debug && console.log(`${socket.id} sent update event.`); if (socket.identity !== IDENTITIES.session) { debug && console.log('Ignoring update event.'); return } socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); }); socket.onAny(async (eventName, ...args) => { + if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) { + debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`); + return + } if (socket.identity === IDENTITIES.session) { debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); socket.to(socket.peerId).emit(eventName, args[0]); @@ -319,7 +316,7 @@ module.exports = { let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent"); io.to(socketId).emit(eventName, socket.id, args[0]); diff --git a/utilities/utils/assistHelper.js b/utilities/utils/assistHelper.js index 63692f4fc..550ee8ae1 100644 --- a/utilities/utils/assistHelper.js +++ b/utilities/utils/assistHelper.js @@ -2,6 +2,20 @@ const uaParser = require('ua-parser-js'); const {geoip} = require('./geoIP'); let debug = process.env.debug === "1" || false; +const IDENTITIES = {agent: 'agent', session: 'session'}; +const EVENTS_DEFINITION = { + listen: {UPDATE_EVENT: "UPDATE_SESSION"} +}; +EVENTS_DEFINITION.emit = { + NEW_AGENT: "NEW_AGENT", + NO_AGENTS: "NO_AGENT", + AGENT_DISCONNECT: "AGENT_DISCONNECTED", + AGENTS_CONNECTED: "AGENTS_CONNECTED", + NO_SESSIONS: "SESSION_DISCONNECTED", + SESSION_ALREADY_CONNECTED: "SESSION_ALREADY_CONNECTED", + SESSION_RECONNECTED: "SESSION_RECONNECTED", + UPDATE_EVENT: EVENTS_DEFINITION.listen.UPDATE_EVENT +}; const BASE_sessionInfo = { "pageTitle": "Page", @@ -57,5 +71,5 @@ const extractSessionInfo = function (socket) { module.exports = { - extractSessionInfo + extractSessionInfo, EVENTS_DEFINITION, IDENTITIES }; \ No newline at end of file