feat(assist): moved stats part to separate module
This commit is contained in:
parent
c7e5145282
commit
3c8ca0c412
2 changed files with 372 additions and 307 deletions
|
|
@ -22,8 +22,11 @@ const {
|
|||
errorHandler,
|
||||
authorizer
|
||||
} = require('../utils/assistHelper');
|
||||
|
||||
const StatsHost = process.env.STATS_HOST || 'http://assist-stats-openreplay.app.svc.cluster.local:8000/events';
|
||||
const {
|
||||
startAssist,
|
||||
endAssist,
|
||||
handleEvent
|
||||
} = require('../utils/stats');
|
||||
|
||||
const wsRouter = express.Router();
|
||||
|
||||
|
|
@ -261,53 +264,7 @@ wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
|
|||
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject);
|
||||
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveByProject);
|
||||
|
||||
async function postData(payload) {
|
||||
const options = {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(payload),
|
||||
headers: { 'Content-Type': 'application/json' }
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(StatsHost, options)
|
||||
const jsonResponse = await response.json();
|
||||
console.log('JSON response', JSON.stringify(jsonResponse, null, 4))
|
||||
} catch(err) {
|
||||
console.log('ERROR', err);
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryCache {
|
||||
constructor() {
|
||||
this.cache = new Map();
|
||||
}
|
||||
|
||||
set(key, value) {
|
||||
this.cache.set(key, value);
|
||||
}
|
||||
|
||||
get(key) {
|
||||
return this.cache.get(key);
|
||||
}
|
||||
|
||||
delete(key) {
|
||||
this.cache.delete(key);
|
||||
}
|
||||
|
||||
clear() {
|
||||
this.cache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// Usage:
|
||||
const cache = new InMemoryCache();
|
||||
|
||||
module.exports = {
|
||||
wsRouter,
|
||||
start: (server, prefix) => {
|
||||
createSocketIOServer(server, prefix);
|
||||
io.use(async (socket, next) => await authorizer.check(socket, next));
|
||||
io.on('connection', async (socket) => {
|
||||
async function onConnect(socket) {
|
||||
socket.on(EVENTS_DEFINITION.listen.ERROR, err => errorHandler(EVENTS_DEFINITION.listen.ERROR, err));
|
||||
debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
|
||||
socket._connectedAt = new Date();
|
||||
|
|
@ -360,49 +317,33 @@ module.exports = {
|
|||
if (socket.identity === IDENTITIES.agent) {
|
||||
if (socket.handshake.query.agentInfo !== undefined) {
|
||||
socket.handshake.query.agentInfo = JSON.parse(socket.handshake.query.agentInfo);
|
||||
const tsNow = +new Date();
|
||||
const agentID = socket.handshake.query.agentInfo.id;
|
||||
socket.agentID = agentID;
|
||||
const eventID = `${socket.sessId}_${agentID}_assist_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "assist",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
// Save uniq eventID to cache
|
||||
cache.set(`${socket.sessId}_${agentID}_assist`, eventID);
|
||||
// Debug logs
|
||||
console.log(`assist_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
socket.agentID = socket.handshake.query.agentInfo.id;
|
||||
// Stats
|
||||
startAssist(socket, socket.agentID);
|
||||
}
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NEW_AGENT, socket.id, socket.handshake.query.agentInfo);
|
||||
}
|
||||
|
||||
socket.on('disconnect', async () => {
|
||||
// Set disconnect handler
|
||||
socket.on('disconnect', () => onDisconnect(socket));
|
||||
|
||||
//
|
||||
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, (...args) => onUpdateEvent(socket, ...args));
|
||||
|
||||
//
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err));
|
||||
|
||||
//
|
||||
socket.onAny((eventName, ...args) => onAny(socket, eventName, ...args));
|
||||
}
|
||||
|
||||
async function onDisconnect(socket) {
|
||||
debug && console.log(`${socket.id} disconnected from ${socket.roomId}`);
|
||||
if (socket.identity === IDENTITIES.agent) {
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id);
|
||||
const eventID = cache.get(`${socket.sessId}_${socket.agentID}_assist`);
|
||||
if (eventID === undefined) {
|
||||
console.log(`have to skip assist_ended, no eventID in the cache, agentID: ${socket.agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}`);
|
||||
} else {
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": socket.agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "assist",
|
||||
"event_state": "end",
|
||||
"timestamp": +new Date(),
|
||||
})
|
||||
// Remove eventID from cache
|
||||
cache.delete(`${socket.sessId}_${socket.agentID}_assist`);
|
||||
// Debug logs
|
||||
console.log(`assist_ended, agentID: ${socket.agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}`);
|
||||
}
|
||||
// Stats
|
||||
endAssist(socket, socket.agentID);
|
||||
}
|
||||
debug && console.log("checking for number of connected agents and sessions");
|
||||
let {c_sessions, c_agents} = await sessions_agents_count(io, socket);
|
||||
|
|
@ -417,9 +358,9 @@ module.exports = {
|
|||
debug && console.log(`notifying everyone in ${socket.roomId} about no AGENTS`);
|
||||
socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_AGENTS);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
socket.on(EVENTS_DEFINITION.listen.UPDATE_EVENT, async (...args) => {
|
||||
async function onUpdateEvent(socket, ...args) {
|
||||
debug && console.log(`${socket.id} sent update event.`);
|
||||
if (socket.identity !== IDENTITIES.session) {
|
||||
debug && console.log('Ignoring update event.');
|
||||
|
|
@ -443,12 +384,9 @@ module.exports = {
|
|||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_ERROR, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_ERROR, err));
|
||||
socket.on(EVENTS_DEFINITION.listen.CONNECT_FAILED, err => errorHandler(EVENTS_DEFINITION.listen.CONNECT_FAILED, err));
|
||||
|
||||
socket.onAny(async (eventName, ...args) => {
|
||||
async function onAny(socket, eventName, ...args) {
|
||||
if (Object.values(EVENTS_DEFINITION.listen).indexOf(eventName) >= 0) {
|
||||
debug && console.log(`received event:${eventName}, should be handled by another listener, stopping onAny.`);
|
||||
return
|
||||
|
|
@ -462,114 +400,7 @@ module.exports = {
|
|||
// TODO: emit message to all agents in the room (except tabs)
|
||||
socket.to(socket.roomId).emit(eventName, args[0]);
|
||||
} else {
|
||||
const tsNow = +new Date();
|
||||
switch (eventName) {
|
||||
case "s_call_started": {
|
||||
const eventID = `${socket.sessId}_${args[0]}_call_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "call",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
// Save uniq eventID to cache
|
||||
cache.set(`${socket.sessId}_call`, eventID);
|
||||
// Debug logs
|
||||
console.log(`s_call_started, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
break;
|
||||
}
|
||||
case "s_call_ended": {
|
||||
const eventID = cache.get(`${socket.sessId}_call`);
|
||||
if (eventID === undefined) {
|
||||
console.log(`have to skip s_call_ended, no eventID in the cache, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
break;
|
||||
}
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "call",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_call`)
|
||||
// Debug logs
|
||||
console.log(`s_call_ended, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
break;
|
||||
}
|
||||
case "s_control_started": {
|
||||
const eventID = `${socket.sessId}_${args[0]}_control_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "control",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.set(`${socket.sessId}_control`, eventID)
|
||||
// Debug logs
|
||||
console.log(`s_control_started, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
break;
|
||||
}
|
||||
case "s_control_ended": {
|
||||
const eventID = cache.get(`${socket.sessId}_control`);
|
||||
if (eventID === undefined) {
|
||||
console.log(`have to skip s_control_ended, no eventID in the cache, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
break;
|
||||
}
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "control",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_control`)
|
||||
// Debug logs
|
||||
console.log(`s_control_ended, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
break;
|
||||
}
|
||||
case "s_recording_started": {
|
||||
const eventID = `${socket.sessId}_${args[0]}_record_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "record",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.set(`${socket.sessId}_record`, eventID)
|
||||
// Debug logs
|
||||
console.log(`s_recording_started, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
break;
|
||||
}
|
||||
case "s_recording_ended": {
|
||||
const eventID = cache.get(`${socket.sessId}_record`);
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": args[0],
|
||||
"event_id": eventID,
|
||||
"event_type": "record",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_record`)
|
||||
// Debug logs
|
||||
console.log(`s_recording_ended, agentID: ${args[0]}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
handleEvent(eventName, socket, args[0]);
|
||||
debug && console.log(`received event:${eventName}, from:${socket.identity}, sending message to session of room:${socket.roomId}`);
|
||||
let socketId = await findSessionSocketId(io, socket.roomId, args[0]?.meta?.tabId);
|
||||
if (socketId === null) {
|
||||
|
|
@ -580,9 +411,15 @@ module.exports = {
|
|||
io.to(socketId).emit(eventName, socket.id, args[0]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
wsRouter,
|
||||
start: (server, prefix) => {
|
||||
createSocketIOServer(server, prefix);
|
||||
io.use(async (socket, next) => await authorizer.check(socket, next));
|
||||
io.on('connection', (socket) => onConnect(socket));
|
||||
|
||||
});
|
||||
console.log("WS server started");
|
||||
setInterval(async (io) => {
|
||||
try {
|
||||
|
|
|
|||
228
assist/utils/stats.js
Normal file
228
assist/utils/stats.js
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
const StatsHost = process.env.STATS_HOST || 'http://assist-stats-openreplay.app.svc.cluster.local:8000/events';
|
||||
|
||||
async function postData(payload) {
|
||||
const options = {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(payload),
|
||||
headers: { 'Content-Type': 'application/json' }
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(StatsHost, options)
|
||||
const jsonResponse = await response.json();
|
||||
console.log('JSON response', JSON.stringify(jsonResponse, null, 4))
|
||||
} catch(err) {
|
||||
console.log('ERROR', err);
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryCache {
|
||||
constructor() {
|
||||
this.cache = new Map();
|
||||
}
|
||||
|
||||
set(key, value) {
|
||||
this.cache.set(key, value);
|
||||
}
|
||||
|
||||
get(key) {
|
||||
return this.cache.get(key);
|
||||
}
|
||||
|
||||
delete(key) {
|
||||
this.cache.delete(key);
|
||||
}
|
||||
|
||||
clear() {
|
||||
this.cache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
const cache = new InMemoryCache();
|
||||
|
||||
const debug = process.env.debug === "1";
|
||||
|
||||
function startAssist(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = `${socket.sessId}_${agentID}_assist_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "assist",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
// Save uniq eventID to cache
|
||||
cache.set(`${socket.sessId}_${agentID}_assist`, eventID);
|
||||
// Debug log
|
||||
debug && console.log(`assist_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
}
|
||||
|
||||
function endAssist(socket, agentID) {
|
||||
const eventID = cache.get(`${socket.sessId}_${agentID}_assist`);
|
||||
if (eventID === undefined) {
|
||||
debug && console.log(`have to skip assist_ended, no eventID in the cache, agentID: ${socket.agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}`);
|
||||
return
|
||||
}
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "assist",
|
||||
"event_state": "end",
|
||||
"timestamp": +new Date(),
|
||||
})
|
||||
// Remove eventID from cache
|
||||
cache.delete(`${socket.sessId}_${agentID}_assist`);
|
||||
// Debug logs
|
||||
debug && console.log(`assist_ended, agentID: ${socket.agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}`);
|
||||
}
|
||||
|
||||
function startCall(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = `${socket.sessId}_${agentID}_call_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "call",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
// Save uniq eventID to cache
|
||||
cache.set(`${socket.sessId}_call`, eventID);
|
||||
// Debug logs
|
||||
console.log(`s_call_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
}
|
||||
|
||||
function endCall(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = cache.get(`${socket.sessId}_call`);
|
||||
if (eventID === undefined) {
|
||||
console.log(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
return
|
||||
}
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "call",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_call`)
|
||||
// Debug logs
|
||||
console.log(`s_call_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
}
|
||||
|
||||
function startControl(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = `${socket.sessId}_${agentID}_control_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "control",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.set(`${socket.sessId}_control`, eventID)
|
||||
// Debug logs
|
||||
console.log(`s_control_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
}
|
||||
|
||||
function endControl(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = cache.get(`${socket.sessId}_control`);
|
||||
if (eventID === undefined) {
|
||||
console.log(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${tsNow}`);
|
||||
return
|
||||
}
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "control",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_control`)
|
||||
// Debug logs
|
||||
console.log(`s_control_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
}
|
||||
|
||||
function startRecord(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = `${socket.sessId}_${agentID}_record_${tsNow}`;
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "record",
|
||||
"event_state": "start",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.set(`${socket.sessId}_record`, eventID)
|
||||
// Debug logs
|
||||
console.log(`s_recording_started, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
}
|
||||
|
||||
function endRecord(socket, agentID) {
|
||||
const tsNow = +new Date();
|
||||
const eventID = cache.get(`${socket.sessId}_record`);
|
||||
void postData({
|
||||
"project_id": socket.projectId,
|
||||
"session_id": socket.sessId,
|
||||
"agent_id": agentID,
|
||||
"event_id": eventID,
|
||||
"event_type": "record",
|
||||
"event_state": "end",
|
||||
"timestamp": tsNow,
|
||||
});
|
||||
cache.delete(`${socket.sessId}_record`)
|
||||
// Debug logs
|
||||
console.log(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.sessId}, projID: ${socket.projectId}, time: ${+new Date()}`);
|
||||
}
|
||||
|
||||
function handleEvent(eventName, socket, agentID) {
|
||||
switch (eventName) {
|
||||
case "s_call_started": {
|
||||
startCall(socket, agentID);
|
||||
break;
|
||||
}
|
||||
case "s_call_ended": {
|
||||
endCall(socket, agentID);
|
||||
break;
|
||||
}
|
||||
case "s_control_started": {
|
||||
startControl(socket, agentID)
|
||||
break;
|
||||
}
|
||||
case "s_control_ended": {
|
||||
endControl(socket, agentID)
|
||||
break;
|
||||
}
|
||||
case "s_recording_started": {
|
||||
startRecord(socket, agentID);
|
||||
break;
|
||||
}
|
||||
case "s_recording_ended": {
|
||||
endRecord(socket, agentID);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
startAssist,
|
||||
endAssist,
|
||||
handleEvent,
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue