From 4dfcafc5727af603f275f5aa3036b58357c2ab95 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 9 Nov 2023 15:06:11 +0100 Subject: [PATCH] Assist metrics (#1627) * feat(assist): added default metrics * feat(assist): added assist_ label to easy metrics sort * feat(assist): finished metrics template implementation (now available on /metrics) * feat(assist): removed unnecessary _ symbol from app name * feat(assist): integrated metrics to rest api and websockets handlers * feat(assist): fixed wrong naming issue * feat(assist): added connection type to some metrics * feat(assist): added url route parser for metrics stats --- assist/package-lock.json | 34 ++++++++++++ assist/package.json | 1 + assist/server.js | 2 +- assist/utils/health.js | 12 ++++- assist/utils/helper.js | 28 ++++++++++ assist/utils/metrics.js | 99 ++++++++++++++++++++++++++++++++++ assist/utils/socketHandlers.js | 31 ++++++++--- 7 files changed, 197 insertions(+), 10 deletions(-) create mode 100644 assist/utils/metrics.js diff --git a/assist/package-lock.json b/assist/package-lock.json index 25947548b..8c332a94b 100644 --- a/assist/package-lock.json +++ b/assist/package-lock.json @@ -12,6 +12,7 @@ "@maxmind/geoip2-node": "^4.2.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", + "prom-client": "^15.0.0", "socket.io": "^4.7.2", "ua-parser-js": "^1.0.35" } @@ -25,6 +26,14 @@ "maxmind": "^4.2.0" } }, + "node_modules/@opentelemetry/api": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.6.0.tgz", + "integrity": "sha512-OWlrQAnWn9577PhVgqjUvMr1pg57Bc4jv0iL4w0PRuOSRvq67rvHW9Ie/dZVMvCzhSCB+UxhcY/PmCmFj33Q+g==", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@socket.io/component-emitter": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", @@ -81,6 +90,11 @@ "node": "^4.5.0 || >= 5.9" } }, + "node_modules/bintrees": { + "version": "1.0.2", + "resolved": 
"https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", + "integrity": "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==" + }, "node_modules/body-parser": { "version": "1.20.1", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.20.1.tgz", @@ -712,6 +726,18 @@ "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-0.1.7.tgz", "integrity": "sha512-5DFkuoqlv1uYQKxy8omFBeJPQcdoE07Kv2sferDCrAq1ohOU+MSDswDIbnx3YAM60qIOnYa53wBhXW0EbMonrQ==" }, + "node_modules/prom-client": { + "version": "15.0.0", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-15.0.0.tgz", + "integrity": "sha512-UocpgIrKyA2TKLVZDSfm8rGkL13C19YrQBAiG3xo3aDFWcHedxRxI3z+cIcucoxpSO0h5lff5iv/SXoxyeopeA==", + "dependencies": { + "@opentelemetry/api": "^1.4.0", + "tdigest": "^0.1.1" + }, + "engines": { + "node": "^16 || ^18 || >=20" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -945,6 +971,14 @@ "node": ">= 0.8" } }, + "node_modules/tdigest": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", + "integrity": "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==", + "dependencies": { + "bintrees": "1.0.2" + } + }, "node_modules/tiny-lru": { "version": "11.0.1", "resolved": "https://registry.npmjs.org/tiny-lru/-/tiny-lru-11.0.1.tgz", diff --git a/assist/package.json b/assist/package.json index fc9cecdf2..d13b969f1 100644 --- a/assist/package.json +++ b/assist/package.json @@ -21,6 +21,7 @@ "@maxmind/geoip2-node": "^4.2.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", + "prom-client": "^15.0.0", "socket.io": "^4.7.2", "ua-parser-js": "^1.0.35" } diff --git a/assist/server.js b/assist/server.js index 5eb6c2e16..5f3cc068e 100644 --- a/assist/server.js +++ b/assist/server.js @@ -33,4 +33,4 @@ const wsserver = wsapp.listen(PORT, HOST, 
() => { wsapp.enable('trust proxy'); socket.start(wsserver); -module.exports = {wsserver}; \ No newline at end of file +module.exports = {wsserver}; diff --git a/assist/utils/health.js b/assist/utils/health.js index 0b89dd1d8..cbc7cab57 100644 --- a/assist/utils/health.js +++ b/assist/utils/health.js @@ -1,7 +1,7 @@ const express = require('express'); const HOST = process.env.LISTEN_HOST || '0.0.0.0'; const PORT = process.env.HEALTH_PORT || 8888; - +const register = require('./metrics').register; const {request_logger} = require("./helper"); const debug = process.env.debug === "1"; @@ -21,7 +21,6 @@ const check_health = async function (req, res) { }); } - const healthApp = express(); healthApp.use(express.json()); healthApp.use(express.urlencoded({extended: true})); @@ -40,6 +39,15 @@ healthApp.get('/shutdown', (req, res) => { } ); +healthApp.get('/metrics', async (req, res) => { + try { + res.set('Content-Type', register.contentType); + res.end(await register.metrics()); + } catch (ex) { + res.status(500).end(ex instanceof Error ? ex.message : String(ex)); /* res.end() only accepts a string or Buffer; passing the Error object itself throws instead of sending the 500 */ + } +}); + const listen_cb = async function () { console.log(`Health App listening on http://${HOST}:${PORT}`); console.log('Press Ctrl+C to quit.'); diff --git a/assist/utils/helper.js b/assist/utils/helper.js index 03fc7c313..8c1eacad1 100644 --- a/assist/utils/helper.js +++ b/assist/utils/helper.js @@ -1,5 +1,11 @@ +const { + RecordRequestDuration, + IncreaseTotalRequests +} = require('../utils/metrics'); + let PROJECT_KEY_LENGTH = parseInt(process.env.PROJECT_KEY_LENGTH) || 20; let debug = process.env.debug === "1" || false; + const extractRoomId = (peerId) => { let {projectKey, sessionId, tabId} = extractPeerId(peerId); if (projectKey && sessionId) { @@ -7,6 +13,7 @@ const extractRoomId = (peerId) => { } return null; } + const extractTabId = (peerId) => { let {projectKey, sessionId, tabId} = extractPeerId(peerId); if (tabId) { @@ -14,6 +21,7 @@ const extractTabId = (peerId) => { } return null; } + const extractPeerId = (peerId) => { let splited = 
peerId.split("-"); if (splited.length < 2 || splited.length > 3) { @@ -29,10 +37,16 @@ const extractPeerId = (peerId) => { } return {projectKey: splited[0], sessionId: splited[1], tabId: splited[2]}; }; + const request_logger = (identity) => { return (req, res, next) => { debug && console.log(identity, new Date().toTimeString(), 'REQUEST', req.method, req.originalUrl); + const startTs = performance.now(); // millis res.on('finish', function () { + const duration = performance.now() - startTs; + IncreaseTotalRequests(); + let route = req.originalUrl.split('/')[3]; + RecordRequestDuration(req.method, route, this.statusCode, duration); if (this.statusCode !== 200 || debug) { console.log(new Date().toTimeString(), 'RESPONSE', req.method, req.originalUrl, this.statusCode); } @@ -41,6 +55,7 @@ const request_logger = (identity) => { next(); } }; + const extractProjectKeyFromRequest = function (req) { if (req.params.projectKey) { debug && console.log(`[WS]where projectKey=${req.params.projectKey}`); @@ -48,6 +63,7 @@ const extractProjectKeyFromRequest = function (req) { } return undefined; } + const extractSessionIdFromRequest = function (req) { if (req.params.sessionId) { debug && console.log(`[WS]where sessionId=${req.params.sessionId}`); @@ -55,6 +71,7 @@ const extractSessionIdFromRequest = function (req) { } return undefined; } + const isValidSession = function (sessionInfo, filters) { let foundAll = true; for (const [key, body] of Object.entries(filters)) { @@ -89,6 +106,7 @@ const isValidSession = function (sessionInfo, filters) { } return foundAll; } + const getValidAttributes = function (sessionInfo, query) { let matches = []; let deduplicate = []; @@ -106,9 +124,11 @@ const getValidAttributes = function (sessionInfo, query) { } return matches; } + const hasFilters = function (filters) { return filters && filters.filter && Object.keys(filters.filter).length > 0; } + const objectToObjectOfArrays = function (obj) { let _obj = {} if (obj) { @@ -126,6 +146,7 @@ const 
objectToObjectOfArrays = function (obj) { } return _obj; } + const transformFilters = function (filter) { for (let key of Object.keys(filter)) { //To support old v1.7.0 payload @@ -142,6 +163,7 @@ const transformFilters = function (filter) { } return filter; } + const extractPayloadFromRequest = async function (req, res) { let filters = { "query": {}, // for autocomplete @@ -173,6 +195,7 @@ const extractPayloadFromRequest = async function (req, res) { debug && console.log("payload/filters:" + JSON.stringify(filters)) return filters; } + const getValue = function (obj, key) { if (obj !== undefined && obj !== null) { let val; @@ -190,6 +213,7 @@ const getValue = function (obj, key) { } return undefined; } + const sortPaginate = function (list, filters) { if (typeof (list) === "object" && !Array.isArray(list)) { for (const [key, value] of Object.entries(list)) { @@ -224,6 +248,7 @@ const sortPaginate = function (list, filters) { } return {"total": total, "sessions": list}; } + const uniqueAutocomplete = function (list) { let _list = []; let deduplicate = []; @@ -235,9 +260,11 @@ const uniqueAutocomplete = function (list) { } return _list; } + const getAvailableRooms = async function (io) { return io.sockets.adapter.rooms; } + const getCompressionConfig = function () { // WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor) let perMessageDeflate = false; @@ -261,6 +288,7 @@ const getCompressionConfig = function () { }; } + module.exports = { transformFilters, extractRoomId, diff --git a/assist/utils/metrics.js b/assist/utils/metrics.js new file mode 100644 index 000000000..104721d1f --- /dev/null +++ b/assist/utils/metrics.js @@ -0,0 +1,99 @@ +const client = require('prom-client') + +// Create a Registry which registers the metrics +const register = new client.Registry() +register.setDefaultLabels({ + app: 'assist' +}) + +// Enable the collection of default metrics +client.collectDefaultMetrics({ register }) + +// http metrics 
+
+const httpRequestDuration = new client.Histogram({ // per-request latency, labelled by method, route and status code
+    name: 'request_duration_seconds',
+    help: 'A histogram displaying the duration of each HTTP request in seconds.',
+    labelNames: ['method', 'route', 'code'],
+    buckets: [.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000],
+});
+
+const RecordRequestDuration = function(method, route, code, duration) { // duration is in milliseconds (request_logger passes a performance.now() delta)
+    httpRequestDuration.observe({ method: method, route: route, code: code }, duration / 1000); // ms -> s so samples match the metric's name and help text
+}
+
+const httpTotalRequests = new client.Counter({
+    name: 'requests_total',
+    help: 'A counter displaying the number of all HTTP requests.',
+});
+
+const IncreaseTotalRequests = function () { // adds one to requests_total
+    httpTotalRequests.inc();
+}
+
+// websocket metrics
+
+const websocketTotalConnections = new client.Counter({
+    name: 'ws_connections_total',
+    help: 'A counter displaying the number of all ws connections',
+    labelNames: ['type'], // tab, agent
+});
+
+const IncreaseTotalWSConnections = function (type) { // type becomes the value of the 'type' label
+    websocketTotalConnections.inc({type: type});
+}
+
+const websocketOnlineConnections = new client.Gauge({
+    name: 'ws_connections_online',
+    help: 'A gauge displaying the number of online (active) connections',
+    labelNames: ['type'], // tab, agent
+});
+
+const IncreaseOnlineConnections = function (type) { // +1 on the gauge series for this type
+    websocketOnlineConnections.inc({type: type});
+}
+
+const DecreaseOnlineConnections = function (type) { // -1 on the gauge series for this type
+    websocketOnlineConnections.dec({type: type});
+}
+
+const websocketTotalRooms = new client.Counter({
+    name: 'ws_rooms_total',
+    help: 'A counter displaying the number of all rooms',
+});
+
+const IncreaseTotalRooms = function () { // adds one to ws_rooms_total
+    websocketTotalRooms.inc();
+}
+
+const websocketOnlineRooms = new client.Gauge({
+    name: 'ws_rooms_online',
+    help: 'A gauge displaying the number of online (active) rooms',
+});
+
+const IncreaseOnlineRooms = function () { // +1 on ws_rooms_online
+    websocketOnlineRooms.inc();
+}
+
+const DecreaseOnlineRooms = function () { // -1 on ws_rooms_online
+    websocketOnlineRooms.dec();
+}
+
+register.registerMetric(httpRequestDuration); +register.registerMetric(httpTotalRequests); +register.registerMetric(websocketTotalConnections); +register.registerMetric(websocketOnlineConnections); +register.registerMetric(websocketTotalRooms); +register.registerMetric(websocketOnlineRooms); + +module.exports = { + register, + RecordRequestDuration, + IncreaseTotalRequests, + IncreaseTotalWSConnections, + IncreaseOnlineConnections, + DecreaseOnlineConnections, + IncreaseTotalRooms, + IncreaseOnlineRooms, + DecreaseOnlineRooms, +} \ No newline at end of file diff --git a/assist/utils/socketHandlers.js b/assist/utils/socketHandlers.js index 0d80d6019..f2ebcb863 100644 --- a/assist/utils/socketHandlers.js +++ b/assist/utils/socketHandlers.js @@ -15,6 +15,14 @@ const { const { getServer } = require('../utils/wsServer'); +const { + IncreaseTotalWSConnections, + IncreaseOnlineConnections, + DecreaseOnlineConnections, + IncreaseTotalRooms, + IncreaseOnlineRooms, + DecreaseOnlineRooms, +} = require('../utils/metrics'); const debug_log = process.env.debug === "1"; const error_log = process.env.ERROR === "1"; @@ -70,13 +78,15 @@ function processNewSocket(socket) { async function onConnect(socket) { debug_log && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); processNewSocket(socket); + IncreaseTotalWSConnections(socket.identity); + IncreaseOnlineConnections(socket.identity); const io = getServer(); - const {sessionsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); + const {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); if (socket.identity === IDENTITIES.session) { // Check if session with the same tabID already connected, if so, refuse new connexion - if (sessionsCount > 0) { + if (tabsCount > 0) { for (let tab of tabIDs) { if (tab === socket.tabId) { error_log && console.log(`session already connected, refusing new connexion, peerId: ${socket.peerId}`); @@ -85,6 +95,11 
@@ async function onConnect(socket) { } } } + if (tabsCount < 0) { + // New session creates new room + IncreaseTotalRooms(); + IncreaseOnlineRooms(); + } extractSessionInfo(socket); // Inform all connected agents about reconnected session if (agentsCount > 0) { @@ -92,8 +107,7 @@ async function onConnect(socket) { io.to(socket.id).emit(EVENTS_DEFINITION.emit.AGENTS_CONNECTED, agentIDs); socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.SESSION_RECONNECTED, socket.id); } - - } else if (sessionsCount <= 0) { + } else if (tabsCount <= 0) { debug_log && console.log(`notifying new agent about no SESSIONS with peerId:${socket.peerId}`); io.to(socket.id).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); } @@ -132,7 +146,9 @@ async function onConnect(socket) { } async function onDisconnect(socket) { + DecreaseOnlineConnections(socket.identity); debug_log && console.log(`${socket.id} disconnected from ${socket.roomId}`); + if (socket.identity === IDENTITIES.agent) { socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.AGENT_DISCONNECT, socket.id); // Stats @@ -140,13 +156,14 @@ async function onDisconnect(socket) { } debug_log && console.log("checking for number of connected agents and sessions"); const io = getServer(); - let {sessionsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); + let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(io, socket.roomId); - if (sessionsCount === -1 && agentsCount === -1) { + if (tabsCount === -1 && agentsCount === -1) { + DecreaseOnlineRooms(); debug_log && console.log(`room not found: ${socket.roomId}`); return; } - if (sessionsCount === 0) { + if (tabsCount === 0) { debug_log && console.log(`notifying everyone in ${socket.roomId} about no SESSIONS`); socket.to(socket.roomId).emit(EVENTS_DEFINITION.emit.NO_SESSIONS); }