feat(assist): added the assist stats part to the node.js app

This commit is contained in:
Alexander 2025-05-13 13:53:40 +02:00
parent d1ed9564c2
commit 8dbf7d8893
3 changed files with 233 additions and 0 deletions

View file

@ -32,6 +32,19 @@ const localCache = {
deletedSessions: new Set()
};
const sendAssistEvent = async function (payload) {
try {
if (typeof payload !== "string") {
logger.warn("sendAssistEvent received non-string payload. Converting to string.");
payload = JSON.stringify(payload);
}
await redisClient.rpush("assist:stats", payload);
logger.debug("Assist event sent to Redis: " + payload);
} catch (error) {
logger.error(`Failed to send assist event to Redis: ${error}`);
}
};
const addSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.addedSessions.add(sessionID);
@ -172,6 +185,7 @@ function startCacheRefresher(io) {
}
module.exports = {
sendAssistEvent,
addSession,
updateSession,
renewSession,

View file

@ -14,6 +14,11 @@ const {
const {
logger
} = require('./logger');
const {
startAssist,
endAssist,
handleEvent
} = require('./stats');
const deepMerge = require('@fastify/deepmerge')({all: true});
let io;
@ -133,6 +138,7 @@ async function onConnect(socket) {
if (socket.handshake.query.agentInfo !== undefined) {
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
socket.handshake.query.agentID = socket.handshake.query.agentInfo.id;
startAssist(socket, socket.handshake.query.agentID);
}
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
}
@ -165,6 +171,7 @@ async function onDisconnect(socket) {
logger.debug(`${socket.id} disconnected from ${socket.handshake.query.roomId}`);
if (socket.handshake.query.identity === IDENTITIES.agent) {
endAssist(socket, socket.handshake.query.agentID);
sendFrom(socket, socket.handshake.query.roomId, EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
}
logger.debug("checking for number of connected agents and sessions");
@ -232,6 +239,7 @@ async function onAny(socket, eventName, ...args) {
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to room:${socket.handshake.query.roomId}`);
sendFrom(socket, socket.handshake.query.roomId, eventName, args[0]);
} else {
handleEvent(eventName, socket, args[0]);
logger.debug(`received event:${eventName}, from:${socket.handshake.query.identity}, sending message to session of room:${socket.handshake.query.roomId}`);
let socketId = await findSessionSocketId(socket.handshake.query.roomId, args[0]?.meta?.tabId);
if (socketId === null) {

211
ee/assist/app/stats.js Normal file
View file

@ -0,0 +1,211 @@
const {logger} = require('./logger');
const {sendAssistEvent} = require('./cache');
class InMemoryCache {
constructor() {
this.cache = new Map();
}
set(key, value) {
this.cache.set(key, value);
}
get(key) {
return this.cache.get(key);
}
delete(key) {
this.cache.delete(key);
}
clear() {
this.cache.clear();
}
}
const cache = new InMemoryCache();
function startAssist(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_assist_${tsNow}`;
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "assist",
"event_state": "start",
"timestamp": tsNow,
});
// Save uniq eventID to cache
cache.set(`${socket.handshake.query.sessId}_${agentID}_assist`, eventID);
// Debug log
logger.debug(`assist_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function endAssist(socket, agentID) {
const eventID = cache.get(`${socket.handshake.query.sessId}_${agentID}_assist`);
if (eventID === undefined) {
logger.debug(`have to skip assist_ended, no eventID in the cache, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
return
}
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "assist",
"event_state": "end",
"timestamp": +new Date(),
})
// Remove eventID from cache
cache.delete(`${socket.handshake.query.sessId}_${agentID}_assist`);
// Debug logs
logger.debug(`assist_ended, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
}
function startCall(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_call_${tsNow}`;
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "call",
"event_state": "start",
"timestamp": tsNow,
});
// Save uniq eventID to cache
cache.set(`${socket.handshake.query.sessId}_call`, eventID);
// Debug logs
logger.debug(`s_call_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function endCall(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.handshake.query.sessId}_call`);
if (eventID === undefined) {
logger.debug(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
return
}
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "call",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_call`)
// Debug logs
logger.debug(`s_call_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function startControl(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_control_${tsNow}`;
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "control",
"event_state": "start",
"timestamp": tsNow,
});
cache.set(`${socket.handshake.query.sessId}_control`, eventID)
// Debug logs
logger.debug(`s_control_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function endControl(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.handshake.query.sessId}_control`);
if (eventID === undefined) {
logger.debug(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
return
}
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "control",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_control`)
// Debug logs
logger.debug(`s_control_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function startRecord(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_record_${tsNow}`;
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "record",
"event_state": "start",
"timestamp": tsNow,
});
cache.set(`${socket.handshake.query.sessId}_record`, eventID)
// Debug logs
logger.debug(`s_recording_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function endRecord(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.sessId}_record`);
void sendAssistEvent({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "record",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_record`)
// Debug logs
logger.debug(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function handleEvent(eventName, socket, agentID) {
switch (eventName) {
case "s_call_started": {
startCall(socket, agentID);
break;
}
case "s_call_ended": {
endCall(socket, agentID);
break;
}
case "s_control_started": {
startControl(socket, agentID)
break;
}
case "s_control_ended": {
endControl(socket, agentID)
break;
}
case "s_recording_started": {
startRecord(socket, agentID);
break;
}
case "s_recording_ended": {
endRecord(socket, agentID);
break;
}
}
}
module.exports = {
startAssist,
endAssist,
handleEvent,
}