feat(assist): multitabs support for ee version
This commit is contained in:
parent
08a96085a9
commit
fb828f4178
3 changed files with 220 additions and 108 deletions
|
|
@ -171,7 +171,6 @@ const socketsLiveByProject = async function (req, res) {
|
|||
}
|
||||
}
|
||||
let sessions = Array.from(liveSessions);
|
||||
console.log("sessions: ", sessions);
|
||||
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null);
|
||||
}
|
||||
|
||||
|
|
@ -265,12 +264,13 @@ module.exports = {
|
|||
debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
socket._connectedAt = new Date();
|
||||
|
||||
socket.peerId = socket.handshake.query.peerId;
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId);
|
||||
socket.peerId = socket.handshake.query.peerId;
|
||||
socket.roomId = extractRoomId(socket.peerId);
|
||||
debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`);
|
||||
socket.tabId = connTabId;
|
||||
socket.identity = socket.handshake.query.identity;
|
||||
debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`);
|
||||
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
if (c_sessions > 0) {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ const pubClient = createClient({url: REDIS_URL});
|
|||
const subClient = pubClient.duplicate();
|
||||
console.log(`Using Redis: ${REDIS_URL}`);
|
||||
let io;
|
||||
const debug = process.env.debug === "1";
|
||||
const debug = true;// = process.env.debug === "1";
|
||||
|
||||
const createSocketIOServer = function (server, prefix) {
|
||||
if (process.env.uws !== "true") {
|
||||
|
|
@ -58,6 +58,7 @@ const createSocketIOServer = function (server, prefix) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Maybe we should use a Set instead of an array
|
||||
const uniqueSessions = function (data) {
|
||||
let resArr = [];
|
||||
let resArrIDS = [];
|
||||
|
|
@ -85,25 +86,30 @@ const respond = function (res, data) {
|
|||
const socketsList = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available sessions");
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessionsPerProject = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
|
||||
if (withFilters) {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
|
||||
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessionsPerProject[projectKey].add(sessionId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessionsPerProject[projectKey].add(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
let liveSessions = {};
|
||||
liveSessionsPerProject.forEach((sessions, projectId) => {
|
||||
liveSessions[projectId] = Array.from(sessions);
|
||||
});
|
||||
respond(res, liveSessions);
|
||||
}
|
||||
|
||||
|
|
@ -112,35 +118,37 @@ const socketsListByProject = async function (req, res) {
|
|||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
if (withFilters) {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
|
||||
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessions.add(sessionId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessions.add(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters)
|
||||
: sessions.length > 0 ? sessions[0]
|
||||
: null);
|
||||
}
|
||||
|
||||
const socketsLive = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available LIVE sessions");
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessionsPerProject = {};
|
||||
const sessIDs = new Set();
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
|
|
@ -148,19 +156,31 @@ const socketsLive = async function (req, res) {
|
|||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
|
||||
if (withFilters) {
|
||||
if (item.handshake.query.sessionInfo &&
|
||||
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
|
||||
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
|
||||
) {
|
||||
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
|
||||
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey]);
|
||||
// Should be already unique
|
||||
// liveSessionsPerProject[projectKey] = uniqueSessions(liveSessionsPerProject[projectKey]);
|
||||
}
|
||||
}
|
||||
let liveSessions = {};
|
||||
liveSessionsPerProject.forEach((sessions, projectId) => {
|
||||
liveSessions[projectId] = Array.from(sessions);
|
||||
});
|
||||
respond(res, sortPaginate(liveSessions, filters));
|
||||
}
|
||||
|
||||
|
|
@ -169,7 +189,9 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
const sessIDs = new Set();
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
|
|
@ -177,23 +199,28 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
if (withFilters) {
|
||||
if (item.handshake.query.sessionInfo &&
|
||||
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
|
||||
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
|
||||
) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []);
|
||||
// Should be unique already because of using sessIDs set
|
||||
// liveSessions[projectKey] = uniqueSessions(liveSessions[projectKey] || []);
|
||||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
: null);
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null);
|
||||
}
|
||||
|
||||
const autocomplete = async function (req, res) {
|
||||
|
|
@ -218,10 +245,10 @@ const autocomplete = async function (req, res) {
|
|||
respond(res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
const findSessionSocketId = async (io, peerId) => {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
const findSessionSocketId = async (io, roomId, tabId) => {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) {
|
||||
return item.id;
|
||||
}
|
||||
}
|
||||
|
|
@ -285,14 +312,31 @@ module.exports = {
|
|||
socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err));
|
||||
debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
socket._connectedAt = new Date();
|
||||
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId);
|
||||
socket.peerId = socket.handshake.query.peerId;
|
||||
socket.roomId = extractRoomId(socket.peerId);
|
||||
socket.tabId = connTabId;
|
||||
socket.identity = socket.handshake.query.identity;
|
||||
debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`);
|
||||
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
if (c_sessions > 0) {
|
||||
debug && console.log(`session already connected, refusing new connexion`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(roomId);
|
||||
if (projectKey === connProjectKey) {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.tabId === connTabId) {
|
||||
debug && console.log(`session already connected, refusing new connexion`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
extractSessionInfo(socket);
|
||||
if (c_agents > 0) {
|
||||
|
|
@ -306,36 +350,36 @@ module.exports = {
|
|||
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.peerId);
|
||||
await socket.join(socket.roomId);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.has(socket.peerId)) {
|
||||
if (rooms.has(socket.roomId)) {
|
||||
let connectedSockets = await io.in(socket.peerId).fetchSockets();
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${connectedSockets.length}`);
|
||||
debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${connectedSockets.length}`);
|
||||
}
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
|
||||
}
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
}
|
||||
|
||||
socket.on('disconnect', async () => {
|
||||
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
|
||||
debug && console.log(`${socket.id} disconnected from ${socket.roomId}`);
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
}
|
||||
debug && console.log("checking for number of connected agents and sessions");
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
if (c_sessions === -1 && c_agents === -1) {
|
||||
debug && console.log(`room not found: ${socket.peerId}`);
|
||||
debug && console.log(`room not found: ${socket.roomId}`);
|
||||
}
|
||||
if (c_sessions === 0) {
|
||||
debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`);
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
if (c_agents === 0) {
|
||||
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -345,8 +389,21 @@ module.exports = {
|
|||
debug && console.log('Ignoring update event.');
|
||||
return
|
||||
}
|
||||
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
|
||||
Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId});
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
|
||||
// Update sessionInfo for all sessions (TODO: rewrite this)
|
||||
const rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(roomId);
|
||||
if (projectKey === connProjectKey) {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
Object.assign(item.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
|
||||
|
|
@ -358,13 +415,18 @@ module.exports = {
|
|||
return
|
||||
}
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
|
||||
socket.to(socket.peerId).emit(eventName, args[0]);
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.roomId}`);
|
||||
// TODO: send to all agents in the room
|
||||
socket.to(socket.roomId).emit(eventName, args[0]);
|
||||
} else {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`);
|
||||
let socketId = await findSessionSocketId(io, socket.peerId);
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`);
|
||||
if (args[0].meta === undefined) {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`);
|
||||
return
|
||||
}
|
||||
let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId);
|
||||
if (socketId === null) {
|
||||
debug && console.log(`session not found for:${socket.peerId}`);
|
||||
debug && console.log(`session not found for:${socket.roomId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
} else {
|
||||
debug && console.log("message sent");
|
||||
|
|
|
|||
|
|
@ -67,25 +67,30 @@ const respond = function (res, data) {
|
|||
const socketsList = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available sessions");
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessionsPerProject = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey !== undefined) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
|
||||
if (withFilters) {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
|
||||
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessionsPerProject[projectKey].add(sessionId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessionsPerProject[projectKey].add(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
let liveSessions = {};
|
||||
liveSessionsPerProject.forEach((sessions, projectId) => {
|
||||
liveSessions[projectId] = Array.from(sessions);
|
||||
});
|
||||
respond(res, liveSessions);
|
||||
}
|
||||
|
||||
|
|
@ -94,35 +99,36 @@ const socketsListByProject = async function (req, res) {
|
|||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
if (projectKey === _projectKey && (_sessionId === undefined || _sessionId === sessionId)) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
if (withFilters) {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo
|
||||
&& isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessions.add(sessionId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(sessionId);
|
||||
liveSessions.add(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters)
|
||||
: sessions.length > 0 ? sessions[0]
|
||||
: null);
|
||||
}
|
||||
|
||||
const socketsLive = async function (req, res) {
|
||||
debug && console.log("[WS]looking for all available LIVE sessions");
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessionsPerProject = {};
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(peerId);
|
||||
|
|
@ -130,18 +136,22 @@ const socketsLive = async function (req, res) {
|
|||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
liveSessionsPerProject[projectKey] = liveSessionsPerProject[projectKey] || new Set();
|
||||
if (hasFilters(filters)) {
|
||||
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
liveSessionsPerProject[projectKey].add(item.handshake.query.sessionInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let liveSessions = {};
|
||||
liveSessionsPerProject.forEach((sessions, projectId) => {
|
||||
liveSessions[projectId] = Array.from(sessions);
|
||||
});
|
||||
respond(res, sortPaginate(liveSessions, filters));
|
||||
}
|
||||
|
||||
|
|
@ -150,7 +160,9 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let _projectKey = extractProjectKeyFromRequest(req);
|
||||
let _sessionId = extractSessionIdFromRequest(req);
|
||||
let filters = await extractPayloadFromRequest(req, res);
|
||||
let liveSessions = {};
|
||||
let withFilters = hasFilters(filters);
|
||||
let liveSessions = new Set();
|
||||
const sessIDs = new Set();
|
||||
let rooms = await getAvailableRooms(io);
|
||||
for (let peerId of rooms.keys()) {
|
||||
let {projectKey, sessionId} = extractPeerId(peerId);
|
||||
|
|
@ -158,22 +170,26 @@ const socketsLiveByProject = async function (req, res) {
|
|||
let connected_sockets = await io.in(peerId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
liveSessions[projectKey] = liveSessions[projectKey] || [];
|
||||
if (hasFilters(filters)) {
|
||||
if (item.handshake.query.sessionInfo && isValidSession(item.handshake.query.sessionInfo, filters.filter)) {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
if (withFilters) {
|
||||
if (item.handshake.query.sessionInfo &&
|
||||
isValidSession(item.handshake.query.sessionInfo, filters.filter) &&
|
||||
!sessIDs.has(item.handshake.query.sessionInfo.sessionID)
|
||||
) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
} else {
|
||||
liveSessions[projectKey].push(item.handshake.query.sessionInfo);
|
||||
if (!sessIDs.has(item.handshake.query.sessionInfo.sessionID)) {
|
||||
liveSessions.add(item.handshake.query.sessionInfo);
|
||||
sessIDs.add(item.handshake.query.sessionInfo.sessionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSessions[_projectKey] = liveSessions[_projectKey] || [];
|
||||
respond(res, _sessionId === undefined ? sortPaginate(liveSessions[_projectKey], filters)
|
||||
: liveSessions[_projectKey].length > 0 ? liveSessions[_projectKey][0]
|
||||
: null);
|
||||
let sessions = Array.from(liveSessions);
|
||||
respond(res, _sessionId === undefined ? sortPaginate(sessions, filters) : sessions.length > 0 ? sessions[0] : null);
|
||||
}
|
||||
|
||||
const autocomplete = async function (req, res) {
|
||||
|
|
@ -198,10 +214,10 @@ const autocomplete = async function (req, res) {
|
|||
respond(res, uniqueAutocomplete(results));
|
||||
}
|
||||
|
||||
const findSessionSocketId = async (io, peerId) => {
|
||||
const connected_sockets = await io.in(peerId).fetchSockets();
|
||||
const findSessionSocketId = async (io, roomId, tabId) => {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.tabId === tabId) {
|
||||
return item.id;
|
||||
}
|
||||
}
|
||||
|
|
@ -265,56 +281,73 @@ module.exports = {
|
|||
socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err));
|
||||
debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
socket._connectedAt = new Date();
|
||||
|
||||
let {projectKey: connProjectKey, sessionId: connSessionId, tabId:connTabId} = extractPeerId(socket.handshake.query.peerId);
|
||||
socket.peerId = socket.handshake.query.peerId;
|
||||
socket.roomId = extractRoomId(socket.peerId);
|
||||
socket.tabId = connTabId;
|
||||
socket.identity = socket.handshake.query.identity;
|
||||
debug && console.log(`connProjectKey:${connProjectKey}, connSessionId:${connSessionId}, connTabId:${connTabId}, roomId:${socket.roomId}`);
|
||||
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
if (c_sessions > 0) {
|
||||
debug && console.log(`session already connected, refusing new connexion`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
const rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(roomId);
|
||||
if (projectKey === connProjectKey) {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.tabId === connTabId) {
|
||||
debug && console.log(`session already connected, refusing new connexion`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
|
||||
return socket.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
extractSessionInfo(socket);
|
||||
if (c_agents > 0) {
|
||||
debug && console.log(`notifying new session about agent-existence`);
|
||||
let agents_ids = await get_all_agents_ids(io, socket);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids);
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
|
||||
}
|
||||
|
||||
} else if (c_sessions <= 0) {
|
||||
debug && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
await socket.join(socket.peerId);
|
||||
await socket.join(socket.roomId);
|
||||
const rooms = await getAvailableRooms(io);
|
||||
if (rooms.get(socket.peerId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.peerId}, as:${socket.identity}, members:${rooms.get(socket.peerId).size}`);
|
||||
if (rooms.get(socket.roomId)) {
|
||||
debug && console.log(`${socket.id} joined room:${socket.roomId}, as:${socket.identity}, members:${rooms.get(socket.roomId).size}`);
|
||||
}
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
|
||||
}
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
}
|
||||
|
||||
socket.on('disconnect', async () => {
|
||||
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
|
||||
debug && console.log(`${socket.id} disconnected from ${socket.roomId}`);
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
}
|
||||
debug && console.log("checking for number of connected agents and sessions");
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
if (c_sessions === -1 && c_agents === -1) {
|
||||
debug && console.log(`room not found: ${socket.peerId}`);
|
||||
debug && console.log(`room not found: ${socket.roomId}`);
|
||||
}
|
||||
if (c_sessions === 0) {
|
||||
debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`);
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
debug && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
}
|
||||
if (c_agents === 0) {
|
||||
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -324,8 +357,21 @@ module.exports = {
|
|||
debug && console.log('Ignoring update event.');
|
||||
return
|
||||
}
|
||||
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
|
||||
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
|
||||
Object.assign(socket.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId});
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
|
||||
// Update sessionInfo for all sessions (TODO: rewrite this)
|
||||
const rooms = await getAvailableRooms(io);
|
||||
for (let roomId of rooms.keys()) {
|
||||
let {projectKey} = extractPeerId(roomId);
|
||||
if (projectKey === connProjectKey) {
|
||||
const connected_sockets = await io.in(roomId).fetchSockets();
|
||||
for (let item of connected_sockets) {
|
||||
if (item.handshake.query.identity === IDENTITIES.session && item.handshake.query.sessionInfo) {
|
||||
Object.assign(item.handshake.query.sessionInfo, args[0].data, {tabId: args[0].meta.tabId});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
|
||||
|
|
@ -338,12 +384,16 @@ module.exports = {
|
|||
}
|
||||
if (socket.identity === IDENTITIES.session) {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
|
||||
socket.to(socket.peerId).emit(eventName, args[0]);
|
||||
socket.to(socket.roomId).emit(eventName, args[0]);
|
||||
} else {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.peerId}`);
|
||||
let socketId = await findSessionSocketId(io, socket.peerId);
|
||||
if (args[0].meta === undefined) {
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, but message structure is wrong, stopping onAny.`);
|
||||
return
|
||||
}
|
||||
let socketId = await findSessionSocketId(io, socket.roomId, args[0].meta.tabId);
|
||||
if (socketId === null) {
|
||||
debug && console.log(`session not found for:${socket.peerId}`);
|
||||
debug && console.log(`session not found for:${socket.roomId}`);
|
||||
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
|
||||
} else {
|
||||
debug && console.log("message sent");
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue