feat(assist-server): removed old assist and moved assist-server into ee/assist/

This commit is contained in:
Alexander 2025-04-18 16:32:34 +02:00
parent 985ce2812c
commit 620fc05d6c
24 changed files with 721 additions and 3078 deletions

View file

@ -1,5 +0,0 @@
.idea
node_modules
npm-debug.log
.cache
*.mmdb

View file

@ -1,24 +0,0 @@
# Target CPU architecture for the base image (amd64 or arm64).
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:23-alpine
LABEL Maintainer="Zavorotynskiy Alexander <zavorotynskiy@pm.me>"
# tini: minimal init for proper signal forwarding; git: needed for the
# git-hosted uWebSockets.js npm dependency; libc6-compat: glibc shim for
# native modules on musl/alpine.
RUN apk add --no-cache tini git libc6-compat
# Set by the build pipeline to mark enterprise vs FOSS builds.
ARG envarg
ENV ENTERPRISE_BUILD=${envarg} \
MAXMINDDB_FILE=/home/openreplay/geoip.mmdb \
PRIVATE_ENDPOINTS=false \
LISTEN_PORT=9001 \
ERROR=1 \
NODE_ENV=production
WORKDIR /work
# Copy manifests and install first so the npm layer is cached across source edits.
COPY package.json .
COPY package-lock.json .
RUN npm install
COPY . .
# Create and switch to a non-root user for runtime.
RUN adduser -u 1001 openreplay -D
USER 1001
# Bundle the MaxMind GeoIP database at the path announced via MAXMINDDB_FILE.
ADD --chown=1001 https://static.openreplay.com/geoip/GeoLite2-City.mmdb $MAXMINDDB_FILE
ENTRYPOINT ["/sbin/tini", "--"]
CMD npm start

File diff suppressed because it is too large Load diff

View file

@ -1,26 +0,0 @@
{
"name": "assist-server",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0",
"async-mutex": "^0.5.0",
"express": "^4.21.2",
"ioredis": "^5.6.1",
"jsonwebtoken": "^9.0.2",
"redis": "^4.7.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"ua-parser-js": "^2.0.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.51.0",
"winston": "^3.17.0"
}
}

View file

@ -1,64 +0,0 @@
// Entry point of the assist server: a socket.io Server attached to a
// uWebSockets.js app, with custom authorization and a background cache
// refresher for connected-socket lookups.
const { App } = require('uWebSockets.js');
const { Server } = require('socket.io');
const { logger } = require("./app/logger");
const { authorizer } = require("./app/assist");
const { onConnect, setSocketIOServer } = require("./app/socket");
const { startCacheRefresher } = require("./app/cache");

const app = App();
// Ping period in ms; also drives cache invalidation downstream.
// Fix: parseInt with an explicit radix.
const pingInterval = parseInt(process.env.PING_INTERVAL, 10) || 25000;

// Build socket.io per-message-deflate options from the COMPRESSION env flag.
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
const getCompressionConfig = function () {
    let perMessageDeflate = false;
    if (process.env.COMPRESSION === "true") {
        logger.info(`WS compression: enabled`);
        perMessageDeflate = {
            zlibDeflateOptions: {
                windowBits: 10,
                memLevel: 1
            },
            zlibInflateOptions: {
                windowBits: 10
            }
        }
    } else {
        logger.info(`WS compression: disabled`);
    }
    return {
        perMessageDeflate: perMessageDeflate,
        clientNoContextTakeover: true
    };
}

const io = new Server({
    maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
    pingInterval: pingInterval, // Will use it for cache invalidation
    cors: {
        origin: "*", // Allow connections from any origin (for development)
        methods: ["GET", "POST"],
        credentials: true
    },
    path: '/socket',
    ...getCompressionConfig()
});
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
io.attachApp(app);
setSocketIOServer(io);

const HOST = process.env.LISTEN_HOST || '0.0.0.0';
// Fix: parseInt with an explicit radix.
const PORT = parseInt(process.env.PORT, 10) || 9001;
app.listen(PORT, (token) => {
    // Consistency fix: route startup messages through the shared logger
    // (the rest of the file never uses bare console.log).
    if (token) {
        logger.info(`Server running at http://${HOST}:${PORT}`);
    } else {
        logger.error(`Failed to listen on port ${PORT}`);
    }
});
startCacheRefresher(io);

// Last-resort guard: log unexpected errors instead of crashing the server.
process.on('uncaughtException', err => {
    logger.error(`Uncaught Exception: ${err}`);
});

16
ee/assist/.gitignore vendored
View file

@ -2,20 +2,4 @@
node_modules
npm-debug.log
.cache
test.html
build.sh
servers/peerjs-server.js
servers/sourcemaps-handler.js
servers/sourcemaps-server.js
/utils/geoIP.js
/utils/health.js
/utils/HeapSnapshot.js
/utils/helper.js
/utils/assistHelper.js
/utils/httpHandlers.js
/utils/socketHandlers.js
.local
*.mmdb

View file

@ -1,11 +1,8 @@
#ARCH can be amd64 or arm64
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:23-alpine
LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
LABEL Maintainer="Zavorotynskiy Alexander <zavorotynskiy@pm.me>"
RUN apk add --no-cache tini git libc6-compat
# && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2
ARG envarg
ENV ENTERPRISE_BUILD=${envarg} \
MAXMINDDB_FILE=/home/openreplay/geoip.mmdb \

View file

@ -1,14 +0,0 @@
# Removes helper/server files from the working tree.
# NOTE(review): presumably these are the files rsync'd in from the FOSS
# assist module (see the sibling rsync build script) — confirm before reuse.
rm -rf ./utils/assistHelper.js
rm -rf ./utils/geoIP.js
rm -rf ./utils/health.js
rm -rf ./utils/HeapSnapshot.js
rm -rf ./utils/helper.js
rm -rf ./utils/httpHandlers.js
rm -rf ./utils/logger.js
rm -rf ./utils/metrics.js
rm -rf ./utils/socketHandlers.js
rm -rf servers/peerjs-server.js
rm -rf servers/sourcemaps-handler.js
rm -rf servers/sourcemaps-server.js
rm -rf build.sh

File diff suppressed because it is too large Load diff

View file

@ -1,33 +1,26 @@
{
"name": "assist-server",
"version": "v1.22.0-ee",
"description": "assist server to get live sessions & sourcemaps reader to get stack trace",
"main": "peerjs-server.js",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node server.js"
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git+https://github.com/openreplay/openreplay.git"
},
"author": "KRAIEM Taha Yassine <tahayk2@gmail.com>",
"license": "Elastic License 2.0 (ELv2)",
"bugs": {
"url": "https://github.com/openreplay/openreplay/issues"
},
"homepage": "https://github.com/openreplay/openreplay#readme",
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@fastify/deepmerge": "^2.0.1",
"@maxmind/geoip2-node": "^4.2.0",
"@socket.io/redis-adapter": "^8.2.1",
"express": "^4.21.1",
"@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0",
"async-mutex": "^0.5.0",
"express": "^4.21.2",
"ioredis": "^5.6.1",
"jsonwebtoken": "^9.0.2",
"prom-client": "^15.0.0",
"redis": "^4.6.10",
"socket.io": "^4.8.0",
"ua-parser-js": "^1.0.37",
"redis": "^4.7.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"ua-parser-js": "^2.0.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.51.0",
"winston": "^3.13.0"
"winston": "^3.17.0"
}
}

View file

@ -1,2 +0,0 @@
#!/bin/bash
# Sync the FOSS assist sources into this directory, skipping dotfiles and
# node_modules; --ignore-existing keeps any local (ee) file that already
# exists, so local overrides win over the synced originals.
rsync -avr --exclude=".*" --exclude="node_modules" --ignore-existing ../../assist/* ./

View file

@ -1,6 +0,0 @@
#!/bin/bash
# Export every variable defined in .env into the environment (set -a marks
# all subsequently defined variables for export), then start the server.
set -a
source .env
set +a
npm start

View file

@ -1,119 +1,64 @@
const dumps = require('./utils/HeapSnapshot');
const {request_logger} = require('./utils/helper');
const express = require('express');
const health = require("./utils/health");
const assert = require('assert').strict;
const register = require('./utils/metrics').register;
let socket;
if (process.env.redis === "true") {
socket = require("./servers/websocket-cluster");
} else {
socket = require("./servers/websocket");
}
const {logger} = require('./utils/logger');
const { App } = require('uWebSockets.js');
const { Server } = require('socket.io');
const { logger } = require("./app/logger");
const { authorizer } = require("./app/assist");
const { onConnect, setSocketIOServer } = require("./app/socket");
const { startCacheRefresher } = require("./app/cache");
health.healthApp.get('/metrics', async (req, res) => {
try {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
} catch (ex) {
res.status(500).end(ex);
const app = App();
const pingInterval = parseInt(process.env.PING_INTERVAL) || 25000;
const getCompressionConfig = function () {
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
let perMessageDeflate = false;
if (process.env.COMPRESSION === "true") {
logger.info(`WS compression: enabled`);
perMessageDeflate = {
zlibDeflateOptions: {
windowBits: 10,
memLevel: 1
},
zlibInflateOptions: {
windowBits: 10
}
}
} else {
logger.info(`WS compression: disabled`);
}
return {
perMessageDeflate: perMessageDeflate,
clientNoContextTakeover: true
};
}
const io = new Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
pingInterval: pingInterval, // Will use it for cache invalidation
cors: {
origin: "*", // Allow connections from any origin (for development)
methods: ["GET", "POST"],
credentials: true
},
path: '/socket',
...getCompressionConfig()
});
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
io.attachApp(app);
setSocketIOServer(io);
const HOST = process.env.LISTEN_HOST || '0.0.0.0';
const PORT = process.env.LISTEN_PORT || 9001;
assert.ok(process.env.ASSIST_KEY, 'The "ASSIST_KEY" environment variable is required');
const P_KEY = process.env.ASSIST_KEY;
const PREFIX = process.env.PREFIX || process.env.prefix || `/assist`;
const heapdump = process.env.heapdump === "1";
if (process.env.uws !== "true") {
let wsapp = express();
wsapp.use(express.json());
wsapp.use(express.urlencoded({extended: true}));
wsapp.use(request_logger("[wsapp]"));
wsapp.get(['/', PREFIX, `${PREFIX}/`, `${PREFIX}/${P_KEY}`, `${PREFIX}/${P_KEY}/`], (req, res) => {
res.statusCode = 200;
res.end("ok!");
}
);
heapdump && wsapp.use(`${PREFIX}/${P_KEY}/heapdump`, dumps.router);
wsapp.use(`${PREFIX}/${P_KEY}`, socket.wsRouter);
wsapp.enable('trust proxy');
const wsserver = wsapp.listen(PORT, HOST, () => {
logger.info(`WS App listening on http://${HOST}:${PORT}`);
health.healthApp.listen(health.PORT, HOST, health.listen_cb);
});
socket.start(wsserver);
module.exports = {wsserver};
} else {
logger.info("Using uWebSocket");
const {App} = require("uWebSockets.js");
const uapp = new App();
const healthFn = (res, req) => {
res.writeStatus('200 OK').end('ok!');
const PORT = parseInt(process.env.PORT) || 9001;
app.listen(PORT, (token) => {
if (token) {
console.log(`Server running at http://${HOST}:${PORT}`);
} else {
console.log(`Failed to listen on port ${PORT}`);
}
uapp.get('/', healthFn);
uapp.get(PREFIX, healthFn);
uapp.get(`${PREFIX}/`, healthFn);
uapp.get(`${PREFIX}/${P_KEY}`, healthFn);
uapp.get(`${PREFIX}/${P_KEY}/`, healthFn);
});
startCacheRefresher(io);
/* Either onAborted or simply finished request */
const onAbortedOrFinishedResponse = function (res) {
if (res.id === -1) {
logger.debug("ERROR! onAbortedOrFinishedResponse called twice for the same res!");
} else {
logger.debug('Stream was closed');
}
/* Mark this response already accounted for */
res.id = -1;
}
const uWrapper = function (fn) {
return (res, req) => {
res.id = 1;
res.aborted = false;
req.startTs = performance.now(); // track request's start timestamp
req.method = req.getMethod();
res.onAborted(() => {
res.aborted = true;
onAbortedOrFinishedResponse(res);
});
return fn(req, res);
}
}
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.post(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsLiveBySession));
socket.start(uapp);
uapp.listen(HOST, PORT, (token) => {
if (!token) {
logger.error("port already in use");
}
logger.info(`WS App listening on http://${HOST}:${PORT}`);
health.healthApp.listen(health.PORT, HOST, health.listen_cb);
});
process.on('uncaughtException', err => {
logger.error(`Uncaught Exception: ${err}`);
});
module.exports = {uapp};
}
process.on('uncaughtException', err => {
logger.error(`Uncaught Exception: ${err}`);
});

View file

@ -1,64 +0,0 @@
const express = require('express');
const {
socketConnexionTimeout,
authorizer
} = require('../utils/assistHelper');
const {
createSocketIOServer
} = require('../utils/wsServer');
const {
onConnect
} = require('../utils/socketHandlers');
const {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
} = require('../utils/httpHandlers');
const {logger} = require('../utils/logger');
const {createAdapter} = require("@socket.io/redis-adapter");
const {createClient} = require("redis");
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
const pubClient = createClient({url: REDIS_URL});
const subClient = pubClient.duplicate();
logger.info(`Using Redis: ${REDIS_URL}`);
const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io;
module.exports = {
wsRouter,
start: (server, prefix) => {
io = createSocketIOServer(server, prefix);
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
logger.info("WS server started");
socketConnexionTimeout(io);
Promise.all([pubClient.connect(), subClient.connect()])
.then(() => {
io.adapter(createAdapter(pubClient, subClient,
{requestsTimeout: process.env.REDIS_REQUESTS_TIMEOUT || 5000}));
logger.info("> redis connected.");
})
.catch((err) => {
logger.error(`redis connection error: ${err}`);
process.exit(2);
});
},
handlers: {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
}
};

View file

@ -1,45 +0,0 @@
// Single-node assist websocket server: express router for live-session
// HTTP queries plus socket.io wiring (no redis adapter — compare
// websocket-cluster.js for the clustered variant).
const express = require('express');
const {
socketConnexionTimeout,
authorizer
} = require('../utils/assistHelper');
const {
createSocketIOServer
} = require('../utils/wsServer');
const {
onConnect
} = require('../utils/socketHandlers');
const {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
} = require('../utils/httpHandlers');
const {logger} = require('../utils/logger');
// Route order matters: the literal `/autocomplete` paths must be registered
// before the `/:sessionId` parameter route would match them.
const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
// Module-level socket.io server; populated by start().
let io;
module.exports = {
wsRouter,
// Attach socket.io to `server` (optionally under `prefix`), wire the
// authorizer middleware and connection handler, and arm the socket
// connection-timeout watchdog.
start: (server, prefix) => {
io = createSocketIOServer(server, prefix);
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
logger.info("WS server started");
socketConnexionTimeout(io);
},
// Raw request handlers, exposed for the uWS code path in server.js.
handlers: {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
}
};

View file

@ -1,13 +0,0 @@
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
} = require('../utils/helper-ee');
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
}

View file

@ -1,126 +0,0 @@
const uWS = require("uWebSockets.js");
const helper = require('./helper');
const {logger} = require('./logger');
// Read and JSON-parse the full request body from a uWS response object.
// uWS delivers the body in chunks via res.onData; chunks are copied and
// accumulated, then parsed once the last one arrives. Malformed JSON
// resolves to {} so callers never have to handle a parse failure.
// Fixes: drop the unused `reject` parameter and the O(n²) re-concat of
// the accumulator on every chunk (now a single concat at the end).
// @param res uWS HttpResponse carrying the incoming request body
// @returns Promise<Object> parsed body, or {} on parse failure
const getBodyFromUWSResponse = async function (res) {
    return new Promise((resolve) => {
        const chunks = [];
        res.onData((ab, isLast) => {
            // Buffer.from(ArrayBuffer) is a zero-copy view and uWS recycles
            // the memory after the callback returns, so take an explicit copy.
            chunks.push(Buffer.from(Buffer.from(ab)));
            if (isLast) {
                let json;
                try {
                    json = JSON.parse(Buffer.concat(chunks));
                } catch (e) {
                    console.error(e);
                    json = {};
                }
                resolve(json);
            }
        });
    });
}
// Pull the projectKey path parameter from a request. Under uWS the key is
// positional parameter 0; otherwise the express-based helper handles it.
// Returns undefined when the parameter is absent/empty.
const extractProjectKeyFromRequest = function (req) {
    if (process.env.uws !== "true") {
        return helper.extractProjectKeyFromRequest(req);
    }
    const projectKey = req.getParameter(0);
    if (projectKey) {
        logger.debug(`[WS]where projectKey=${projectKey}`);
        return projectKey;
    }
    return undefined;
}
// Pull the sessionId path parameter from a request. Under uWS the session
// id is positional parameter 1 (parameter 0 is the projectKey); otherwise
// the express-based helper handles it. Returns undefined when absent.
// Fix: the debug log previously said "projectKey=" for parameter 1.
const extractSessionIdFromRequest = function (req) {
    if (process.env.uws === "true") {
        if (req.getParameter(1)) {
            logger.debug(`[WS]where sessionId=${req.getParameter(1)}`);
            return req.getParameter(1);
        }
    } else {
        return helper.extractSessionIdFromRequest(req);
    }
    return undefined;
}
// Assemble search filters ({query, filter, sort, pagination}) for live
// session lookups.
// uWS path: read q/key/userId from the query string; when no free-text
// query (`q`) is present, merge sort/pagination/filter from the JSON body
// (non-GET requests only), then normalize via the shared helpers.
// Express path: fully delegated to helper.extractPayloadFromRequest —
// note its result is returned as-is, skipping the normalization below.
// @param req incoming request (uWS HttpRequest or express req)
// @param res uWS HttpResponse, needed to read the body under uWS
const extractPayloadFromRequest = async function (req, res) {
let filters = {
"query": {},
"filter": {}
};
if (process.env.uws === "true") {
// Free-text search term.
if (req.getQuery("q")) {
logger.debug(`[WS]where q=${req.getQuery("q")}`);
filters.query.value = req.getQuery("q");
}
// Field the free-text term applies to.
if (req.getQuery("key")) {
logger.debug(`[WS]where key=${req.getQuery("key")}`);
filters.query.key = req.getQuery("key");
}
// userID filter is a list downstream, hence the single-element array.
if (req.getQuery("userId")) {
logger.debug(`[WS]where userId=${req.getQuery("userId")}`);
filters.filter.userID = [req.getQuery("userId")];
}
// Body-based filters are only consulted when no `q` parameter was given.
if (!filters.query.value) {
let body = {};
if (req.getMethod() !== 'get') {
body = await getBodyFromUWSResponse(res);
}
filters = {
...filters,
"sort": {
"key": body.sort && body.sort.key ? body.sort.key : undefined,
"order": body.sort && body.sort.order === "DESC"
},
"pagination": {
"limit": body.pagination && body.pagination.limit ? body.pagination.limit : undefined,
"page": body.pagination && body.pagination.page ? body.pagination.page : undefined
}
}
// Body filter entries override query-string-derived ones on key clash.
filters.filter = {...filters.filter, ...(body.filter || {})};
}
} else {
return helper.extractPayloadFromRequest(req);
}
// Normalize filter values through the shared helper transformations.
filters.filter = helper.objectToObjectOfArrays(filters.filter);
filters.filter = helper.transformFilters(filters.filter);
logger.debug("payload/filters:" + JSON.stringify(filters))
return Object.keys(filters).length > 0 ? filters : undefined;
}
// List all socket.io rooms. With the redis adapter enabled, rooms span the
// whole cluster and must come from adapter.allRooms(); otherwise fall back
// to the local helper implementation.
const getAvailableRooms = async function (io) {
    const clustered = process.env.redis === "true";
    return clustered ? io.of('/').adapter.allRooms() : helper.getAvailableRooms(io);
}
// Build the websocket compression options. The express/socket.io path is
// delegated to the shared helper; the uWS path returns dedicated
// compressor settings only when COMPRESSION is enabled.
const getCompressionConfig = function () {
    if (process.env.uws !== "true") {
        return helper.getCompressionConfig();
    }
    // uWS: The theoretical overhead per socket is 32KB (8KB for compressor and for 24KB decompressor)
    if (process.env.COMPRESSION !== "true") {
        console.log(`uWS compression: disabled`);
        return {};
    }
    console.log(`uWS compression: enabled`);
    return {
        compression: uWS.DEDICATED_COMPRESSOR_8KB,
        decompression: uWS.DEDICATED_DECOMPRESSOR_1KB
    };
}
// Public surface of the uWS-aware helpers; each function falls back to the
// plain express helper when process.env.uws !== "true".
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getCompressionConfig,
getAvailableRooms
};

