This commit is contained in:
Alexander 2025-05-13 10:59:42 +02:00 committed by GitHub
commit c50396357d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
50 changed files with 2545 additions and 3099 deletions

24
assist-server/Makefile Normal file
View file

@ -0,0 +1,24 @@
ee ?= "false" # true to build ee
arch ?= "amd64" # default amd64
docker_runtime ?= "docker" # default docker runtime
docker_repo ?= "public.ecr.aws/p1t3u8a3"
docker_build_args ?= $(if $(filter depot,$(docker_runtime)),"--push","")
.PHONY: help
help: ## Prints help for targets with comments
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-25s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)
##@ Docker
.PHONY: build
build: ## Build the backend. ee=true for ee build.
@DOCKER_BUILD_ARGS=$(docker_build_args) DOCKER_REPO=$(docker_repo) ARCH=$(arch) DOCKER_RUNTIME=$(docker_runtime) bash build.sh $(ee)
##@ Local Dev
.PHONY: scan
scan: ## Scan the backend
@echo scanning foss
@trivy fs -q .
@echo scanning ee
@trivy fs -q ../ee/assist-server/

View file

@ -1,7 +1,10 @@
ee ?= "false" # true to build ee
distro ?= foss # ee to build ee
app ?= "" # app name, default all
arch ?= "amd64" # default amd64
docker_repo ?= "public.ecr.aws/p1t3u8a3"
docker_runtime ?= "docker" # default docker runtime
image_tag ?= "" # image tag to build. Default is git sha short
docker_build_args ?= $(if $(filter depot,$(docker_runtime)),"--push","")
.PHONY: help
help: ## Prints help for targets with comments
@ -11,7 +14,7 @@ help: ## Prints help for targets with comments
.PHONY: build
build: ## Build the backend. ee=true for ee build. app=app name for only one app. Default build all apps.
ARCH=$(arch) DOCKER_RUNTIME=$(docker_runtime) bash build.sh $(ee) $(app)
IMAGE_TAG=$(image_tag) DOCKER_BUILD_ARGS=$(docker_build_args) DOCKER_REPO=$(docker_repo) ARCH=$(arch) DOCKER_RUNTIME=$(docker_runtime) bash build.sh $(distro) $(app)
##@ Local Dev

View file

@ -26,7 +26,11 @@ func New() Logger {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000")
jsonEncoder := zapcore.NewJSONEncoder(encoderConfig)
core := zapcore.NewCore(jsonEncoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
logLevel := zap.InfoLevel
if os.Getenv("DEBUG") == "true" {
logLevel = zap.DebugLevel
}
core := zapcore.NewCore(jsonEncoder, zapcore.AddSync(os.Stdout), logLevel)
baseLogger := zap.New(core, zap.AddCaller())
logger := baseLogger.WithOptions(zap.AddCallerSkip(1))
customLogger := &loggerImpl{l: logger}

View file

@ -13,7 +13,7 @@ func (e *routerImpl) health(w http.ResponseWriter, r *http.Request) {
func (e *routerImpl) healthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
if r.URL.Path == "/" || r.URL.Path == "/health" {
w.WriteHeader(http.StatusOK)
return
}

View file

@ -1,5 +0,0 @@
.idea
node_modules
npm-debug.log
.cache
*.mmdb

View file

@ -1,24 +0,0 @@
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:23-alpine
LABEL Maintainer="Zavorotynskiy Alexander <zavorotynskiy@pm.me>"
RUN apk add --no-cache tini git libc6-compat
ARG envarg
ENV ENTERPRISE_BUILD=${envarg} \
MAXMINDDB_FILE=/home/openreplay/geoip.mmdb \
PRIVATE_ENDPOINTS=false \
LISTEN_PORT=9001 \
ERROR=1 \
NODE_ENV=production
WORKDIR /work
COPY package.json .
COPY package-lock.json .
RUN npm install
COPY . .
RUN adduser -u 1001 openreplay -D
USER 1001
ADD --chown=1001 https://static.openreplay.com/geoip/GeoLite2-City.mmdb $MAXMINDDB_FILE
ENTRYPOINT ["/sbin/tini", "--"]
CMD npm start

View file

@ -1,109 +0,0 @@
const {logger} = require('./logger');
const {createClient} = require("redis");
const crypto = require("crypto");
let redisClient;
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
redisClient = createClient({url: REDIS_URL});
redisClient.on("error", (error) => logger.error(`Redis cache error : ${error}`));
void redisClient.connect();
function generateNodeID() {
const buffer = crypto.randomBytes(8);
return "node_"+buffer.readBigUInt64BE(0).toString();
}
const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 10;
const pingInterval = PING_INTERVAL + PING_INTERVAL/2;
const cacheRefreshInterval = CACHE_REFRESH_INTERVAL + CACHE_REFRESH_INTERVAL/2;
const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;
let lastCacheUpdateTime = 0;
let cacheRefresher = null;
const nodeID = process.env.HOSTNAME || generateNodeID();
const addSessionToCache = async function (sessionID, sessionData) {
try {
await redisClient.set(`active_sessions:${sessionID}`, JSON.stringify(sessionData), 'EX', pingInterval);
logger.debug(`Session ${sessionID} stored in Redis`);
} catch (error) {
logger.error(error);
}
}
const renewSession = async function (sessionID){
try {
await redisClient.expire(`active_sessions:${sessionID}`, pingInterval);
logger.debug(`Session ${sessionID} renewed in Redis`);
} catch (error) {
logger.error(error);
}
}
const getSessionFromCache = async function (sessionID) {
try {
const sessionData = await redisClient.get(`active_sessions:${sessionID}`);
if (sessionData) {
logger.debug(`Session ${sessionID} retrieved from Redis`);
return JSON.parse(sessionData);
}
return null;
} catch (error) {
logger.error(error);
return null;
}
}
const removeSessionFromCache = async function (sessionID) {
try {
await redisClient.del(`active_sessions:${sessionID}`);
logger.debug(`Session ${sessionID} removed from Redis`);
} catch (error) {
logger.error(error);
}
}
const setNodeSessions = async function (nodeID, sessionIDs) {
try {
await redisClient.set(`node:${nodeID}:sessions`, JSON.stringify(sessionIDs), 'EX', cacheRefreshInterval);
logger.debug(`Node ${nodeID} sessions stored in Redis`);
} catch (error) {
logger.error(error);
}
}
function startCacheRefresher(io) {
if (cacheRefresher) clearInterval(cacheRefresher);
cacheRefresher = setInterval(async () => {
const now = Date.now();
if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) {
return;
}
logger.debug('Background refresh triggered');
try {
const startTime = performance.now();
const sessionIDs = new Set();
const result = await io.fetchSockets();
result.forEach((socket) => {
if (socket.handshake.query.sessId) {
sessionIDs.add(socket.handshake.query.sessId);
}
})
await setNodeSessions(nodeID, Array.from(sessionIDs));
lastCacheUpdateTime = now;
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}, cacheRefreshIntervalMs / 2);
}
module.exports = {
addSessionToCache,
renewSession,
getSessionFromCache,
removeSessionFromCache,
startCacheRefresher,
}

File diff suppressed because it is too large Load diff

View file

@ -1,24 +0,0 @@
{
"name": "assist-server",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0",
"express": "^4.21.2",
"jsonwebtoken": "^9.0.2",
"redis": "^4.7.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"ua-parser-js": "^2.0.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.51.0",
"winston": "^3.17.0"
}
}

View file

@ -1,67 +0,0 @@
const { App } = require('uWebSockets.js');
const { Server } = require('socket.io');
const { logger } = require("./app/logger");
const { authorizer } = require("./app/assist");
const { onConnect, setSocketIOServer } = require("./app/socket");
const { startCacheRefresher } = require("./app/cache");
const app = App();
const pingInterval = parseInt(process.env.PING_INTERVAL) || 25000;
const getCompressionConfig = function () {
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
let perMessageDeflate = false;
if (process.env.COMPRESSION === "true") {
logger.info(`WS compression: enabled`);
perMessageDeflate = {
zlibDeflateOptions: {
windowBits: 10,
memLevel: 1
},
zlibInflateOptions: {
windowBits: 10
}
}
} else {
logger.info(`WS compression: disabled`);
}
return {
perMessageDeflate: perMessageDeflate,
clientNoContextTakeover: true
};
}
const io = new Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
pingInterval: pingInterval, // Will use it for cache invalidation
cors: {
origin: "*", // Allow connections from any origin (for development)
methods: ["GET", "POST"],
credentials: true
},
path: '/socket',
...getCompressionConfig()
});
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
io.attachApp(app);
io.engine.on("headers", (headers) => {
headers["x-host-id"] = process.env.HOSTNAME || "unknown";
});
setSocketIOServer(io);
const HOST = process.env.LISTEN_HOST || '0.0.0.0';
const PORT = parseInt(process.env.PORT) || 9001;
app.listen(PORT, (token) => {
if (token) {
console.log(`Server running at http://${HOST}:${PORT}`);
} else {
console.log(`Failed to listen on port ${PORT}`);
}
});
startCacheRefresher(io);
process.on('uncaughtException', err => {
logger.error(`Uncaught Exception: ${err}`);
});

16
ee/assist/.gitignore vendored
View file

