diff --git a/assist/servers/websocket.js b/assist/servers/websocket.js index 1c9611203..c3c5ae936 100644 --- a/assist/servers/websocket.js +++ b/assist/servers/websocket.js @@ -171,7 +171,6 @@ const socketsLiveByProject = async function (req, res) { } } let sessions = Array.from(liveSessions); - console.log("sessions: ", sessions); respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); } @@ -265,12 +264,13 @@ module.exports = { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket._connectedAt = new Date(); - socket.peerId = socket.handshake.query.peerId; let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); + socket.peerId = socket.handshake.query.peerId; socket.roomId = extractRoomId(socket.peerId); - debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); socket.tabId = connTabId; socket.identity = socket.handshake.query.identity; + debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); + let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { diff --git a/ee/assist/servers/websocket-cluster.js b/ee/assist/servers/websocket-cluster.js index 03e43b07a..f669acb7a 100644 --- a/ee/assist/servers/websocket-cluster.js +++ b/ee/assist/servers/websocket-cluster.js @@ -31,7 +31,7 @@ const pubClient = createClient({url: REDIS_URL}); const subClient = pubClient.duplicate(); console.log(`Using Redis: ${REDIS_URL}`); let io; -const debug = process.env.debug === "1"; +const debug = true;// = process.env.debug === "1"; const createSocketIOServer = function (server, prefix) { if (process.env.uws !== "true") { @@ -58,6 +58,7 @@ const createSocketIOServer = function (server, prefix) { } } +// TODO: Maybe we should use a Set instead of an array const uniqueSessions = function (data) { let resArr = []; let resArrIDS = []; @@ -85,25 +86,30 @@ const respond = function (res, data) { const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, liveSessions); } @@ -112,35 +118,37 @@ const socketsListByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + if (withFilters) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) + : sessions.length > 0 ? sessions[0] : null); } const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; + const sessIDs = new Set(); let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey} = extractPeerId(peerId); @@ -148,19 +156,31 @@ const socketsLive = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { - if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]); + // Should be already unique + // liveSessionsPerProject[projectKey] = uniqueSessions(liveSessionsPerProject[projectKey]); } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, sortPaginate(liveSessions, filters)); } @@ -169,7 +189,9 @@ const socketsLiveByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + const sessIDs = new Set(); let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -177,23 +199,28 @@ const socketsLiveByProject = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { - if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } } } } - liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []); + // Should be unique already because of using sessIDs set + // liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []); } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] - : null); + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); } const autocomplete = async function (req, res) { @@ -218,10 +245,10 @@ const autocomplete = async function (req, res) { respond(res, uniqueAutocomplete(results)); } -const findSessionSocketId = async (io, peerId) => { - const connected_sockets = await io.in(peerId).fetchSockets(); +const findSessionSocketId = async (io, roomId, tabId) => { + const connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { + if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) { return item.id; } } @@ -285,14 +312,31 @@ module.exports = { socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket._connectedAt = new Date(); + + let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); socket.peerId = socket.handshake.query.peerId; + socket.roomId = extractRoomId(socket.peerId); + socket.tabId = connTabId; socket.identity = socket.handshake.query.identity; + debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); + let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { - debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); - return socket.disconnect(); + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === connProjectKey) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.tabId === connTabId) { + debug && console.log(`session already connected, refusing new connexion`); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); + return socket.disconnect(); + } + } + } + } } extractSessionInfo(socket); if (c_agents > 0) { @@ -306,36 +350,36 @@ module.exports = { debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } - await socket.join(socket.peerId); + await socket.join(socket.roomId); const rooms = await getAvailableRooms(io); - if (rooms.has(socket.peerId)) { + if (rooms.has(socket.roomId)) { let connectedSockets = await io.in(socket.peerId).fetchSockets(); - debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${connectedSockets.length}`); + debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`); } if (socket.identity === IDENTITIES.agent) { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { - debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); + debug && console.log(`${socket.id} disconnected from ${socket.roomId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (c_sessions === -1 && c_agents === -1) { - debug && console.log(`room not found: ${socket.peerId}`); + debug && console.log(`room not found: ${socket.roomId}`); } if (c_sessions === 0) { - debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { - debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); @@ -345,8 +389,21 @@ module.exports = { debug && console.log('Ignoring update event.'); return } - socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId}); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + // Update sessionInfo for all sessions (TODO: rewrite this) + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === connProjectKey) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + Object.assign(item.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId}); + } + } + } + } }); socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); @@ -358,13 +415,18 @@ module.exports = { return } if (socket.identity === IDENTITIES.session) { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); - socket.to(socket.peerId).emit(eventName, args[0]); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`); + // TODO: send to all agents in the room + socket.to(socket.roomId).emit(eventName, args[0]); } else { - debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); - let socketId = await findSessionSocketId(io, socket.peerId); + debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`); + if (args[0].meta === undefined) { + debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`); + return + } + let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId); if (socketId === null) { - debug && console.log(`session not found for:${socket.peerId}`); + debug && console.log(`session not found for:${socket.roomId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent"); diff --git a/ee/assist/servers/websocket.js b/ee/assist/servers/websocket.js index c1ff7cbb5..e53264ea1 100644 --- a/ee/assist/servers/websocket.js +++ b/ee/assist/servers/websocket.js @@ -67,25 +67,30 @@ const respond = function (res, data) { const socketsList = async function (req, res) { debug && console.log("[WS]looking for all available sessions"); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey !== undefined) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); + if (withFilters) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessionsPerProject[projectKey].add(sessionId); } } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, liveSessions); } @@ -94,35 +99,36 @@ const socketsListByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { + if (withFilters) { const connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } else { - liveSessions[projectKey].push(sessionId); + liveSessions.add(sessionId); } } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) + : sessions.length > 0 ? sessions[0] : null); } const socketsLive = async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessionsPerProject = {}; let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey} = extractPeerId(peerId); @@ -130,18 +136,22 @@ const socketsLive = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; + liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set(); if (hasFilters(filters)) { if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo); } } } } } + let liveSessions = {}; + liveSessionsPerProject.forEach((sessions, projectId) => { + liveSessions[projectId] = Array.from(sessions); + }); respond(res, sortPaginate(liveSessions, filters)); } @@ -150,7 +160,9 @@ const socketsLiveByProject = async function (req, res) { let _projectKey = extractProjectKeyFromRequest(req); let _sessionId = extractSessionIdFromRequest(req); let filters = await extractPayloadFromRequest(req, res); - let liveSessions = {}; + let withFilters = hasFilters(filters); + let liveSessions = new Set(); + const sessIDs = new Set(); let rooms = await getAvailableRooms(io); for (let peerId of rooms.keys()) { let {projectKey, sessionId} = extractPeerId(peerId); @@ -158,22 +170,26 @@ const socketsLiveByProject = async function (req, res) { let connected_sockets = await io.in(peerId).fetchSockets(); for (let item of connected_sockets) { if (item.handshake.query.identity === IDENTITIES.session) { - liveSessions[projectKey] = liveSessions[projectKey] || []; - if (hasFilters(filters)) { - if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (withFilters) { + if (item.handshake.query.sessionInfo && + isValidSession(item.handshake.query.sessionInfo, filters.filter) && + !sessIDs.has(item.handshake.query.sessionInfo.sessionID) + ) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); } } else { - liveSessions[projectKey].push(item.handshake.query.sessionInfo); + if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) { + liveSessions.add(item.handshake.query.sessionInfo); + sessIDs.add(item.handshake.query.sessionInfo.sessionID); + } } } } } } - liveSessions[_projectKey] = liveSessions[_projectKey] || []; - respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters) - : liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0] - : null); + let sessions = Array.from(liveSessions); + respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null); } const autocomplete = async function (req, res) { @@ -198,10 +214,10 @@ const autocomplete = async function (req, res) { respond(res, uniqueAutocomplete(results)); } -const findSessionSocketId = async (io, peerId) => { - const connected_sockets = await io.in(peerId).fetchSockets(); +const findSessionSocketId = async (io, roomId, tabId) => { + const connected_sockets = await io.in(roomId).fetchSockets(); for (let item of connected_sockets) { - if (item.handshake.query.identity === IDENTITIES.session) { + if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) { return item.id; } } @@ -265,56 +281,73 @@ module.exports = { socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err)); debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); socket._connectedAt = new Date(); + + let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId); socket.peerId = socket.handshake.query.peerId; + socket.roomId = extractRoomId(socket.peerId); + socket.tabId = connTabId; socket.identity = socket.handshake.query.identity; + debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`); + let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (socket.identity === IDENTITIES.session) { if (c_sessions > 0) { - debug && console.log(`session already connected, refusing new connexion`); - io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); - return socket.disconnect(); + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === connProjectKey) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.tabId === connTabId) { + debug && console.log(`session already connected, refusing new connexion`); + io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED); + return socket.disconnect(); + } + } + } + } } extractSessionInfo(socket); if (c_agents > 0) { debug && console.log(`notifying new session about agent-existence`); let agents_ids = await get_all_agents_ids(io, socket); io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } } else if (c_sessions <= 0) { debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } - await socket.join(socket.peerId); + await socket.join(socket.roomId); const rooms = await getAvailableRooms(io); - if (rooms.get(socket.peerId)) { - debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`); + if (rooms.get(socket.roomId)) { + debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`); } if (socket.identity === IDENTITIES.agent) { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); } - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } socket.on('disconnect', async () => { - debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); + debug && console.log(`${socket.id} disconnected from ${socket.roomId}`); if (socket.identity === IDENTITIES.agent) { - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (c_sessions === -1 && c_agents === -1) { - debug && console.log(`room not found: ${socket.peerId}`); + debug && console.log(`room not found: ${socket.roomId}`); } if (c_sessions === 0) { - debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); + debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } if (c_agents === 0) { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS); } }); @@ -324,8 +357,21 @@ module.exports = { debug && console.log('Ignoring update event.'); return } - socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]}; - socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId}); + socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]); + // Update sessionInfo for all sessions (TODO: rewrite this) + const rooms = await getAvailableRooms(io); + for (let roomId of rooms.keys()) { + let {projectKey} = extractPeerId(roomId); + if (projectKey === connProjectKey) { + const connected_sockets = await io.in(roomId).fetchSockets(); + for (let item of connected_sockets) { + if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) { + Object.assign(item.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId}); + } + } + } + } }); socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err)); @@ -338,12 +384,16 @@ module.exports = { } if (socket.identity === IDENTITIES.session) { debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`); - socket.to(socket.peerId).emit(eventName, args[0]); + socket.to(socket.roomId).emit(eventName, args[0]); } else { debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`); - let socketId = await findSessionSocketId(io, socket.peerId); + if (args[0].meta === undefined) { + debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`); + return + } + let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId); if (socketId === null) { - debug && console.log(`session not found for:${socket.peerId}`); + debug && console.log(`session not found for:${socket.roomId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } else { debug && console.log("message sent");