View file

@ -1,234 +0,0 @@
const statsHost = process.env.STATS_HOST || 'http://assist-stats-openreplay.app.svc.cluster.local:8000/events';
const authToken = process.env.STATS_AUTH_TOKEN || '';
const {logger} = require('./logger');
// Minimal in-process key/value store backing the assist-stats event IDs.
class InMemoryCache {
    constructor() {
        // Map gives O(1) get/set/delete without prototype-key pitfalls.
        this.cache = new Map();
    }
    // Store or overwrite a value under `key`.
    set(key, value) { this.cache.set(key, value); }
    // Look up `key`; undefined when absent.
    get(key) { return this.cache.get(key); }
    // Drop a single entry.
    delete(key) { this.cache.delete(key); }
    // Drop all entries.
    clear() { this.cache.clear(); }
}
// Single module-wide cache instance shared by all event handlers below.
const cache = new InMemoryCache();
// Fire-and-forget POST of one stats event to the assist-stats service.
// Failures are logged at debug level only: stats delivery is best-effort
// and must never disturb the assist flow.
async function postData(payload) {
    const headers = { 'Content-Type': 'application/json' };
    if (authToken && authToken.trim() !== '') {
        headers['Authorization'] = 'Bearer ' + authToken;
    }
    try {
        const response = await fetch(statsHost, {
            method: 'POST',
            body: JSON.stringify(payload),
            headers,
        });
        const jsonResponse = await response.json();
        logger.debug('JSON response', JSON.stringify(jsonResponse, null, 4));
    } catch (err) {
        logger.debug('ERROR', err);
    }
}
// Record the start of an assist session for `agentID` and remember the
// generated event ID so endAssist can close the matching event.
function startAssist(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = `${sessId}_${agentID}_assist_${tsNow}`;
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "assist",
        "event_state": "start",
        "timestamp": tsNow,
    });
    // Save uniq eventID to cache
    cache.set(`${sessId}_${agentID}_assist`, eventID);
    // Debug log
    logger.debug(`assist_started, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${tsNow}`);
}
// Close the assist event opened by startAssist for this socket/agent pair.
// If no event ID was cached (start was never observed), skip rather than
// emit an unmatched end record.
// Fix: the skip/end debug logs previously read socket.handshake.query.agentID,
// which is never set — the agent id is the `agentID` parameter.
function endAssist(socket, agentID) {
    const eventID = cache.get(`${socket.handshake.query.sessId}_${agentID}_assist`);
    if (eventID === undefined) {
        logger.debug(`have to skip assist_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
        return
    }
    void postData({
        "project_id": socket.handshake.query.projectId,
        "session_id": socket.handshake.query.sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "assist",
        "event_state": "end",
        "timestamp": +new Date(),
    })
    // Remove eventID from cache
    cache.delete(`${socket.handshake.query.sessId}_${agentID}_assist`);
    // Debug logs
    logger.debug(`assist_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
}
// Record the start of a call for `agentID`; caches the generated event ID
// under the session (one active call per session) for endCall to close.
function startCall(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = `${sessId}_${agentID}_call_${tsNow}`;
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "call",
        "event_state": "start",
        "timestamp": tsNow,
    });
    // Save uniq eventID to cache
    cache.set(`${sessId}_call`, eventID);
    // Debug logs
    logger.debug(`s_call_started, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${tsNow}`);
}
// Close the call event opened by startCall for this session; skips with a
// debug note when no start event was cached.
function endCall(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = cache.get(`${sessId}_call`);
    if (eventID === undefined) {
        logger.debug(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${tsNow}`);
        return
    }
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "call",
        "event_state": "end",
        "timestamp": tsNow,
    });
    cache.delete(`${sessId}_call`)
    // Debug logs
    logger.debug(`s_call_ended, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${tsNow}`);
}
// Record the start of a remote-control session for `agentID`; caches the
// event ID under the session for endControl to close.
function startControl(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = `${sessId}_${agentID}_control_${tsNow}`;
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "control",
        "event_state": "start",
        "timestamp": tsNow,
    });
    cache.set(`${sessId}_control`, eventID)
    // Debug logs
    logger.debug(`s_control_started, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${+new Date()}`);
}
// Close the remote-control event opened by startControl; skips with a
// debug note when no start event was cached.
function endControl(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = cache.get(`${sessId}_control`);
    if (eventID === undefined) {
        logger.debug(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${tsNow}`);
        return
    }
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "control",
        "event_state": "end",
        "timestamp": tsNow,
    });
    cache.delete(`${sessId}_control`)
    // Debug logs
    logger.debug(`s_control_ended, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${+new Date()}`);
}
// Record the start of a session recording for `agentID`; caches the event
// ID under the session for endRecord to close.
function startRecord(socket, agentID) {
    const { sessId, projectId } = socket.handshake.query;
    const tsNow = +new Date();
    const eventID = `${sessId}_${agentID}_record_${tsNow}`;
    void postData({
        "project_id": projectId,
        "session_id": sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "record",
        "event_state": "start",
        "timestamp": tsNow,
    });
    cache.set(`${sessId}_record`, eventID)
    // Debug logs
    logger.debug(`s_recording_started, agentID: ${agentID}, sessID: ${sessId}, projID: ${projectId}, time: ${+new Date()}`);
}
// Close the recording event opened by startRecord.
// Fixes: the cache key was read from `socket.sessId` (always undefined —
// startRecord stores under socket.handshake.query.sessId), so the end
// event never carried a matching event ID; also skip, like every other
// end* handler here, when no start event was cached.
function endRecord(socket, agentID) {
    const tsNow = +new Date();
    const eventID = cache.get(`${socket.handshake.query.sessId}_record`);
    if (eventID === undefined) {
        logger.debug(`have to skip s_recording_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
        return
    }
    void postData({
        "project_id": socket.handshake.query.projectId,
        "session_id": socket.handshake.query.sessId,
        "agent_id": agentID,
        "event_id": eventID,
        "event_type": "record",
        "event_state": "end",
        "timestamp": tsNow,
    });
    cache.delete(`${socket.handshake.query.sessId}_record`)
    // Debug logs
    logger.debug(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
// Dispatch an incoming socket stats event to its start/end handler.
// Unknown event names are ignored, exactly like the original switch.
// A Map (not an object literal) avoids accidental prototype-key matches;
// the arrow wrappers resolve the handlers lazily at call time.
const statsEventHandlers = new Map([
    ["s_call_started", (socket, agentID) => startCall(socket, agentID)],
    ["s_call_ended", (socket, agentID) => endCall(socket, agentID)],
    ["s_control_started", (socket, agentID) => startControl(socket, agentID)],
    ["s_control_ended", (socket, agentID) => endControl(socket, agentID)],
    ["s_recording_started", (socket, agentID) => startRecord(socket, agentID)],
    ["s_recording_ended", (socket, agentID) => endRecord(socket, agentID)],
]);
function handleEvent(eventName, socket, agentID) {
    const handler = statsEventHandlers.get(eventName);
    if (handler) {
        handler(socket, agentID);
    }
}
// Public API: assist start/end plus the generic stats-event dispatcher.
module.exports = {
startAssist,
endAssist,
handleEvent,
}

View file

@ -1,107 +0,0 @@
const _io = require("socket.io");
const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger');
// Lazily-created socket.io server singleton (see createSocketIOServer).
let io;
// Accessor for the module-level server; undefined until created.
const getServer = function () {return io;}
// Cluster-mode flag: when redis is enabled, doFetchAllSockets() answers
// from the in-memory snapshot below instead of a cross-node fetch.
const useRedis = process.env.redis === "true";
// Snapshot of all sockets, refreshed by the background task.
let inMemorySocketsCache = [];
// Timestamp (ms) of the last snapshot refresh.
let lastCacheUpdateTime = 0;
// Snapshot max age in ms. NOTE(review): parseInt without radix — assumes a base-10 env value.
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
// Fetch every connected socket. In redis/cluster mode, serve the
// periodically refreshed in-memory snapshot (cross-node fetches are
// expensive); otherwise query the local server directly, degrading to an
// empty list on failure.
const doFetchAllSockets = async function () {
    if (useRedis) {
        const cacheAge = Date.now() - lastCacheUpdateTime;
        logger.info(`Using in-memory cache (age: ${cacheAge}ms)`);
        return inMemorySocketsCache;
    }
    try {
        return await io.fetchSockets();
    } catch (error) {
        logger.error('Error fetching sockets:', error);
        return [];
    }
}
// Background refresher that runs independently of requests
let cacheRefresher = null;
// Start (or restart) the background task that snapshots io.fetchSockets()
// into inMemorySocketsCache. The timer fires at half the refresh interval
// but only refetches when the snapshot is at least CACHE_REFRESH_INTERVAL
// old, so a slow fetch cannot trigger overlapping refreshes.
function startCacheRefresher() {
if (cacheRefresher) clearInterval(cacheRefresher);
cacheRefresher = setInterval(async () => {
const now = Date.now();
// Only refresh if cache is stale
if (now - lastCacheUpdateTime >= CACHE_REFRESH_INTERVAL) {
logger.debug('Background refresh triggered');
try {
const startTime = performance.now();
const result = await io.fetchSockets();
inMemorySocketsCache = result;
// Stamps the tick time, not fetch-completion time.
lastCacheUpdateTime = now;
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}
}, CACHE_REFRESH_INTERVAL / 2);
}
// Strip each socket down to just its handshake (connection metadata);
// everything else on the socket object is dropped before serialization.
const processSocketsList = function (sockets) {
    return sockets.map(({ handshake }) => ({ handshake }));
}
// Fetch sockets, optionally scoped to a single room. Returns [] until the
// socket.io server has been created.
const fetchSockets = async function (roomID) {
    if (!io) {
        return [];
    }
    return roomID ? io.in(roomID).fetchSockets() : doFetchAllSockets();
}
// Create the module-level socket.io server exactly once (idempotent).
// With uws disabled, socket.io binds onto the provided HTTP server; with
// uws enabled, a standalone Server is attached to the uWS app. Also kicks
// off the background socket-snapshot refresher.
// @param server HTTP server or uWS app, depending on process.env.uws
// @param prefix optional path prefix prepended to the '/socket' endpoint
const createSocketIOServer = function (server, prefix) {
if (io) {
// Already created — reuse the singleton.
return io;
}
if (process.env.uws !== "true") {
io = _io(server, {
// Max message payload: env value in MB, default 5MB.
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
} else {
io = new _io.Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
io.attachApp(server);
}
startCacheRefresher();
return io;
}
// Public API: server factory, singleton accessor, room-scoped socket fetch.
module.exports = {
createSocketIOServer,
getServer,
fetchSockets,
}