@ -2,20 +2,4 @@
node_modules
npm-debug.log
.cache
test.html
build.sh
servers/peerjs-server.js
servers/sourcemaps-handler.js
servers/sourcemaps-server.js
/utils/geoIP.js
/utils/health.js
/utils/HeapSnapshot.js
/utils/helper.js
/utils/assistHelper.js
/utils/httpHandlers.js
/utils/socketHandlers.js
.local
*.mmdb

View file

@ -1,11 +1,8 @@
#ARCH can be amd64 or arm64
ARG ARCH=amd64
FROM --platform=linux/$ARCH node:23-alpine
LABEL Maintainer="KRAIEM Taha Yassine<tahayk2@gmail.com>"
LABEL Maintainer="Zavorotynskiy Alexander <zavorotynskiy@pm.me>"
RUN apk add --no-cache tini git libc6-compat
# && ln -s /lib/libc.musl-x86_64.so.1 /lib/ld-linux-x86-64.so.2
ARG envarg
ENV ENTERPRISE_BUILD=${envarg} \
MAXMINDDB_FILE=/home/openreplay/geoip.mmdb \

View file

@ -32,16 +32,13 @@ EVENTS_DEFINITION.emit = {
const BASE_sessionInfo = {
"pageTitle": "Page",
"active": false,
"live": true,
"sessionID": "0",
"metadata": {},
"userID": "",
"userUUID": "",
"projectKey": "",
"revID": "",
"timestamp": 0,
"trackerVersion": "",
"isSnippet": true,
"userOs": "",
"userBrowser": "",
"userBrowserVersion": "",
@ -49,8 +46,7 @@ const BASE_sessionInfo = {
"userDeviceType": "",
"userCountry": "",
"userState": "",
"userCity": "",
"projectId": 0
"userCity": ""
};
const extractPeerId = (peerId) => {

180
ee/assist/app/cache.js Normal file
View file

@ -0,0 +1,180 @@
const {logger} = require('./logger');
const Redis = require("ioredis");
const crypto = require("crypto");
const { Mutex } = require("async-mutex");
const REDIS_URL = process.env.REDIS_URL || "localhost:6379";
const redisClient = new Redis(REDIS_URL);
redisClient.on("error", (error) => {
logger.error(`Redis cache error : ${error}`);
});
function generateNodeID() {
const buffer = crypto.randomBytes(8);
return "node_"+buffer.readBigUInt64BE(0).toString();
}
const batchSize = parseInt(process.env.REDIS_BATCH_SIZE) || 1000;
const PING_INTERVAL = parseInt(process.env.PING_INTERVAL_SECONDS) || 25;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.CACHE_REFRESH_INTERVAL_SECONDS) || 5;
const pingInterval = Math.floor(PING_INTERVAL + PING_INTERVAL/2);
const cacheRefreshInterval = Math.floor(CACHE_REFRESH_INTERVAL * 4);
const cacheRefreshIntervalMs = CACHE_REFRESH_INTERVAL * 1000;
let lastCacheUpdateTime = 0;
let cacheRefresher = null;
const nodeID = process.env.HOSTNAME || generateNodeID();
const mutex = new Mutex();
const localCache = {
addedSessions: new Set(),
updatedSessions: new Set(),
refreshedSessions: new Set(),
deletedSessions: new Set()
};
const addSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.addedSessions.add(sessionID);
});
}
const updateSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.addedSessions.add(sessionID); // to update the session's cache
localCache.updatedSessions.add(sessionID); // to add sessionID to the list of recently updated sessions
});
}
const renewSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.refreshedSessions.add(sessionID);
})
}
const removeSession = async function (sessionID) {
await mutex.runExclusive(() => {
localCache.deletedSessions.add(sessionID);
});
}
const updateNodeCache = async function (io) {
logger.debug('Background refresh triggered');
try {
const startTime = performance.now();
let currStepTs = performance.now();
const sessionIDs = new Set();
const result = await io.fetchSockets();
let toAdd = new Map();
let toUpdate = [];
let toRenew = [];
let toDelete = [];
await mutex.runExclusive(() => {
result.forEach((socket) => {
if (socket.handshake.query.sessId) {
const sessID = socket.handshake.query.sessId;
if (sessionIDs.has(sessID)) {
return;
}
sessionIDs.add(sessID);
if (localCache.addedSessions.has(sessID)) {
toAdd.set(sessID, socket.handshake.query.sessionInfo);
}
}
});
toUpdate = [...localCache.updatedSessions];
toRenew = [...localCache.refreshedSessions];
toDelete = [...localCache.deletedSessions];
// Clear the local cache
localCache.addedSessions.clear();
localCache.updatedSessions.clear();
localCache.refreshedSessions.clear();
localCache.deletedSessions.clear();
})
// insert new sessions in pipeline
const toAddArray = Array.from(toAdd.keys());
for (let i = 0; i < toAddArray.length; i += batchSize) {
const batch = toAddArray.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.set(`assist:online_sessions:${sessionID}`, JSON.stringify(toAdd.get(sessionID)), 'EX', pingInterval);
}
await pipeline.exec();
}
logger.info(`step 1 (toAdd) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toAddArray.length} sockets`);
currStepTs = performance.now();
// renew sessions in pipeline
for (let i = 0; i < toRenew.length; i += batchSize) {
const batch = toRenew.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.expire(`assist:online_sessions:${sessionID}`, pingInterval);
}
await pipeline.exec();
}
logger.info(`step 2 (toRenew) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toRenew.length} sockets`);
currStepTs = performance.now();
// delete sessions in pipeline
for (let i = 0; i < toDelete.length; i += batchSize) {
const batch = toDelete.slice(i, i + batchSize);
const pipeline = redisClient.pipeline();
for (const sessionID of batch) {
pipeline.del(`assist:online_sessions:${sessionID}`);
}
await pipeline.exec();
}
logger.info(`step 3 (toDelete) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toDelete.length} sockets`);
currStepTs = performance.now();
// add recently updated sessions
if (toUpdate.length > 0) {
await redisClient.sadd('assist:updated_sessions', toUpdate);
}
// store the node sessions
await redisClient.set(`assist:nodes:${nodeID}:sessions`, JSON.stringify(Array.from(sessionIDs)), 'EX', cacheRefreshInterval);
logger.info(`step 4 (full list + updated) complete: ${(performance.now() - currStepTs).toFixed(2)}ms, ${toUpdate.length} sockets`);
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration.toFixed(2)}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}
let isFlushing = false;
function startCacheRefresher(io) {
if (cacheRefresher) clearInterval(cacheRefresher);
cacheRefresher = setInterval(async () => {
if (isFlushing) {
logger.warn("Skipping tick: flush in progress");
return;
}
const now = Date.now();
if (now - lastCacheUpdateTime < cacheRefreshIntervalMs) {
return;
}
isFlushing = true;
try {
await updateNodeCache(io);
lastCacheUpdateTime = Date.now();
} catch (err) {
logger.error(`Tick error: ${err}`);
} finally {
isFlushing = false;
}
}, cacheRefreshIntervalMs / 2);
}
module.exports = {
addSession,
updateSession,
renewSession,
removeSession,
startCacheRefresher,
}

View file

