feat(assist): fixed double UPDATE_SESSION event emit

feat(assist): upgraded EE dependencies
This commit is contained in:
Taha Yassine Kraiem 2022-08-18 11:59:10 +01:00
parent e856982d6d
commit 38d6bcc404
6 changed files with 70 additions and 65 deletions

View file

@ -10,9 +10,9 @@
"license": "Elastic License 2.0 (ELv2)",
"dependencies": {
"@maxmind/geoip2-node": "^3.4.0",
"@socket.io/redis-adapter": "^7.1.0",
"express": "^4.17.1",
"redis": "^4.0.3",
"@socket.io/redis-adapter": "^7.2.0",
"express": "^4.18.1",
"redis": "^4.2.0",
"socket.io": "^4.5.1",
"ua-parser-js": "^1.0.2",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.10.0"

View file

@ -19,9 +19,9 @@
"homepage": "https://github.com/openreplay/openreplay#readme",
"dependencies": {
"@maxmind/geoip2-node": "^3.4.0",
"@socket.io/redis-adapter": "^7.1.0",
"express": "^4.17.1",
"redis": "^4.0.3",
"@socket.io/redis-adapter": "^7.2.0",
"express": "^4.18.1",
"redis": "^4.2.0",
"socket.io": "^4.5.1",
"ua-parser-js": "^1.0.2",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.10.0"

View file

@ -9,6 +9,8 @@ const {
uniqueAutocomplete
} = require('../utils/helper');
const {
IDENTITIES,
EVENTS_DEFINITION,
extractSessionInfo
} = require('../utils/assistHelper');
const {
@ -19,15 +21,6 @@ const {
const {createAdapter} = require("@socket.io/redis-adapter");
const {createClient} = require("redis");
const wsRouter = express.Router();
const UPDATE_EVENT = "UPDATE_SESSION";
const IDENTITIES = {agent: 'agent', session: 'session'};
const NEW_AGENT = "NEW_AGENT";
const NO_AGENTS = "NO_AGENT";
const AGENT_DISCONNECT = "AGENT_DISCONNECTED";
const AGENTS_CONNECTED = "AGENTS_CONNECTED";
const NO_SESSIONS = "SESSION_DISCONNECTED";
const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED";
const SESSION_RECONNECTED = "SESSION_RECONNECTED";
const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379";
const pubClient = createClient({url: REDIS_URL});
const subClient = pubClient.duplicate();
@ -295,20 +288,20 @@ module.exports = {
if (socket.identity === IDENTITIES.session) {
if (c_sessions > 0) {
debug && console.log(`session already connected, refusing new connexion`);
io.to(socket.id).emit(SESSION_ALREADY_CONNECTED);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
return socket.disconnect();
}
extractSessionInfo(socket);
if (c_agents > 0) {
debug && console.log(`notifying new session about agent-existence`);
let agents_ids = await get_all_agents_ids(io, socket);
io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
}
} else if (c_sessions <= 0) {
debug && console.log(`notifying new agent about no SESSIONS`);
io.to(socket.id).emit(NO_SESSIONS);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
await io.of('/').adapter.remoteJoin(socket.id, socket.peerId);
let rooms = await io.of('/').adapter.allRooms();
@ -320,13 +313,13 @@ module.exports = {
if (socket.handshake.query.agentInfo !== undefined) {
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
}
socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
socket.on('disconnect', async () => {
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
if (socket.identity === IDENTITIES.agent) {
socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
}
debug && console.log("checking for number of connected agents and sessions");
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
@ -335,25 +328,29 @@ module.exports = {
}
if (c_sessions === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`);
socket.to(socket.peerId).emit(NO_SESSIONS);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
if (c_agents === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
socket.to(socket.peerId).emit(NO_AGENTS);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
}
});
socket.on(UPDATE_EVENT, async (...args) => {
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => {
debug && console.log(`${socket.id} sent update event.`);
if (socket.identity !== IDENTITIES.session) {
debug && console.log('Ignoring update event.');
return
}
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
});
socket.onAny(async (eventName, ...args) => {
if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) {
debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`);
return
}
if (socket.identity === IDENTITIES.session) {
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
socket.to(socket.peerId).emit(eventName, args[0]);
@ -362,7 +359,7 @@ module.exports = {
let socketId = await findSessionSocketId(io, socket.peerId);
if (socketId === null) {
debug && console.log(`session not found for:${socket.peerId}`);
io.to(socket.id).emit(NO_SESSIONS);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
} else {
debug && console.log("message sent");
io.to(socketId).emit(eventName, socket.id, args[0]);

View file

@ -9,6 +9,8 @@ const {
uniqueAutocomplete
} = require('../utils/helper');
const {
IDENTITIES,
EVENTS_DEFINITION,
extractSessionInfo
} = require('../utils/assistHelper');
const {
@ -17,15 +19,6 @@ const {
extractPayloadFromRequest,
} = require('../utils/helper-ee');
const wsRouter = express.Router();
const UPDATE_EVENT = "UPDATE_SESSION";
const IDENTITIES = {agent: 'agent', session: 'session'};
const NEW_AGENT = "NEW_AGENT";
const NO_AGENTS = "NO_AGENT";
const AGENT_DISCONNECT = "AGENT_DISCONNECTED";
const AGENTS_CONNECTED = "AGENTS_CONNECTED";
const NO_SESSIONS = "SESSION_DISCONNECTED";
const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED";
const SESSION_RECONNECTED = "SESSION_RECONNECTED";
let io;
const debug = process.env.debug === "1" || false;
@ -273,20 +266,20 @@ module.exports = {
if (socket.identity === IDENTITIES.session) {
if (c_sessions > 0) {
debug && console.log(`session already connected, refusing new connexion`);
io.to(socket.id).emit(SESSION_ALREADY_CONNECTED);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
return socket.disconnect();
}
extractSessionInfo(socket);
if (c_agents > 0) {
debug && console.log(`notifying new session about agent-existence`);
let agents_ids = await get_all_agents_ids(io, socket);
io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
}
} else if (c_sessions <= 0) {
debug && console.log(`notifying new agent about no SESSIONS`);
io.to(socket.id).emit(NO_SESSIONS);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
socket.join(socket.peerId);
if (io.sockets.adapter.rooms.get(socket.peerId)) {
@ -296,13 +289,13 @@ module.exports = {
if (socket.handshake.query.agentInfo !== undefined) {
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
}
socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
socket.on('disconnect', async () => {
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
if (socket.identity === IDENTITIES.agent) {
socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
}
debug && console.log("checking for number of connected agents and sessions");
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
@ -315,21 +308,25 @@ module.exports = {
}
if (c_agents === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
socket.to(socket.peerId).emit(NO_AGENTS);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
}
});
socket.on(UPDATE_EVENT, async (...args) => {
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => {
debug && console.log(`${socket.id} sent update event.`);
if (socket.identity !== IDENTITIES.session) {
debug && console.log('Ignoring update event.');
return
}
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
});
socket.onAny(async (eventName, ...args) => {
if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) {
debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`);
return
}
if (socket.identity === IDENTITIES.session) {
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
socket.to(socket.peerId).emit(eventName, args[0]);

View file

@ -12,18 +12,11 @@ const {
uniqueAutocomplete
} = require('../utils/helper');
const {
IDENTITIES,
EVENTS_DEFINITION,
extractSessionInfo
} = require('../utils/assistHelper');
const wsRouter = express.Router();
const UPDATE_EVENT = "UPDATE_SESSION";
const IDENTITIES = {agent: 'agent', session: 'session'};
const NEW_AGENT = "NEW_AGENT";
const NO_AGENTS = "NO_AGENT";
const AGENT_DISCONNECT = "AGENT_DISCONNECTED";
const AGENTS_CONNECTED = "AGENTS_CONNECTED";
const NO_SESSIONS = "SESSION_DISCONNECTED";
const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED";
const SESSION_RECONNECTED = "SESSION_RECONNECTED";
let io;
const debug = process.env.debug === "1" || false;
@ -254,20 +247,20 @@ module.exports = {
if (socket.identity === IDENTITIES.session) {
if (c_sessions > 0) {
debug && console.log(`session already connected, refusing new connexion`);
io.to(socket.id).emit(SESSION_ALREADY_CONNECTED);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.SESSION_ALREADY_CONNECTED);
return socket.disconnect();
}
extractSessionInfo(socket);
if (c_agents > 0) {
debug && console.log(`notifying new session about agent-existence`);
let agents_ids = await get_all_agents_ids(io, socket);
io.to(socket.id).emit(AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(SESSION_RECONNECTED, socket.id);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agents_ids);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id);
}
} else if (c_sessions <= 0) {
debug && console.log(`notifying new agent about no SESSIONS`);
io.to(socket.id).emit(NO_SESSIONS);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
socket.join(socket.peerId);
if (io.sockets.adapter.rooms.get(socket.peerId)) {
@ -277,13 +270,13 @@ module.exports = {
if (socket.handshake.query.agentInfo !== undefined) {
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
}
socket.to(socket.peerId).emit(NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
socket.on('disconnect', async () => {
debug && console.log(`${socket.id} disconnected from ${socket.peerId}`);
if (socket.identity === IDENTITIES.agent) {
socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
}
debug && console.log("checking for number of connected agents and sessions");
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
@ -292,25 +285,29 @@ module.exports = {
}
if (c_sessions === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no SESSIONS`);
socket.to(socket.peerId).emit(NO_SESSIONS);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
}
if (c_agents === 0) {
debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`);
socket.to(socket.peerId).emit(NO_AGENTS);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
}
});
socket.on(UPDATE_EVENT, async (...args) => {
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => {
debug && console.log(`${socket.id} sent update event.`);
if (socket.identity !== IDENTITIES.session) {
debug && console.log('Ignoring update event.');
return
}
socket.handshake.query.sessionInfo = {...socket.handshake.query.sessionInfo, ...args[0]};
socket.to(socket.peerId).emit(UPDATE_EVENT, args[0]);
socket.to(socket.peerId).emit(EVENTS_DEFINITION.emit.UPDATE_EVENT, args[0]);
});
socket.onAny(async (eventName, ...args) => {
if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) {
debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`);
return
}
if (socket.identity === IDENTITIES.session) {
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to room:${socket.peerId}`);
socket.to(socket.peerId).emit(eventName, args[0]);
@ -319,7 +316,7 @@ module.exports = {
let socketId = await findSessionSocketId(io, socket.peerId);
if (socketId === null) {
debug && console.log(`session not found for:${socket.peerId}`);
io.to(socket.id).emit(NO_SESSIONS);
io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS);
} else {
debug && console.log("message sent");
io.to(socketId).emit(eventName, socket.id, args[0]);

View file

@ -2,6 +2,20 @@ const uaParser = require('ua-parser-js');
const {geoip} = require('./geoIP');
let debug = process.env.debug === "1" || false;
const IDENTITIES = {agent: 'agent', session: 'session'};
const EVENTS_DEFINITION = {
listen: {UPDATE_EVENT: "UPDATE_SESSION"}
};
EVENTS_DEFINITION.emit = {
NEW_AGENT: "NEW_AGENT",
NO_AGENTS: "NO_AGENT",
AGENT_DISCONNECT: "AGENT_DISCONNECTED",
AGENTS_CONNECTED: "AGENTS_CONNECTED",
NO_SESSIONS: "SESSION_DISCONNECTED",
SESSION_ALREADY_CONNECTED: "SESSION_ALREADY_CONNECTED",
SESSION_RECONNECTED: "SESSION_RECONNECTED",
UPDATE_EVENT: EVENTS_DEFINITION.listen.UPDATE_EVENT
};
const BASE_sessionInfo = {
"pageTitle": "Page",
@ -57,5 +71,5 @@ const extractSessionInfo = function (socket) {
module.exports = {
extractSessionInfo
extractSessionInfo, EVENTS_DEFINITION, IDENTITIES
};