From 8dbf7d889311aaed467f28d8bf3eac532ba558b7 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 13 May 2025 13:53:40 +0200 Subject: [PATCH] feat(assist): added the assist stats part to the node.js app --- ee/assist/app/cache.js | 14 +++ ee/assist/app/socket.js | 8 ++ ee/assist/app/stats.js | 211 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+) create mode 100644 ee/assist/app/stats.js diff --git a/ee/assist/app/cache.js b/ee/assist/app/cache.js index efe3c7181..bf291387c 100644 --- a/ee/assist/app/cache.js +++ b/ee/assist/app/cache.js @@ -32,6 +32,19 @@ const localCache = { deletedSessions: new Set() }; +const sendAssistEvent = async function (payload) { + try { + if (typeof payload !== "string") { + logger.warn("sendAssistEvent received non-string payload. Converting to string."); + payload = JSON.stringify(payload); + } + await redisClient.rpush("assist:stats", payload); + logger.debug("Assist event sent to Redis: " + payload); + } catch (error) { + logger.error(`Failed to send assist event to Redis: ${error}`); + } +}; + const addSession = async function (sessionID) { await mutex.runExclusive(() => { localCache.addedSessions.add(sessionID); @@ -172,6 +185,7 @@ function startCacheRefresher(io) { } module.exports = { + sendAssistEvent, addSession, updateSession, renewSession, diff --git a/ee/assist/app/socket.js b/ee/assist/app/socket.js index f2bb882ae..4ecc4fd7d 100644 --- a/ee/assist/app/socket.js +++ b/ee/assist/app/socket.js @@ -14,6 +14,11 @@ const { const { logger } = require('./logger'); +const { + startAssist, + endAssist, + handleEvent +} = require('./stats'); const deepMerge = require('@fastify/deepmerge')({all: true}); let io; @@ -133,6 +138,7 @@ async function onConnect(socket) { if (socket.handshake.query.agentInfo !== undefined) { socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo); socket.handshake.query.agentID = socket.handshake.query.agentInfo.id; + startAssist(socket, socket.handshake.query.agentID); } sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo); } @@ -165,6 +171,7 @@ async function onDisconnect(socket) { logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`); if (socket.handshake.query.identity === IDENTITIES.agent) { + endAssist(socket, socket.handshake.query.agentID); sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); } logger.debug("checking for number of connected agents and sessions"); @@ -232,6 +239,7 @@ async function onAny(socket, eventName, ...args) { logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`); sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]); } else { + handleEvent(eventName, socket, args[0]); logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`); let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId); if (socketId === null) { diff --git a/ee/assist/app/stats.js b/ee/assist/app/stats.js new file mode 100644 index 000000000..63c2edd5d --- /dev/null +++ b/ee/assist/app/stats.js @@ -0,0 +1,211 @@ +const {logger} = require('./logger'); +const {sendAssistEvent} = require('./cache'); + +class InMemoryCache { + constructor() { + this.cache = new Map(); + } + + set(key, value) { + this.cache.set(key, value); + } + + get(key) { + return this.cache.get(key); + } + + delete(key) { + this.cache.delete(key); + } + + clear() { + this.cache.clear(); + } +} + +const cache = new InMemoryCache(); + +function startAssist(socket, agentID) { + const tsNow = +new Date(); + const eventID = `${socket.handshake.query.sessId}_${agentID}_assist_${tsNow}`; + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "assist", + "event_state": "start", + "timestamp": tsNow, + }); + // Save uniq eventID to cache + cache.set(`${socket.handshake.query.sessId}_${agentID}_assist`, eventID); + // Debug log + logger.debug(`assist_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`); +} + +function endAssist(socket, agentID) { + const eventID = cache.get(`${socket.handshake.query.sessId}_${agentID}_assist`); + if (eventID === undefined) { + logger.debug(`have to skip assist_ended, no eventID in the cache, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`); + return + } + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "assist", + "event_state": "end", + "timestamp": +new Date(), + }) + // Remove eventID from cache + cache.delete(`${socket.handshake.query.sessId}_${agentID}_assist`); + // Debug logs + logger.debug(`assist_ended, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`); +} + +function startCall(socket, agentID) { + const tsNow = +new Date(); + const eventID = `${socket.handshake.query.sessId}_${agentID}_call_${tsNow}`; + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "call", + "event_state": "start", + "timestamp": tsNow, + }); + // Save uniq eventID to cache + cache.set(`${socket.handshake.query.sessId}_call`, eventID); + // Debug logs + logger.debug(`s_call_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`); +} + +function endCall(socket, agentID) { + const tsNow = +new Date(); + const eventID = cache.get(`${socket.handshake.query.sessId}_call`); + if (eventID === undefined) { + logger.debug(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`); + return + } + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "call", + "event_state": "end", + "timestamp": tsNow, + }); + cache.delete(`${socket.handshake.query.sessId}_call`) + // Debug logs + logger.debug(`s_call_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`); +} + +function startControl(socket, agentID) { + const tsNow = +new Date(); + const eventID = `${socket.handshake.query.sessId}_${agentID}_control_${tsNow}`; + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "control", + "event_state": "start", + "timestamp": tsNow, + }); + cache.set(`${socket.handshake.query.sessId}_control`, eventID) + // Debug logs + logger.debug(`s_control_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`); +} + +function endControl(socket, agentID) { + const tsNow = +new Date(); + const eventID = cache.get(`${socket.handshake.query.sessId}_control`); + if (eventID === undefined) { + logger.debug(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`); + return + } + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "control", + "event_state": "end", + "timestamp": tsNow, + }); + cache.delete(`${socket.handshake.query.sessId}_control`) + // Debug logs + logger.debug(`s_control_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`); +} + +function startRecord(socket, agentID) { + const tsNow = +new Date(); + const eventID = `${socket.handshake.query.sessId}_${agentID}_record_${tsNow}`; + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "record", + "event_state": "start", + "timestamp": tsNow, + }); + cache.set(`${socket.handshake.query.sessId}_record`, eventID) + // Debug logs + logger.debug(`s_recording_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`); +} + +function endRecord(socket, agentID) { + const tsNow = +new Date(); + const eventID = cache.get(`${socket.sessId}_record`); + void sendAssistEvent({ + "project_id": socket.handshake.query.projectId, + "session_id": socket.handshake.query.sessId, + "agent_id": agentID, + "event_id": eventID, + "event_type": "record", + "event_state": "end", + "timestamp": tsNow, + }); + cache.delete(`${socket.handshake.query.sessId}_record`) + // Debug logs + logger.debug(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`); +} + +function handleEvent(eventName, socket, agentID) { + switch (eventName) { + case "s_call_started": { + startCall(socket, agentID); + break; + } + case "s_call_ended": { + endCall(socket, agentID); + break; + } + case "s_control_started": { + startControl(socket, agentID) + break; + } + case "s_control_ended": { + endControl(socket, agentID) + break; + } + case "s_recording_started": { + startRecord(socket, agentID); + break; + } + case "s_recording_ended": { + endRecord(socket, agentID); + break; + } + } +} + +module.exports = { + startAssist, + endAssist, + handleEvent, +} \ No newline at end of file