@ -6,9 +6,10 @@ const {
errorHandler
} = require("./assist");
const {
addSessionToCache,
addSession,
updateSession,
renewSession,
removeSessionFromCache
removeSession
} = require('./cache');
const {
logger
@ -83,8 +84,11 @@ async function getRoomData(roomID) {
async function onConnect(socket) {
logger.debug(`A new client:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`);
// Drop unknown socket.io connections
if (socket.handshake.query.identity === undefined || socket.handshake.query.peerId === undefined || socket.handshake.query.sessionInfo === undefined) {
logger.debug(`something is undefined, refusing connexion`);
if (socket.handshake.query.identity === undefined || socket.handshake.query.peerId === undefined) {
logger.debug(`no identity or peerId, refusing connexion`);
return socket.disconnect();
} else if (socket.handshake.query.identity === IDENTITIES.session && socket.handshake.query.sessionInfo === undefined) {
logger.debug(`sessionInfo is undefined, refusing connexion`);
return socket.disconnect();
}
processPeerInfo(socket);
@ -122,7 +126,7 @@ async function onConnect(socket) {
// Add session to cache
if (socket.handshake.query.identity === IDENTITIES.session) {
await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
await addSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
}
if (socket.handshake.query.identity === IDENTITIES.agent) {
@ -167,7 +171,7 @@ async function onDisconnect(socket) {
let {tabsCount, agentsCount, tabIDs, agentIDs} = await getRoomData(socket.handshake.query.roomId);
if (tabsCount <= 0) {
await removeSessionFromCache(socket.handshake.query.sessId);
await removeSession(socket.handshake.query.sessId);
}
if (tabsCount === -1 && agentsCount === -1) {
@ -195,7 +199,7 @@ async function onUpdateEvent(socket, ...args) {
socket.handshake.query.sessionInfo = deepMerge(socket.handshake.query.sessionInfo, args[0]?.data, {tabId: args[0]?.meta?.tabId});
// update session cache
await addSessionToCache(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
await updateSession(socket.handshake.query.sessId, socket.handshake.query.sessionInfo);
// Update sessionInfo for all agents in the room
const connected_sockets = await fetchSockets(socket.handshake.query.roomId);

View file

@ -1,14 +0,0 @@
rm -rf ./utils/assistHelper.js
rm -rf ./utils/geoIP.js
rm -rf ./utils/health.js
rm -rf ./utils/HeapSnapshot.js
rm -rf ./utils/helper.js
rm -rf ./utils/httpHandlers.js
rm -rf ./utils/logger.js
rm -rf ./utils/metrics.js
rm -rf ./utils/socketHandlers.js
rm -rf servers/peerjs-server.js
rm -rf servers/sourcemaps-handler.js
rm -rf servers/sourcemaps-server.js
rm -rf build.sh

File diff suppressed because it is too large Load diff

View file

@ -1,33 +1,26 @@
{
"name": "assist-server",
"version": "v1.22.0-ee",
"description": "assist server to get live sessions & sourcemaps reader to get stack trace",
"main": "peerjs-server.js",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node server.js"
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git+https://github.com/openreplay/openreplay.git"
},
"author": "KRAIEM Taha Yassine <tahayk2@gmail.com>",
"license": "Elastic License 2.0 (ELv2)",
"bugs": {
"url": "https://github.com/openreplay/openreplay/issues"
},
"homepage": "https://github.com/openreplay/openreplay#readme",
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@fastify/deepmerge": "^2.0.1",
"@maxmind/geoip2-node": "^4.2.0",
"@socket.io/redis-adapter": "^8.2.1",
"express": "^4.21.1",
"@fastify/deepmerge": "^3.0.0",
"@maxmind/geoip2-node": "^6.0.0",
"async-mutex": "^0.5.0",
"express": "^4.21.2",
"ioredis": "^5.6.1",
"jsonwebtoken": "^9.0.2",
"prom-client": "^15.0.0",
"redis": "^4.6.10",
"socket.io": "^4.8.0",
"ua-parser-js": "^1.0.37",
"redis": "^4.7.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"ua-parser-js": "^2.0.3",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.51.0",
"winston": "^3.13.0"
"winston": "^3.17.0"
}
}

View file

@ -1,2 +0,0 @@
#!/bin/bash
rsync -avr --exclude=".*" --exclude="node_modules" --ignore-existing ../../assist/* ./

View file

@ -1,6 +0,0 @@
#!/bin/bash
set -a
source .env
set +a
npm start

View file

@ -1,119 +1,64 @@
const dumps = require('./utils/HeapSnapshot');
const {request_logger} = require('./utils/helper');
const express = require('express');
const health = require("./utils/health");
const assert = require('assert').strict;
const register = require('./utils/metrics').register;
let socket;
if (process.env.redis === "true") {
socket = require("./servers/websocket-cluster");
} else {
socket = require("./servers/websocket");
}
const {logger} = require('./utils/logger');
const { App } = require('uWebSockets.js');
const { Server } = require('socket.io');
const { logger } = require("./app/logger");
const { authorizer } = require("./app/assist");
const { onConnect, setSocketIOServer } = require("./app/socket");
const { startCacheRefresher } = require("./app/cache");
health.healthApp.get('/metrics', async (req, res) => {
try {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
} catch (ex) {
res.status(500).end(ex);
const app = App();
const pingInterval = parseInt(process.env.PING_INTERVAL) || 25000;
const getCompressionConfig = function () {
// WS: The theoretical overhead per socket is 19KB (11KB for compressor and 8KB for decompressor)
let perMessageDeflate = false;
if (process.env.COMPRESSION === "true") {
logger.info(`WS compression: enabled`);
perMessageDeflate = {
zlibDeflateOptions: {
windowBits: 10,
memLevel: 1
},
zlibInflateOptions: {
windowBits: 10
}
}
} else {
logger.info(`WS compression: disabled`);
}
return {
perMessageDeflate: perMessageDeflate,
clientNoContextTakeover: true
};
}
const io = new Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
pingInterval: pingInterval, // Will use it for cache invalidation
cors: {
origin: "*", // Allow connections from any origin (for development)
methods: ["GET", "POST"],
credentials: true
},
path: '/socket',
...getCompressionConfig()
});
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
io.attachApp(app);
setSocketIOServer(io);
const HOST = process.env.LISTEN_HOST || '0.0.0.0';
const PORT = process.env.LISTEN_PORT || 9001;
assert.ok(process.env.ASSIST_KEY, 'The "ASSIST_KEY" environment variable is required');
const P_KEY = process.env.ASSIST_KEY;
const PREFIX = process.env.PREFIX || process.env.prefix || `/assist`;
const heapdump = process.env.heapdump === "1";
if (process.env.uws !== "true") {
let wsapp = express();
wsapp.use(express.json());
wsapp.use(express.urlencoded({extended: true}));
wsapp.use(request_logger("[wsapp]"));
wsapp.get(['/', PREFIX, `${PREFIX}/`, `${PREFIX}/${P_KEY}`, `${PREFIX}/${P_KEY}/`], (req, res) => {
res.statusCode = 200;
res.end("ok!");
}
);
heapdump && wsapp.use(`${PREFIX}/${P_KEY}/heapdump`, dumps.router);
wsapp.use(`${PREFIX}/${P_KEY}`, socket.wsRouter);
wsapp.enable('trust proxy');
const wsserver = wsapp.listen(PORT, HOST, () => {
logger.info(`WS App listening on http://${HOST}:${PORT}`);
health.healthApp.listen(health.PORT, HOST, health.listen_cb);
});
socket.start(wsserver);
module.exports = {wsserver};
} else {
logger.info("Using uWebSocket");
const {App} = require("uWebSockets.js");
const uapp = new App();
const healthFn = (res, req) => {
res.writeStatus('200 OK').end('ok!');
const PORT = parseInt(process.env.PORT) || 9001;
app.listen(PORT, (token) => {
if (token) {
console.log(`Server running at http://${HOST}:${PORT}`);
} else {
console.log(`Failed to listen on port ${PORT}`);
}
uapp.get('/', healthFn);
uapp.get(PREFIX, healthFn);
uapp.get(`${PREFIX}/`, healthFn);
uapp.get(`${PREFIX}/${P_KEY}`, healthFn);
uapp.get(`${PREFIX}/${P_KEY}/`, healthFn);
});
startCacheRefresher(io);
/* Either onAborted or simply finished request */
const onAbortedOrFinishedResponse = function (res) {
if (res.id === -1) {
logger.debug("ERROR! onAbortedOrFinishedResponse called twice for the same res!");
} else {
logger.debug('Stream was closed');
}
/* Mark this response already accounted for */
res.id = -1;
}
const uWrapper = function (fn) {
return (res, req) => {
res.id = 1;
res.aborted = false;
req.startTs = performance.now(); // track request's start timestamp
req.method = req.getMethod();
res.onAborted(() => {
res.aborted = true;
onAbortedOrFinishedResponse(res);
});
return fn(req, res);
}
}
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-list/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsListByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/autocomplete`, uWrapper(socket.handlers.autocomplete));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.post(`${PREFIX}/${P_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject));
uapp.get(`${PREFIX}/${P_KEY}/sockets-live/:projectKey/:sessionId`, uWrapper(socket.handlers.socketsLiveBySession));
socket.start(uapp);
uapp.listen(HOST, PORT, (token) => {
if (!token) {
logger.error("port already in use");
}
logger.info(`WS App listening on http://${HOST}:${PORT}`);
health.healthApp.listen(health.PORT, HOST, health.listen_cb);
});
process.on('uncaughtException', err => {
logger.error(`Uncaught Exception: ${err}`);
});
module.exports = {uapp};
}
process.on('uncaughtException', err => {
logger.error(`Uncaught Exception: ${err}`);
});

View file

@ -1,64 +0,0 @@
const express = require('express');
const {
socketConnexionTimeout,
authorizer
} = require('../utils/assistHelper');
const {
createSocketIOServer
} = require('../utils/wsServer');
const {
onConnect
} = require('../utils/socketHandlers');
const {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
} = require('../utils/httpHandlers');
const {logger} = require('../utils/logger');
const {createAdapter} = require("@socket.io/redis-adapter");
const {createClient} = require("redis");
const REDIS_URL = (process.env.REDIS_URL || "localhost:6379").replace(/((^\w+:|^)\/\/|^)/, 'redis://');
const pubClient = createClient({url: REDIS_URL});
const subClient = pubClient.duplicate();
logger.info(`Using Redis: ${REDIS_URL}`);
const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io;
module.exports = {
wsRouter,
start: (server, prefix) => {
io = createSocketIOServer(server, prefix);
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
logger.info("WS server started");
socketConnexionTimeout(io);
Promise.all([pubClient.connect(), subClient.connect()])
.then(() => {
io.adapter(createAdapter(pubClient, subClient,
{requestsTimeout: process.env.REDIS_REQUESTS_TIMEOUT || 5000}));
logger.info("> redis connected.");
})
.catch((err) => {
logger.error(`redis connection error: ${err}`);
process.exit(2);
});
},
handlers: {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
}
};

View file

@ -1,45 +0,0 @@
const express = require('express');
const {
socketConnexionTimeout,
authorizer
} = require('../utils/assistHelper');
const {
createSocketIOServer
} = require('../utils/wsServer');
const {
onConnect
} = require('../utils/socketHandlers');
const {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
} = require('../utils/httpHandlers');
const {logger} = require('../utils/logger');
const wsRouter = express.Router();
wsRouter.get(`/sockets-list/:projectKey/autocomplete`, autocomplete); // autocomplete
wsRouter.get(`/sockets-list/:projectKey/:sessionId`, socketsListByProject); // is_live
wsRouter.get(`/sockets-live/:projectKey/autocomplete`, autocomplete); // not using
wsRouter.get(`/sockets-live/:projectKey`, socketsLiveByProject);
wsRouter.post(`/sockets-live/:projectKey`, socketsLiveByProject); // assist search
wsRouter.get(`/sockets-live/:projectKey/:sessionId`, socketsLiveBySession); // session_exists, get_live_session_by_id
let io;
module.exports = {
wsRouter,
start: (server, prefix) => {
io = createSocketIOServer(server, prefix);
io.use(async (socket, next) => await authorizer.check(socket, next));
io.on('connection', (socket) => onConnect(socket));
logger.info("WS server started");
socketConnexionTimeout(io);
},
handlers: {
socketsListByProject,
socketsLiveByProject,
socketsLiveBySession,
autocomplete
}
};

View file

@ -1,13 +0,0 @@
const {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
} = require('../utils/helper-ee');
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getAvailableRooms
}

View file

@ -1,126 +0,0 @@
const uWS = require("uWebSockets.js");
const helper = require('./helper');
const {logger} = require('./logger');
const getBodyFromUWSResponse = async function (res) {
return new Promise(((resolve, reject) => {
let buffer;
res.onData((ab, isLast) => {
let chunk = Buffer.from(ab);
if (buffer) {
buffer = Buffer.concat([buffer, chunk]);
} else {
buffer = Buffer.concat([chunk]);
}
if (isLast) {
let json;
try {
json = JSON.parse(buffer);
} catch (e) {
console.error(e);
json = {};
}
resolve(json);
}
});
}));
}
const extractProjectKeyFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(0)) {
logger.debug(`[WS]where projectKey=${req.getParameter(0)}`);
return req.getParameter(0);
}
} else {
return helper.extractProjectKeyFromRequest(req);
}
return undefined;
}
const extractSessionIdFromRequest = function (req) {
if (process.env.uws === "true") {
if (req.getParameter(1)) {
logger.debug(`[WS]where projectKey=${req.getParameter(1)}`);
return req.getParameter(1);
}
} else {
return helper.extractSessionIdFromRequest(req);
}
return undefined;
}
const extractPayloadFromRequest = async function (req, res) {
let filters = {
"query": {},
"filter": {}
};
if (process.env.uws === "true") {
if (req.getQuery("q")) {
logger.debug(`[WS]where q=${req.getQuery("q")}`);
filters.query.value = req.getQuery("q");
}
if (req.getQuery("key")) {
logger.debug(`[WS]where key=${req.getQuery("key")}`);
filters.query.key = req.getQuery("key");
}
if (req.getQuery("userId")) {
logger.debug(`[WS]where userId=${req.getQuery("userId")}`);
filters.filter.userID = [req.getQuery("userId")];
}
if (!filters.query.value) {
let body = {};
if (req.getMethod() !== 'get') {
body = await getBodyFromUWSResponse(res);
}
filters = {
...filters,
"sort": {
"key": body.sort && body.sort.key ? body.sort.key : undefined,
"order": body.sort && body.sort.order === "DESC"
},
"pagination": {
"limit": body.pagination && body.pagination.limit ? body.pagination.limit : undefined,
"page": body.pagination && body.pagination.page ? body.pagination.page : undefined
}
}
filters.filter = {...filters.filter, ...(body.filter || {})};
}
} else {
return helper.extractPayloadFromRequest(req);
}
filters.filter = helper.objectToObjectOfArrays(filters.filter);
filters.filter = helper.transformFilters(filters.filter);
logger.debug("payload/filters:" + JSON.stringify(filters))
return Object.keys(filters).length > 0 ? filters : undefined;
}
const getAvailableRooms = async function (io) {
if (process.env.redis === "true") {
return io.of('/').adapter.allRooms();
} else {
return helper.getAvailableRooms(io);
}
}
const getCompressionConfig = function () {
if (process.env.uws !== "true") {
return helper.getCompressionConfig();
} else {
// uWS: The theoretical overhead per socket is 32KB (8KB for compressor and for 24KB decompressor)
if (process.env.COMPRESSION === "true") {
console.log(`uWS compression: enabled`);
return {
compression: uWS.DEDICATED_COMPRESSOR_8KB,
decompression: uWS.DEDICATED_DECOMPRESSOR_1KB
};
} else {
console.log(`uWS compression: disabled`);
return {};
}
}
}
module.exports = {
extractProjectKeyFromRequest,
extractSessionIdFromRequest,
extractPayloadFromRequest,
getCompressionConfig,
getAvailableRooms
};

View file

@ -1,234 +0,0 @@
const statsHost = process.env.STATS_HOST || 'http://assist-stats-openreplay.app.svc.cluster.local:8000/events';
const authToken = process.env.STATS_AUTH_TOKEN || '';
const {logger} = require('./logger');
class InMemoryCache {
constructor() {
this.cache = new Map();
}
set(key, value) {
this.cache.set(key, value);
}
get(key) {
return this.cache.get(key);
}
delete(key) {
this.cache.delete(key);
}
clear() {
this.cache.clear();
}
}
const cache = new InMemoryCache();
async function postData(payload) {
let headers = {
'Content-Type': 'application/json'
};
if (authToken && authToken.trim() !== '') {
headers['Authorization'] = 'Bearer ' + authToken;
}
const options = {
method: 'POST',
body: JSON.stringify(payload),
headers: headers,
}
try {
const response = await fetch(statsHost, options)
const jsonResponse = await response.json();
logger.debug('JSON response', JSON.stringify(jsonResponse, null, 4))
} catch(err) {
logger.debug('ERROR', err);
}
}
function startAssist(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_assist_${tsNow}`;
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "assist",
"event_state": "start",
"timestamp": tsNow,
});
// Save uniq eventID to cache
cache.set(`${socket.handshake.query.sessId}_${agentID}_assist`, eventID);
// Debug log
logger.debug(`assist_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function endAssist(socket, agentID) {
const eventID = cache.get(`${socket.handshake.query.sessId}_${agentID}_assist`);
if (eventID === undefined) {
logger.debug(`have to skip assist_ended, no eventID in the cache, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
return
}
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "assist",
"event_state": "end",
"timestamp": +new Date(),
})
// Remove eventID from cache
cache.delete(`${socket.handshake.query.sessId}_${agentID}_assist`);
// Debug logs
logger.debug(`assist_ended, agentID: ${socket.handshake.query.agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}`);
}
function startCall(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_call_${tsNow}`;
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "call",
"event_state": "start",
"timestamp": tsNow,
});
// Save uniq eventID to cache
cache.set(`${socket.handshake.query.sessId}_call`, eventID);
// Debug logs
logger.debug(`s_call_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function endCall(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.handshake.query.sessId}_call`);
if (eventID === undefined) {
logger.debug(`have to skip s_call_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
return
}
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "call",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_call`)
// Debug logs
logger.debug(`s_call_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
}
function startControl(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_control_${tsNow}`;
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "control",
"event_state": "start",
"timestamp": tsNow,
});
cache.set(`${socket.handshake.query.sessId}_control`, eventID)
// Debug logs
logger.debug(`s_control_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function endControl(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.handshake.query.sessId}_control`);
if (eventID === undefined) {
logger.debug(`have to skip s_control_ended, no eventID in the cache, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${tsNow}`);
return
}
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "control",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_control`)
// Debug logs
logger.debug(`s_control_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function startRecord(socket, agentID) {
const tsNow = +new Date();
const eventID = `${socket.handshake.query.sessId}_${agentID}_record_${tsNow}`;
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "record",
"event_state": "start",
"timestamp": tsNow,
});
cache.set(`${socket.handshake.query.sessId}_record`, eventID)
// Debug logs
logger.debug(`s_recording_started, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function endRecord(socket, agentID) {
const tsNow = +new Date();
const eventID = cache.get(`${socket.sessId}_record`);
void postData({
"project_id": socket.handshake.query.projectId,
"session_id": socket.handshake.query.sessId,
"agent_id": agentID,
"event_id": eventID,
"event_type": "record",
"event_state": "end",
"timestamp": tsNow,
});
cache.delete(`${socket.handshake.query.sessId}_record`)
// Debug logs
logger.debug(`s_recording_ended, agentID: ${agentID}, sessID: ${socket.handshake.query.sessId}, projID: ${socket.handshake.query.projectId}, time: ${+new Date()}`);
}
function handleEvent(eventName, socket, agentID) {
switch (eventName) {
case "s_call_started": {
startCall(socket, agentID);
break;
}
case "s_call_ended": {
endCall(socket, agentID);
break;
}
case "s_control_started": {
startControl(socket, agentID)
break;
}
case "s_control_ended": {
endControl(socket, agentID)
break;
}
case "s_recording_started": {
startRecord(socket, agentID);
break;
}
case "s_recording_ended": {
endRecord(socket, agentID);
break;
}
}
}
module.exports = {
startAssist,
endAssist,
handleEvent,
}

View file

@ -1,107 +0,0 @@
const _io = require("socket.io");
const {getCompressionConfig} = require("./helper");
const {logger} = require('./logger');
let io;
const getServer = function () {return io;}
const useRedis = process.env.redis === "true";
let inMemorySocketsCache = [];
let lastCacheUpdateTime = 0;
const CACHE_REFRESH_INTERVAL = parseInt(process.env.cacheRefreshInterval) || 5000;
const doFetchAllSockets = async function () {
if (useRedis) {
const now = Date.now();
logger.info(`Using in-memory cache (age: ${now - lastCacheUpdateTime}ms)`);
return inMemorySocketsCache;
} else {
try {
return await io.fetchSockets();
} catch (error) {
logger.error('Error fetching sockets:', error);
return [];
}
}
}
// Background refresher that runs independently of requests
let cacheRefresher = null;
function startCacheRefresher() {
if (cacheRefresher) clearInterval(cacheRefresher);
cacheRefresher = setInterval(async () => {
const now = Date.now();
// Only refresh if cache is stale
if (now - lastCacheUpdateTime >= CACHE_REFRESH_INTERVAL) {
logger.debug('Background refresh triggered');
try {
const startTime = performance.now();
const result = await io.fetchSockets();
inMemorySocketsCache = result;
lastCacheUpdateTime = now;
const duration = performance.now() - startTime;
logger.info(`Background refresh complete: ${duration}ms, ${result.length} sockets`);
} catch (error) {
logger.error(`Background refresh error: ${error}`);
}
}
}, CACHE_REFRESH_INTERVAL / 2);
}
const processSocketsList = function (sockets) {
let res = []
for (let socket of sockets) {
let {handshake} = socket;
res.push({handshake});
}
return res
}
const fetchSockets = async function (roomID) {
if (!io) {
return [];
}
if (!roomID) {
return await doFetchAllSockets();
}
return await io.in(roomID).fetchSockets();
}
const createSocketIOServer = function (server, prefix) {
if (io) {
return io;
}
if (process.env.uws !== "true") {
io = _io(server, {
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
} else {
io = new _io.Server({
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"],
credentials: true
},
path: (prefix ? prefix : '') + '/socket',
...getCompressionConfig()
});
io.attachApp(server);
}
startCacheRefresher();
return io;
}
module.exports = {
createSocketIOServer,
getServer,
fetchSockets,
}

View file

@ -0,0 +1,57 @@
package main
import (
"context"
assistConfig "openreplay/backend/internal/config/assist"
"openreplay/backend/pkg/assist"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/metrics/web"
"openreplay/backend/pkg/server"
"openreplay/backend/pkg/server/api"
)
func main() {
ctx := context.Background()
log := logger.New()
cfg := assistConfig.New(log)
// Observability
webMetrics := web.New("assist")
dbMetric := databaseMetrics.New("assist")
metrics.New(log, append(webMetrics.List(), dbMetric.List()...))
if cfg.AssistKey == "" {
log.Fatal(ctx, "assist key is not set")
}
pgConn, err := pool.New(dbMetric, cfg.Postgres.String())
if err != nil {
log.Fatal(ctx, "can't init postgres connection: %s", err)
}
defer pgConn.Close()
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Fatal(ctx, "can't init redis connection: %s", err)
}
defer redisClient.Close()
prefix := api.NoPrefix
builder, err := assist.NewServiceBuilder(log, cfg, webMetrics, dbMetric, pgConn, redisClient)
if err != nil {
log.Fatal(ctx, "can't init services: %s", err)
}
router, err := api.NewRouter(&cfg.HTTP, log)
if err != nil {
log.Fatal(ctx, "failed while creating router: %s", err)
}
router.AddHandlers(prefix, builder.AssistAPI)
router.AddMiddlewares(builder.RateLimiter.Middleware)
server.Run(ctx, log, &cfg.HTTP, router)
}

View file

@ -0,0 +1,30 @@
package assist
import (
"time"
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/redis"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/logger"
)
type Config struct {
common.Config
common.Postgres
redis.Redis
common.HTTP
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
AssistKey string `env:"ASSIST_KEY"`
CacheTTL time.Duration `env:"REDIS_CACHE_TTL,default=5s"`
BatchSize int `env:"REDIS_BATCH_SIZE,default=1000"`
ScanSize int64 `env:"REDIS_SCAN_SIZE,default=1000"`
WorkerID uint16
}
func New(log logger.Logger) *Config {
cfg := &Config{WorkerID: env.WorkerID()}
configurator.Process(log, cfg)
return cfg
}

View file

@ -0,0 +1,207 @@
package api
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/gorilla/mux"
assistAPI "openreplay/backend/internal/config/assist"
"openreplay/backend/pkg/assist/service"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/server/api"
"openreplay/backend/pkg/sessionmanager"
)
type handlersImpl struct {
cfg *assistAPI.Config
log logger.Logger
responser *api.Responser
jsonSizeLimit int64
assist service.Assist
}
func NewHandlers(log logger.Logger, cfg *assistAPI.Config, responser *api.Responser, assist service.Assist) (api.Handlers, error) {
return &handlersImpl{
cfg: cfg,
log: log,
responser: responser,
jsonSizeLimit: cfg.JsonSizeLimit,
assist: assist,
}, nil
}
func (e *handlersImpl) GetAll() []*api.Description {
keyPrefix := "/assist"
if e.cfg.AssistKey != "" {
keyPrefix = fmt.Sprintf("/assist/%s", e.cfg.AssistKey)
}
return []*api.Description{
{keyPrefix + "/sockets-list/{projectKey}/autocomplete", e.autocomplete, "GET"}, // event search with live=true
{keyPrefix + "/sockets-list/{projectKey}/{sessionId}", e.socketsListByProject, "GET"}, // is_live for getReplay call
{keyPrefix + "/sockets-live/{projectKey}", e.socketsLiveByProject, "POST"}, // handler /{projectId}/assist/sessions for co-browser
{keyPrefix + "/sockets-live/{projectKey}/{sessionId}", e.socketsLiveBySession, "GET"}, // for get_live_session (with data) and for session_exists
{"/v1/ping", e.ping, "GET"},
}
}
func (e *handlersImpl) ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func getProjectKey(r *http.Request) (string, error) {
vars := mux.Vars(r)
key := vars["projectKey"]
if key == "" {
return "", fmt.Errorf("empty project key")
}
return key, nil
}
func getSessionID(r *http.Request) (string, error) {
vars := mux.Vars(r)
key := vars["sessionId"]
if key == "" {
return "", fmt.Errorf("empty session ID")
}
return key, nil
}
func getQuery(r *http.Request) (*service.Query, error) {
params := r.URL.Query()
q := &service.Query{
Key: params.Get("key"),
Value: params.Get("q"),
}
if q.Key == "" || q.Value == "" {
return nil, fmt.Errorf("empty key or value")
}
return q, nil
}
func (e *handlersImpl) autocomplete(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
projectKey, err := getProjectKey(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
query, err := getQuery(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
resp, err := e.assist.Autocomplete(projectKey, query)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
response := map[string]interface{}{
"data": resp,
}
e.responser.ResponseWithJSON(e.log, r.Context(), w, response, startTime, r.URL.Path, bodySize)
}
func (e *handlersImpl) socketsListByProject(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
projectKey, err := getProjectKey(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
sessionID, err := getSessionID(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
resp, err := e.assist.GetByID(projectKey, sessionID)
if err != nil {
if errors.Is(err, sessionmanager.ErrSessionNotFound) {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusNotFound, err, startTime, r.URL.Path, bodySize)
} else if errors.Is(err, sessionmanager.ErrSessionNotBelongToProject) {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusForbidden, err, startTime, r.URL.Path, bodySize)
} else {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
}
return
}
response := map[string]interface{}{
"data": resp,
}
e.responser.ResponseWithJSON(e.log, r.Context(), w, response, startTime, r.URL.Path, bodySize)
}
func (e *handlersImpl) socketsLiveByProject(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
projectKey, err := getProjectKey(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
bodyBytes, err := api.ReadBody(e.log, w, r, e.jsonSizeLimit)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
e.log.Debug(context.Background(), "bodyBytes: %s", bodyBytes)
bodySize = len(bodyBytes)
req := &service.Request{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
resp, err := e.assist.GetAll(projectKey, req)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
response := map[string]interface{}{
"data": resp,
}
e.responser.ResponseWithJSON(e.log, r.Context(), w, response, startTime, r.URL.Path, bodySize)
}
func (e *handlersImpl) socketsLiveBySession(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
projectKey, err := getProjectKey(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
sessionID, err := getSessionID(r)
if err != nil {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
resp, err := e.assist.GetByID(projectKey, sessionID)
if err != nil {
if errors.Is(err, sessionmanager.ErrSessionNotFound) {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusNotFound, err, startTime, r.URL.Path, bodySize)
} else if errors.Is(err, sessionmanager.ErrSessionNotBelongToProject) {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusForbidden, err, startTime, r.URL.Path, bodySize)
} else {
e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
}
return
}
response := map[string]interface{}{
"data": resp,
}
e.responser.ResponseWithJSON(e.log, r.Context(), w, response, startTime, r.URL.Path, bodySize)
}

View file

@ -0,0 +1,42 @@
package assist
import (
"time"
"openreplay/backend/internal/config/assist"
assistAPI "openreplay/backend/pkg/assist/api"
"openreplay/backend/pkg/assist/service"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/metrics/web"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/server/api"
"openreplay/backend/pkg/server/limiter"
"openreplay/backend/pkg/sessionmanager"
)
type ServicesBuilder struct {
RateLimiter *limiter.UserRateLimiter
AssistAPI api.Handlers
}
func NewServiceBuilder(log logger.Logger, cfg *assist.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, redis *redis.Client) (*ServicesBuilder, error) {
projectsManager := projects.New(log, pgconn, redis, dbMetrics)
sessManager, err := sessionmanager.New(log, cfg, redis.Redis)
if err != nil {
return nil, err
}
sessManager.Start()
assistManager := service.NewAssist(log, pgconn, projectsManager, sessManager)
responser := api.NewResponser(webMetrics)
handlers, err := assistAPI.NewHandlers(log, cfg, responser, assistManager)
if err != nil {
return nil, err
}
return &ServicesBuilder{
RateLimiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute),
AssistAPI: handlers,
}, nil
}

View file

@ -0,0 +1,119 @@
package service
import (
"fmt"
"strconv"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessionmanager"
)
type assistImpl struct {
log logger.Logger
pgconn pool.Pool
projects projects.Projects
sessions sessionmanager.SessionManager
}
type Assist interface {
Autocomplete(projectKey string, query *Query) (interface{}, error)
IsLive(projectKey, sessionID string) (bool, error)
GetAll(projectKey string, filters *Request) (interface{}, error)
GetByID(projectKey, sessionID string) (interface{}, error)
}
func NewAssist(log logger.Logger, pgconn pool.Pool, projects projects.Projects, sessions sessionmanager.SessionManager) Assist {
return &assistImpl{
log: log,
pgconn: pgconn,
projects: projects,
sessions: sessions,
}
}
func (a *assistImpl) Autocomplete(projectKey string, query *Query) (interface{}, error) {
switch {
case projectKey == "":
return nil, fmt.Errorf("project key is required")
case query == nil:
return nil, fmt.Errorf("query is required")
case query.Key == "":
return nil, fmt.Errorf("query key is required")
case query.Value == "":
return nil, fmt.Errorf("query value is required")
}
project, err := a.projects.GetProjectByKey(projectKey)
if err != nil {
return nil, fmt.Errorf("failed to get project by key: %s", err)
}
return a.sessions.Autocomplete(strconv.Itoa(int(project.ProjectID)), sessionmanager.FilterType(query.Key), query.Value)
}
func (a *assistImpl) IsLive(projectKey, sessionID string) (bool, error) {
switch {
case projectKey == "":
return false, fmt.Errorf("project key is required")
case sessionID == "":
return false, fmt.Errorf("session ID is required")
}
project, err := a.projects.GetProjectByKey(projectKey)
if err != nil {
return false, fmt.Errorf("failed to get project by key: %s", err)
}
sess, err := a.sessions.GetByID(strconv.Itoa(int(project.ProjectID)), sessionID)
if err != nil {
return false, fmt.Errorf("failed to get session by ID: %s", err)
}
return sess != nil, nil
}
func (a *assistImpl) GetAll(projectKey string, request *Request) (interface{}, error) {
switch {
case projectKey == "":
return nil, fmt.Errorf("project key is required")
case request == nil:
return nil, fmt.Errorf("filters are required")
}
project, err := a.projects.GetProjectByKey(projectKey)
if err != nil {
return nil, fmt.Errorf("failed to get project by key: %s", err)
}
order := sessionmanager.Asc
if request.Sort.Order == "DESC" {
order = sessionmanager.Desc
}
filters := make([]*sessionmanager.Filter, 0, len(request.Filters))
for name, f := range request.Filters {
filters = append(filters, &sessionmanager.Filter{
Type: sessionmanager.FilterType(name),
Value: f.Value,
Operator: f.Operator == "is",
})
}
sessions, total, counter, err := a.sessions.GetAll(strconv.Itoa(int(project.ProjectID)), filters, order, request.Pagination.Page, request.Pagination.Limit)
if err != nil {
return nil, fmt.Errorf("failed to get sessions: %s", err)
}
resp := map[string]interface{}{
"total": total,
"counter": counter,
"sessions": sessions,
}
return resp, nil
}
func (a *assistImpl) GetByID(projectKey, sessionID string) (interface{}, error) {
switch {
case projectKey == "":
return nil, fmt.Errorf("project key is required")
case sessionID == "":
return nil, fmt.Errorf("session ID is required")
}
project, err := a.projects.GetProjectByKey(projectKey)
if err != nil {
return nil, fmt.Errorf("failed to get project by key: %s", err)
}
return a.sessions.GetByID(strconv.Itoa(int(project.ProjectID)), sessionID)
}

View file

@ -0,0 +1,27 @@
package service
type Query struct {
Key string
Value string
}
type Filter struct {
Value []string `json:"values"`
Operator string `json:"operator"` // is|contains
}
type Pagination struct {
Limit int `json:"limit"`
Page int `json:"page"`
}
type Sort struct {
Key string `json:"key"` // useless
Order string `json:"order"` // [ASC|DESC]
}
type Request struct {
Filters map[string]Filter `json:"filter"`
Pagination Pagination `json:"pagination"`
Sort Sort `json:"sort"`
}

View file

@ -0,0 +1,589 @@
package sessionmanager
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/go-redis/redis"
"openreplay/backend/internal/config/assist"
"openreplay/backend/pkg/logger"
)
const (
NodeKeyPattern = "assist:nodes:*"
ActiveSessionPrefix = "assist:online_sessions:"
RecentlyUpdatedSessions = "assist:updated_sessions"
)
type SessionData struct {
Timestamp uint64 `json:"timestamp"`
ProjectID string `json:"projectID"`
SessionID string `json:"sessionID"`
UserID *string `json:"userID"`
UserUUID *string `json:"userUUID"`
UserOS *string `json:"userOs"`
UserBrowser *string `json:"userBrowser"`
UserDevice *string `json:"userDevice"`
UserPlatform *string `json:"userDeviceType"` // is
UserCountry *string `json:"userCountry"` // is
UserState *string `json:"userState"` // is
UserCity *string `json:"userCity"` // is
Metadata *map[string]string `json:"metadata"` // contains
IsActive bool `json:"active"`
Raw interface{}
}
type SessionManager interface {
Start()
Stop()
GetByID(projectID, sessionID string) (interface{}, error)
GetAll(projectID string, filters []*Filter, sort SortOrder, page, limit int) ([]interface{}, int, map[string]map[string]int, error)
Autocomplete(projectID string, key FilterType, value string) ([]interface{}, error)
}
type sessionManagerImpl struct {
ctx context.Context
log logger.Logger
client *redis.Client
ticker *time.Ticker
wg *sync.WaitGroup
stopChan chan struct{}
mutex *sync.RWMutex
cache map[string]*SessionData
sorted []*SessionData
batchSize int
scanSize int64
}
func New(log logger.Logger, cfg *assist.Config, redis *redis.Client) (SessionManager, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is required")
case log == nil:
return nil, fmt.Errorf("logger is required")
case redis == nil:
return nil, fmt.Errorf("redis client is required")
}
sm := &sessionManagerImpl{
ctx: context.Background(),
log: log,
client: redis,
ticker: time.NewTicker(cfg.CacheTTL),
wg: &sync.WaitGroup{},
stopChan: make(chan struct{}),
mutex: &sync.RWMutex{},
cache: make(map[string]*SessionData),
sorted: make([]*SessionData, 0),
batchSize: cfg.BatchSize,
scanSize: cfg.ScanSize,
}
return sm, nil
}
func (sm *sessionManagerImpl) Start() {
sm.log.Debug(sm.ctx, "Starting session manager...")
go func() {
sm.loadSessions()
for {
select {
case <-sm.ticker.C:
sm.updateSessions()
case <-sm.stopChan:
sm.log.Debug(sm.ctx, "Stopping session manager...")
return
}
}
}()
}
func (sm *sessionManagerImpl) Stop() {
close(sm.stopChan)
sm.ticker.Stop()
sm.wg.Wait()
if err := sm.client.Close(); err != nil {
sm.log.Debug(sm.ctx, "Error closing Redis connection: %v", err)
}
sm.log.Debug(sm.ctx, "Session manager stopped")
}
func (sm *sessionManagerImpl) getNodeIDs() ([]string, error) {
var nodeIDs = make([]string, 0, 16) // Let's assume we have at most 16 nodes
var cursor uint64 = 0
for {
keys, nextCursor, err := sm.client.Scan(cursor, NodeKeyPattern, 100).Result()
if err != nil {
return nil, fmt.Errorf("scan failed: %v", err)
}
for _, key := range keys {
nodeIDs = append(nodeIDs, key)
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return nodeIDs, nil
}
func (sm *sessionManagerImpl) getAllNodeSessions(nodeIDs []string) map[string]struct{} {
allSessionIDs := make(map[string]struct{})
var mu sync.Mutex
var wg sync.WaitGroup
for _, nodeID := range nodeIDs {
wg.Add(1)
go func(id string) {
defer wg.Done()
sessionListJSON, err := sm.client.Get(id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return
}
sm.log.Debug(sm.ctx, "Error getting session list for node %s: %v", id, err)
return
}
var sessionList []string
if err = json.Unmarshal([]byte(sessionListJSON), &sessionList); err != nil {
sm.log.Debug(sm.ctx, "Error unmarshalling session list for node %s: %v", id, err)
return
}
mu.Lock()
for _, sessionID := range sessionList {
allSessionIDs[sessionID] = struct{}{}
}
mu.Unlock()
}(nodeID)
}
wg.Wait()
return allSessionIDs
}
func (sm *sessionManagerImpl) getOnlineSessionIDs() (map[string]struct{}, error) {
nodeIDs, err := sm.getNodeIDs()
if err != nil {
sm.log.Debug(sm.ctx, "Error getting node IDs: %v", err)
return nil, err
}
sm.log.Debug(sm.ctx, "Found %d nodes", len(nodeIDs))
allSessionIDs := sm.getAllNodeSessions(nodeIDs)
sm.log.Debug(sm.ctx, "Collected %d unique session IDs", len(allSessionIDs))
return allSessionIDs, nil
}
func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*SessionData {
sessionData := make(map[string]*SessionData, len(sessionIDs))
for i := 0; i < len(sessionIDs); i += sm.batchSize {
end := i + sm.batchSize
if end > len(sessionIDs) {
end = len(sessionIDs)
}
batch := sessionIDs[i:end]
keys := make([]string, len(batch))
for j, id := range batch {
keys[j] = ActiveSessionPrefix + id
}
results, err := sm.client.MGet(keys...).Result()
if err != nil {
sm.log.Debug(sm.ctx, "Error in MGET operation: %v", err)
continue // TODO: Handle the error
}
for j, result := range results {
if result == nil {
continue
}
strVal, ok := result.(string)
if !ok {
sm.log.Debug(sm.ctx, "Unexpected type for session data: %T", result)
continue
}
var data SessionData
if err := json.Unmarshal([]byte(strVal), &data); err != nil {
sm.log.Debug(sm.ctx, "Error unmarshalling session data: %v", err)
continue
}
raw := make(map[string]interface{})
if err := json.Unmarshal([]byte(strVal), &raw); err != nil {
sm.log.Debug(sm.ctx, "Error unmarshalling raw session data: %v", err)
continue
}
data.Raw = raw
sessionData[batch[j]] = &data
}
sm.log.Debug(sm.ctx, "Collected %d new sessions", len(results))
}
sm.wg.Wait()
return sessionData
}
func (sm *sessionManagerImpl) updateCache(sessionsToAdd map[string]*SessionData, sessionsToRemove []string) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
if sessionsToRemove != nil {
for _, sessID := range sessionsToRemove {
delete(sm.cache, sessID)
}
}
if sessionsToAdd == nil {
return
}
for sessID, session := range sessionsToAdd {
sm.cache[sessID] = session
}
sessionList := make([]*SessionData, 0, len(sm.cache))
for _, s := range sm.cache {
sessionList = append(sessionList, s)
}
sort.Slice(sessionList, func(i, j int) bool {
return sessionList[i].Timestamp < sessionList[j].Timestamp
})
sm.sorted = sessionList
}
func (sm *sessionManagerImpl) loadSessions() {
startTime := time.Now()
sm.log.Debug(sm.ctx, "Starting session processing cycle")
sessIDs, err := sm.getOnlineSessionIDs()
if err != nil {
sm.log.Debug(sm.ctx, "Error getting online session IDs: %v", err)
return
}
if len(sessIDs) == 0 {
sm.log.Debug(sm.ctx, "No sessions found for nodes")
return
}
allSessionIDsList := make([]string, 0, len(sessIDs))
for sessionID := range sessIDs {
allSessionIDsList = append(allSessionIDsList, sessionID)
}
sessionMap := sm.getSessionData(allSessionIDsList)
sm.updateCache(sessionMap, nil)
duration := time.Since(startTime)
sm.log.Debug(sm.ctx, "Session processing cycle completed in %v. Processed %d sessions", duration, len(sm.cache))
}
func (sm *sessionManagerImpl) getAllRecentlyUpdatedSessions() (map[string]struct{}, error) {
var (
cursor uint64
allIDs = make(map[string]struct{})
batchIDs []string
err error
)
for {
batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", sm.scanSize).Result()
if err != nil {
sm.log.Debug(sm.ctx, "Error scanning updated session IDs: %v", err)
return nil, err
}
for _, id := range batchIDs {
allIDs[id] = struct{}{}
}
if cursor == 0 {
break
}
}
if len(allIDs) == 0 {
sm.log.Debug(sm.ctx, "No updated session IDs found")
return allIDs, nil
}
var sessionIDsSlice []interface{}
for id := range allIDs {
sessionIDsSlice = append(sessionIDsSlice, id)
}
removed := sm.client.SRem(RecentlyUpdatedSessions, sessionIDsSlice...).Val()
sm.log.Debug(sm.ctx, "Fetched and removed %d session IDs from updated_session_set", removed)
return allIDs, nil
}
func (sm *sessionManagerImpl) updateSessions() {
startTime := time.Now()
sm.log.Debug(sm.ctx, "Starting session processing cycle")
sessIDs, err := sm.getOnlineSessionIDs()
if err != nil {
sm.log.Debug(sm.ctx, "Error getting online session IDs: %v", err)
return
}
updatedSessIDs, err := sm.getAllRecentlyUpdatedSessions()
if err != nil {
sm.log.Debug(sm.ctx, "Error getting recently updated sessions: %v", err)
return
}
sm.mutex.RLock()
toAdd := make([]string, 0, len(updatedSessIDs))
if updatedSessIDs == nil {
updatedSessIDs = make(map[string]struct{})
}
for sessID, _ := range sessIDs {
if _, exists := sm.cache[sessID]; !exists {
updatedSessIDs[sessID] = struct{}{} // Add to updated sessions if not in cache
}
}
for sessID, _ := range updatedSessIDs {
toAdd = append(toAdd, sessID)
}
toRemove := make([]string, 0, len(sessIDs)/16)
for sessID, _ := range sm.cache {
if _, exists := sessIDs[sessID]; !exists {
toRemove = append(toRemove, sessID)
}
}
sm.mutex.RUnlock()
// Load full session data from Redis
newCache := sm.getSessionData(toAdd)
sm.updateCache(newCache, toRemove)
duration := time.Since(startTime)
sm.log.Debug(sm.ctx, "Session processing cycle completed in %v. Processed %d sessions", duration, len(sm.cache))
}
var ErrSessionNotFound = errors.New("session not found")
var ErrSessionNotBelongToProject = errors.New("session does not belong to the project")
func (sm *sessionManagerImpl) GetByID(projectID, sessionID string) (interface{}, error) {
if sessionID == "" {
return nil, fmt.Errorf("session ID is required")
}
sm.mutex.RLock()
defer sm.mutex.RUnlock()
sessionData, exists := sm.cache[sessionID]
if !exists {
return nil, ErrSessionNotFound
}
if sessionData.ProjectID != projectID {
return nil, ErrSessionNotBelongToProject
}
return sessionData.Raw, nil
}
func (sm *sessionManagerImpl) GetAll(projectID string, filters []*Filter, sortOrder SortOrder, page, limit int) ([]interface{}, int, map[string]map[string]int, error) {
if projectID == "" {
return nil, 0, nil, fmt.Errorf("project ID is required")
}
counter := make(map[string]map[string]int)
for _, filter := range filters {
if _, ok := counter[string(filter.Type)]; !ok {
counter[string(filter.Type)] = make(map[string]int)
}
for _, value := range filter.Value {
counter[string(filter.Type)][value] = 0
}
}
if page < 1 || limit < 1 {
page, limit = 1, 10
}
start := (page - 1) * limit
end := start + limit
result := make([]interface{}, 0, limit)
totalMatches := 0
doFiltering := func(session *SessionData) {
if session.ProjectID != projectID {
return // TODO: keep sessions separate by projectID
}
if matchesFilters(session, filters, counter) {
if totalMatches >= start && totalMatches < end {
result = append(result, session.Raw)
}
totalMatches++
}
}
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if sortOrder == Asc {
for _, session := range sm.sorted {
doFiltering(session)
}
} else {
for i := len(sm.sorted) - 1; i >= 0; i-- {
doFiltering(sm.sorted[i])
}
}
return result, totalMatches, counter, nil
}
func matchesFilters(session *SessionData, filters []*Filter, counter map[string]map[string]int) bool {
if filters == nil || len(filters) == 0 {
return true
}
matchedFilters := make(map[string][]string, len(filters))
for _, filter := range filters {
name := string(filter.Type)
if _, ok := matchedFilters[name]; !ok {
matchedFilters[name] = make([]string, 0, len(filter.Value))
}
var value string
switch filter.Type {
case UserID:
if session.UserID != nil {
value = *session.UserID
}
case UserAnonymousID:
if session.UserUUID != nil {
value = *session.UserUUID
}
case UserOS:
if session.UserOS != nil {
value = *session.UserOS
}
case UserBrowser:
if session.UserBrowser != nil {
value = *session.UserBrowser
}
case UserDevice:
if session.UserDevice != nil {
value = *session.UserDevice
}
case UserPlatform:
if session.UserPlatform != nil {
value = *session.UserPlatform
}
case UserCountry:
if session.UserCountry != nil {
value = *session.UserCountry
}
case UserState:
if session.UserState != nil {
value = *session.UserState
}
case UserCity:
if session.UserCity != nil {
value = *session.UserCity
}
case IsActive:
if session.IsActive {
value = "true"
} else {
value = "false"
}
default:
if val, ok := (*session.Metadata)[name]; ok {
value = val
}
}
matched := false
for _, filterValue := range filter.Value {
if filter.Operator == Is && value != filterValue {
continue
} else if filter.Operator == Contains && !strings.Contains(strings.ToLower(value), strings.ToLower(filterValue)) {
continue
}
matched = true
matchedFilters[name] = append(matchedFilters[name], value)
}
if !matched {
return false
}
}
for values, filter := range matchedFilters {
for _, value := range filter {
counter[values][value]++
}
}
return true
}
func (sm *sessionManagerImpl) Autocomplete(projectID string, key FilterType, value string) ([]interface{}, error) {
matches := make(map[string]struct{}) // To ensure uniqueness
lowerValue := strings.ToLower(value)
sm.mutex.RLock()
defer sm.mutex.RUnlock()
for _, session := range sm.sorted {
if session.ProjectID != projectID {
continue
}
var fieldValue string
switch key {
case UserID:
if session.UserID != nil {
fieldValue = *session.UserID
}
case UserAnonymousID:
if session.UserUUID != nil {
fieldValue = *session.UserUUID
}
case UserOS:
if session.UserOS != nil {
fieldValue = *session.UserOS
}
case UserBrowser:
if session.UserBrowser != nil {
fieldValue = *session.UserBrowser
}
case UserDevice:
if session.UserDevice != nil {
fieldValue = *session.UserDevice
}
case UserState:
if session.UserState != nil {
fieldValue = *session.UserState
}
case UserCity:
if session.UserCity != nil {
fieldValue = *session.UserCity
}
default:
if v, ok := (*session.Metadata)[string(key)]; ok {
fieldValue = v
}
}
if fieldValue != "" && strings.Contains(strings.ToLower(fieldValue), lowerValue) {
matches[fieldValue] = struct{}{}
}
}
results := make([]interface{}, 0, len(matches))
keyName := strings.ToUpper(string(key))
type pair struct {
Type string `json:"type"`
Value string `json:"value"`
}
for k := range matches {
results = append(results, pair{Type: keyName, Value: k})
}
return results, nil
}

View file

@ -0,0 +1,37 @@
package sessionmanager
type SortOrder bool
const (
Asc SortOrder = true
Desc SortOrder = false
)
type FilterType string
const (
UserID FilterType = "userId"
UserAnonymousID = "userAnonymousId"
UserOS = "userOs"
UserBrowser = "userBrowser"
UserDevice = "userDevice"
UserPlatform = "platform"
UserCountry = "userCountry"
UserState = "userState"
UserCity = "userCity"
IsActive = "active"
)
type FilterOperator bool
const (
Is FilterOperator = true
Contains FilterOperator = false
)
type Filter struct {
Type FilterType
Value []string
Operator FilterOperator
Source string // for metadata only
}

View file

@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View file

@ -0,0 +1,24 @@
apiVersion: v2
name: assist-api
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rassist-apiing
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
AppVersion: "v1.22.0"

View file

@ -0,0 +1,22 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "assist-api.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "assist-api.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "assist-api.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "assist-api.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View file

@ -0,0 +1,65 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "assist-api.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "assist-api.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "assist-api.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "assist-api.labels" -}}
helm.sh/chart: {{ include "assist-api.chart" . }}
{{ include "assist-api.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- if .Values.global.appLabels }}
{{- .Values.global.appLabels | toYaml | nindent 0}}
{{- end}}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "assist-api.selectorLabels" -}}
app.kubernetes.io/name: {{ include "assist-api.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "assist-api.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "assist-api.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,103 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "assist-api.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "assist-api.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "assist-api.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "assist-api.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
shareProcessNamespace: true
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
{{- if .Values.global.enterpriseEditionLicense }}
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}-ee"
{{- else }}
image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
{{- end }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
{{- if .Values.healthCheck}}
{{- .Values.healthCheck | toYaml | nindent 10}}
{{- end}}
env:
- name: LICENSE_KEY
value: '{{ .Values.global.enterpriseEditionLicense }}'
- name: KAFKA_SERVERS
value: '{{ .Values.global.kafka.kafkaHost }}:{{ .Values.global.kafka.kafkaPort }}'
- name: KAFKA_USE_SSL
value: '{{ .Values.global.kafka.kafkaUseSsl }}'
- name: ASSIST_KEY
value: {{ .Values.global.assistKey }}
- name: pg_password
{{- if .Values.global.postgresql.existingSecret }}
valueFrom:
secretKeyRef:
name: {{ .Values.global.postgresql.existingSecret }}
key: postgresql-postgres-password
{{- else }}
value: '{{ .Values.global.postgresql.postgresqlPassword }}'
{{- end}}
- name: POSTGRES_STRING
value: 'postgres://{{ .Values.global.postgresql.postgresqlUser }}:$(pg_password)@{{ .Values.global.postgresql.postgresqlHost }}:{{ .Values.global.postgresql.postgresqlPort }}/{{ .Values.global.postgresql.postgresqlDatabase }}'
{{- include "openreplay.env.redis_string" .Values.global.redis | nindent 12 }}
{{- range $key, $val := .Values.global.env }}
- name: {{ $key }}
value: '{{ $val }}'
{{- end }}
{{- range $key, $val := .Values.env }}
- name: {{ $key }}
value: '{{ $val }}'
{{- end}}
ports:
{{- range $key, $val := .Values.service.ports }}
- name: {{ $key }}
containerPort: {{ $val }}
protocol: TCP
{{- end }}
volumeMounts:
{{- include "openreplay.volume.redis_ca_certificate.mount" .Values.global.redis | nindent 12 }}
{{- with .Values.persistence.mounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumes:
{{- with .Values.persistence.volumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- include "openreplay.volume.redis_ca_certificate" .Values.global.redis | nindent 8 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View file

@ -0,0 +1,33 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "assist-api.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "assist-api.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,36 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "assist-api.fullname" . -}}
{{- $svcPort := .Values.service.ports.http -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ $fullName }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$1
nginx.ingress.kubernetes.io/upstream-hash-by: $http_x_forwarded_for
{{- with .Values.ingress.annotations }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
ingressClassName: "{{ tpl .Values.ingress.className . }}"
tls:
- hosts:
- {{ .Values.global.domainName }}
{{- if .Values.ingress.tls.secretName}}
secretName: {{ .Values.ingress.tls.secretName }}
{{- end }}
rules:
- host: {{ .Values.global.domainName }}
http:
paths:
- path: /api-assist/(.*)
pathType: Prefix
backend:
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
{{- end }}

View file

@ -0,0 +1,18 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "assist-api.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
{{- range $key, $val := .Values.service.ports }}
- port: {{ $val }}
targetPort: {{ $key }}
protocol: TCP
name: {{ $key }}
{{- end}}
selector:
{{- include "assist-api.selectorLabels" . | nindent 4 }}

View file

@ -0,0 +1,18 @@
{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) ( .Values.serviceMonitor.enabled ) }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "assist-api.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
{{- if .Values.serviceMonitor.additionalLabels }}
{{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }}
{{- end }}
spec:
endpoints:
{{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }}
selector:
matchLabels:
{{- include "assist-api.selectorLabels" . | nindent 6 }}
{{- end }}

View file

@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "assist-api.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "assist-api.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: "{{ include "assist-api.fullname" . }}-test-connection"
labels:
{{- include "assist-api.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: wget
image: busybox
command: ['wget']
args: ['{{ include "assist-api.fullname" . }}:{{ .Values.service.port }}']
restartPolicy: Never

View file

@ -0,0 +1,119 @@
# Default values for openreplay.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
image:
repository: "{{ .Values.global.openReplayContainerRegistry }}/assist-api"
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
imagePullSecrets: []
nameOverride: "assist-api"
fullnameOverride: "assist-api-openreplay"
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
podAnnotations: {}
securityContext:
runAsUser: 1001
runAsGroup: 1001
podSecurityContext:
runAsUser: 1001
runAsGroup: 1001
fsGroup: 1001
fsGroupChangePolicy: "OnRootMismatch"
# podSecurityContext: {}
# fsGroup: 2000
# securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
#service:
# type: ClusterIP
# port: 9000
serviceMonitor:
enabled: false
additionalLabels:
release: observability
scrapeConfigs:
- port: metrics
honorLabels: true
interval: 15s
path: /metrics
scheme: http
scrapeTimeout: 10s
service:
type: ClusterIP
ports:
http: 8080
metrics: 8888
ingress:
enabled: false
className: "{{ .Values.global.ingress.controller.ingressClassResource.name }}"
annotations:
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
tls:
secretName: openreplay-ssl
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 5
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
env:
CLEAR_SOCKET_TIME: 720
nodeSelector: {}
tolerations: []
affinity: {}
persistence: {}
# # Spec of spec.template.spec.containers[*].volumeMounts
# mounts:
# - name: kafka-ssl
# mountPath: /opt/kafka/ssl
# # Spec of spec.template.spec.volumes
# volumes:
# - name: kafka-ssl
# secret:
# secretName: kafka-ssl

View file

@ -110,7 +110,6 @@ autoscaling:
env:
debug: 0
uws: false
redis: false
CLEAR_SOCKET_TIME: 720