From 30f4cdeff7e3762b4d321f64fc6d26dc1d8afff7 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Thu, 18 Aug 2022 15:19:55 +0100 Subject: [PATCH] feat(assist): disconnect socket after X min --- ee/utilities/servers/websocket-cluster.js | 13 +++++++--- ee/utilities/servers/websocket.js | 20 +++++++++------ .../openreplay/charts/assist/values.yaml | 1 + utilities/servers/websocket.js | 16 +++++++----- utilities/utils/assistHelper.js | 25 ++++++++++++++++++- 5 files changed, 56 insertions(+), 19 deletions(-) diff --git a/ee/utilities/servers/websocket-cluster.js b/ee/utilities/servers/websocket-cluster.js index cafaa4eaf..dfe2b5848 100644 --- a/ee/utilities/servers/websocket-cluster.js +++ b/ee/utilities/servers/websocket-cluster.js @@ -11,7 +11,8 @@ const { const { IDENTITIES, EVENTS_DEFINITION, - extractSessionInfo + extractSessionInfo, + socketConnexionTimeout } = require('../utils/assistHelper'); const { extractProjectKeyFromRequest, @@ -282,6 +283,7 @@ module.exports = { createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); + socket._connectedAt = new Date(); socket.peerId = socket.handshake.query.peerId; socket.identity = socket.handshake.query.identity; let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -368,7 +370,7 @@ module.exports = { }); }); - console.log("WS server started") + console.log("WS server started"); setInterval(async (io) => { try { let rooms = await io.of('/').adapter.allRooms(); @@ -386,13 +388,16 @@ module.exports = { if (debug) { for (let item of validRooms) { let connectedSockets = await io.in(item).fetchSockets(); - console.log(`Room: ${item} connected: ${connectedSockets.length}`) + console.log(`Room: ${item} connected: ${connectedSockets.length}`); } } } catch (e) { console.error(e); } - }, 20000, io); + }, 30000, io); + + socketConnexionTimeout(io); + Promise.all([pubClient.connect(), subClient.connect()]) .then(() => { io.adapter(createAdapter(pubClient, subClient)); diff --git a/ee/utilities/servers/websocket.js b/ee/utilities/servers/websocket.js index 109408039..b9b817f06 100644 --- a/ee/utilities/servers/websocket.js +++ b/ee/utilities/servers/websocket.js @@ -11,7 +11,8 @@ const { const { IDENTITIES, EVENTS_DEFINITION, - extractSessionInfo + extractSessionInfo, + socketConnexionTimeout } = require('../utils/assistHelper'); const { extractProjectKeyFromRequest, @@ -260,6 +261,7 @@ module.exports = { createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); + socket._connectedAt = new Date(); socket.peerId = socket.handshake.query.peerId; socket.identity = socket.handshake.query.identity; let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -304,7 +306,7 @@ module.exports = { } if (c_sessions === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(NO_SESSIONS); + socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); @@ -335,7 +337,7 @@ module.exports = { let socketId = await findSessionSocketId(io, socket.peerId); if (socketId === null) { debug && console.log(`session not found for:${socket.peerId}`); - io.to(socket.id).emit(NO_SESSIONS); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent"); io.to(socketId).emit(eventName, socket.id, args[0]); @@ -344,13 +346,13 @@ module.exports = { }); }); - console.log("WS server started") + console.log("WS server started"); setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `); - const arr = Array.from(io.sockets.adapter.rooms) - const filtered = arr.filter(room => !room[1].has(room[0])) + const arr = Array.from(io.sockets.adapter.rooms); + const filtered = arr.filter(room => !room[1].has(room[0])); for (let i of filtered) { let {projectKey, sessionId} = extractPeerId(i[0]); if (projectKey !== null && sessionId !== null) { @@ -360,13 +362,15 @@ module.exports = { console.log(` ====== Valid Rooms: ${count} ====== `); if (debug) { for (let item of filtered) { - console.log(`Room: ${item[0]} connected: ${item[1].size}`) + console.log(`Room: ${item[0]} connected: ${item[1].size}`); } } } catch (e) { console.error(e); } - }, 20000, io); + }, 30000, io); + + socketConnexionTimeout(io); }, handlers: { socketsList, diff --git a/scripts/helmcharts/openreplay/charts/assist/values.yaml b/scripts/helmcharts/openreplay/charts/assist/values.yaml index 4ffaf88e1..65e341f98 100644 --- a/scripts/helmcharts/openreplay/charts/assist/values.yaml +++ b/scripts/helmcharts/openreplay/charts/assist/values.yaml @@ -90,6 +90,7 @@ env: debug: 0 uws: false redis: false + CLEAR_SOCKET_TIME: 0 nodeSelector: {} diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index d7eaab609..037cb74bc 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -14,7 +14,8 @@ const { const { IDENTITIES, EVENTS_DEFINITION, - extractSessionInfo + extractSessionInfo, + socketConnexionTimeout } = require('../utils/assistHelper'); const wsRouter = express.Router(); @@ -241,6 +242,7 @@ module.exports = { createSocketIOServer(server, prefix); io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); + socket._connectedAt = new Date(); socket.peerId = socket.handshake.query.peerId; socket.identity = socket.handshake.query.identity; let {c_sessions, c_agents} = await sessions_agents_count(io, socket); @@ -325,13 +327,13 @@ module.exports = { }); }); - console.log("WS server started") + console.log("WS server started"); setInterval(async (io) => { try { let count = 0; console.log(` ====== Rooms: ${io.sockets.adapter.rooms.size} ====== `); - const arr = Array.from(io.sockets.adapter.rooms) - const filtered = arr.filter(room => !room[1].has(room[0])) + const arr = Array.from(io.sockets.adapter.rooms); + const filtered = arr.filter(room => !room[1].has(room[0])); for (let i of filtered) { let {projectKey, sessionId} = extractPeerId(i[0]); if (projectKey !== null && sessionId !== null) { @@ -341,13 +343,15 @@ module.exports = { console.log(` ====== Valid Rooms: ${count} ====== `); if (debug) { for (let item of filtered) { - console.log(`Room: ${item[0]} connected: ${item[1].size}`) + console.log(`Room: ${item[0]} connected: ${item[1].size}`); } } } catch (e) { console.error(e); } - }, 20000, io); + }, 30000, io); + + socketConnexionTimeout(io); }, handlers: { socketsList, diff --git a/utilities/utils/assistHelper.js b/utilities/utils/assistHelper.js index 550ee8ae1..db7a45c0c 100644 --- a/utilities/utils/assistHelper.js +++ b/utilities/utils/assistHelper.js @@ -69,7 +69,30 @@ const extractSessionInfo = function (socket) { } } +function socketConnexionTimeout(io) { + if (process.env.CLEAR_SOCKET_TIME !== undefined && parseFloat(process.env.CLEAR_SOCKET_TIME) > 0) { + const CLEAR_SOCKET_TIME = parseFloat(process.env.CLEAR_SOCKET_TIME); + console.log(`WS manually disconnecting sockets after ${CLEAR_SOCKET_TIME} min`); + setInterval(async (io) => { + try { + const now = new Date(); + let allSockets = await io.fetchSockets(); + for (let socket of allSockets) { + if (socket._connectedAt !== undefined && ((now - socket._connectedAt) / 1000) / 60 > CLEAR_SOCKET_TIME) { + debug && console.log(`disconnecting ${socket.id} after more than ${CLEAR_SOCKET_TIME} of connexion.`); + socket.disconnect(); + } + } + } catch (e) { + console.error(e); + } + }, 0.5 * 60 * 1000, io); + // }, 2.5 * 60 * 1000, io); + } else { + debug && console.log(`WS no manually disconnecting sockets.`); + } +} module.exports = { - extractSessionInfo, EVENTS_DEFINITION, IDENTITIES + extractSessionInfo, EVENTS_DEFINITION, IDENTITIES, socketConnexionTimeout }; \ No newline at end of file