From 95c5190b07e15a14b58272282875ccb64d97acb4 Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Wed, 16 Feb 2022 19:44:03 +0100 Subject: [PATCH 1/4] feat(backend): incoming queue stats logging for ender & db --- backend/Dockerfile | 3 +- backend/pkg/log/queue.go | 77 ++++++++++++++++++++++++++++++++++ backend/services/db/main.go | 11 +++-- backend/services/ender/main.go | 9 ++-- 4 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 backend/pkg/log/queue.go diff --git a/backend/Dockerfile b/backend/Dockerfile index 6ca305ca1..5cefd4cb4 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -43,7 +43,8 @@ ENV TZ=UTC \ AWS_REGION_ASSETS=eu-central-1 \ CACHE_ASSETS=true \ ASSETS_SIZE_LIMIT=6291456 \ - FS_CLEAN_HRS=72 + FS_CLEAN_HRS=72 \ + LOG_QUEUE_STATS_INTERVAL_SEC=60 ARG SERVICE_NAME diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go new file mode 100644 index 000000000..a49f38c0d --- /dev/null +++ b/backend/pkg/log/queue.go @@ -0,0 +1,77 @@ +package log + +import ( + "time" + "fmt" + "log" + + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" + //"openreplay/backend/pkg/env" +) + + +type partitionStats struct { + maxts int64 + mints int64 + lastts int64 + lastID uint64 + count int +} + +type queueStats struct { + prts map[int32]*partitionStats + tick <-chan time.Time +} + +func NewQueueStats(sec int)*queueStats { + return &queueStats{ + prts: make(map[int32]*partitionStats), + tick: time.Tick(time.Duration(sec) * time.Second), + } +} + +func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) { + prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go + prt, ok := qs.prts[prti] + if !ok { + qs.prts[prti] = &partitionStats{} + prt = qs.prts[prti] + } + + if prt.maxts < m.Timestamp { + prt.maxts = m.Timestamp + } + if prt.mints > m.Timestamp || prt.mints == 0 { + prt.mints = m.Timestamp + } + prt.lastts = m.Timestamp + prt.lastID = m.ID + prt.count += 1 + + + select { + case <-qs.tick: + qs.LogThenReset() + default: + } +} + + +func (qs *queueStats) LogThenReset() { + s := "Queue Statistics: " + for i, p := range qs.prts { + s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v", + s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints) + } + log.Println(s) + // reset + qs.prts = make(map[int32]*partitionStats) +} + + +// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something) +func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) { + log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg) +} + diff --git a/backend/services/db/main.go b/backend/services/db/main.go index 5d2759c90..b60f7e7db 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -15,6 +15,7 @@ import ( "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" "openreplay/backend/services/db/heuristics" + logger "openreplay/backend/pkg/log" ) var pg *cache.PGCache @@ -28,13 +29,18 @@ func main() { heurFinder := heuristics.NewHandler() + + statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) + consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), []string{ env.String("TOPIC_RAW_IOS"), env.String("TOPIC_TRIGGER"), }, - func(sessionID uint64, msg messages.Message, _ *types.Meta) { + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + statsLogger.HandleAndLog(sessionID, meta) + if err := insertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) @@ -64,8 +70,7 @@ func main() { return } - err = insertStats(session, msg) - if err != nil { + if err := insertStats(session, msg); err != nil { log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) } }) diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index 9c62d14b0..e8d739f0e 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -13,6 +13,7 @@ import ( "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + logger "openreplay/backend/pkg/log" "openreplay/backend/services/ender/builder" ) @@ -23,7 +24,8 @@ func main() { TOPIC_TRIGGER := env.String("TOPIC_TRIGGER") builderMap := builder.NewBuilderMap() - var lastTs int64 = 0 + + statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")) producer := queue.NewProducer() consumer := queue.NewMessageConsumer( @@ -33,11 +35,8 @@ func main() { env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - lastTs = meta.Timestamp + statsLogger.HandleAndLog(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) - // builderMap.IterateSessionReadyMessages(sessionID, lastTs, func(readyMsg messages.Message) { - // producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg)) - // }) }, ) consumer.DisableAutoCommit() From 9568925b5aab8389397eb0822371eeb27dd9fa96 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Feb 2022 19:46:22 +0100 Subject: [PATCH 2/4] feat(utilities): WS-uWS fixed socket path --- utilities/servers/websocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index 6f8c4b955..7a6e53bc9 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -155,7 +155,7 @@ module.exports = { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/ws-assist/socket', + path: '/socket', transports: ['websocket'], // upgrade: false }); From e9e6ddebf264f005262d386b4c707464ed7f8665 Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Feb 2022 19:53:40 +0100 Subject: [PATCH 3/4] feat(utilities): WS-uWS removed force websocket --- utilities/servers/websocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index 7a6e53bc9..d19cabbcd 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -156,7 +156,7 @@ module.exports = { methods: ["GET", "POST", "PUT"] }, path: '/socket', - transports: ['websocket'], + // transports: ['websocket'], // upgrade: false }); io.attachApp(server); From 0e985e3920dbdb0127de6de39739f23d6f81bd0f Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Wed, 16 Feb 2022 22:57:35 +0100 Subject: [PATCH 4/4] feat(utilities): WS revert to pure socketio --- utilities/server.js | 46 ++++++------------------ utilities/servers/websocket.js | 65 +++++++++++++++------------------- 2 files changed, 39 insertions(+), 72 deletions(-) diff --git a/utilities/server.js b/utilities/server.js index cbae8f5ff..661ef081c 100644 --- a/utilities/server.js +++ b/utilities/server.js @@ -8,6 +8,7 @@ const HOST = '0.0.0.0'; const PORT = 9000; var app = express(); +var wsapp = express(); let debug = process.env.debug === "1" || false; const request_logger = (identity) => { return (req, res, next) => { @@ -22,15 +23,20 @@ const request_logger = (identity) => { } }; app.use(request_logger("[app]")); +wsapp.use(request_logger("[wsapp]")); app.use('/sourcemaps', sourcemapsReaderServer); app.use('/assist', peerRouter); +wsapp.use('/assist', socket.wsRouter); const server = app.listen(PORT, HOST, () => { console.log(`App listening on http://${HOST}:${PORT}`); console.log('Press Ctrl+C to quit.'); }); - +const wsserver = wsapp.listen(PORT + 1, HOST, () => { + console.log(`WS App listening on http://${HOST}:${PORT + 1}`); + console.log('Press Ctrl+C to quit.'); +}); const peerServer = ExpressPeerServer(server, { debug: true, path: '/', @@ -42,38 +48,6 @@ peerServer.on('disconnect', peerDisconnect); peerServer.on('error', peerError); app.use('/', peerServer); app.enable('trust proxy'); - - -const {App} = require("uWebSockets.js"); -const PREFIX = process.env.prefix || '/assist' - -const uapp = new App(); - -const healthFn = (res, req) => { - res.writeStatus('200 OK').end('ok!'); -} -uapp.get(PREFIX, healthFn); -uapp.get(`${PREFIX}/`, healthFn); - -const uWrapper = function (fn) { - return (res, req) => fn(req, res); -} -uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list`, uWrapper(socket.handlers.socketsList)); -uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-list/:projectKey`, uWrapper(socket.handlers.socketsListByProject)); - -uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live`, uWrapper(socket.handlers.socketsLive)); -uapp.get(`${PREFIX}/${process.env.S3_KEY}/sockets-live/:projectKey`, uWrapper(socket.handlers.socketsLiveByProject)); - - -socket.start(uapp); - -uapp.listen(HOST, PORT + 1, (token) => { - if (!token) { - console.warn("port already in use"); - } - console.log(`WS App listening on http://${HOST}:${PORT + 1}`); - console.log('Press Ctrl+C to quit.'); -}); - - -module.exports = {uapp, server}; +wsapp.enable('trust proxy'); +socket.start(wsserver); +module.exports = {wsserver, server}; diff --git a/utilities/servers/websocket.js b/utilities/servers/websocket.js index d19cabbcd..ab6a2c4d5 100644 --- a/utilities/servers/websocket.js +++ b/utilities/servers/websocket.js @@ -1,7 +1,9 @@ const _io = require('socket.io'); +const express = require('express'); const uaParser = require('ua-parser-js'); const geoip2Reader = require('@maxmind/geoip2-node').Reader; var {extractPeerId} = require('./peerjs-server'); +var wsRouter = express.Router(); const IDENTITIES = {agent: 'agent', session: 'session'}; const NEW_AGENT = "NEW_AGENT"; const NO_AGENTS = "NO_AGENT"; @@ -9,11 +11,11 @@ const AGENT_DISCONNECT = "AGENT_DISCONNECTED"; const AGENTS_CONNECTED = "AGENTS_CONNECTED"; const NO_SESSIONS = "SESSION_DISCONNECTED"; const SESSION_ALREADY_CONNECTED = "SESSION_ALREADY_CONNECTED"; +// const wsReconnectionTimeout = process.env.wsReconnectionTimeout | 10 * 1000; let io; - let debug = process.env.debug === "1" || false; -const socketsList = function (req, res) { +wsRouter.get(`/${process.env.S3_KEY}/sockets-list`, function (req, res) { debug && console.log("[WS]looking for all available sessions"); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { @@ -23,10 +25,11 @@ const socketsList = function (req, res) { liveSessions[projectKey].push(sessionId); } } - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify({"data": liveSessions})); -} -const socketsListByProject = function (req, res) { - req.params = {projectKey: req.getParameter(0)}; + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({"data": liveSessions})); +}); +wsRouter.get(`/${process.env.S3_KEY}/sockets-list/:projectKey`, function (req, res) { debug && console.log(`[WS]looking for available sessions for ${req.params.projectKey}`); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { @@ -36,9 +39,12 @@ const socketsListByProject = function (req, res) { liveSessions[projectKey].push(sessionId); } } - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); -} -const socketsLive = async function (req, res) { + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); +}); + +wsRouter.get(`/${process.env.S3_KEY}/sockets-live`, async function (req, res) { debug && console.log("[WS]looking for all available LIVE sessions"); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { @@ -53,10 +59,12 @@ const socketsLive = async function (req, res) { } } } - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify({"data": liveSessions})); -} -const socketsLiveByProject = async function (req, res) { - req.params = {projectKey: req.getParameter(0)}; + + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({"data": liveSessions})); +}); +wsRouter.get(`/${process.env.S3_KEY}/sockets-live/:projectKey`, async function (req, res) { debug && console.log(`[WS]looking for available LIVE sessions for ${req.params.projectKey}`); let liveSessions = {}; for (let peerId of io.sockets.adapter.rooms.keys()) { @@ -71,8 +79,10 @@ const socketsLiveByProject = async function (req, res) { } } } - res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json').end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); -} + res.statusCode = 200; + res.setHeader('Content-Type', 'application/json'); + res.end(JSON.stringify({"data": liveSessions[req.params.projectKey] || []})); +}); const findSessionSocketId = async (io, peerId) => { const connected_sockets = await io.in(peerId).fetchSockets(); @@ -148,19 +158,16 @@ function extractSessionInfo(socket) { } module.exports = { + wsRouter, start: (server) => { - io = new _io.Server({ + io = _io(server, { maxHttpBufferSize: 1e6, cors: { origin: "*", methods: ["GET", "POST", "PUT"] }, - path: '/socket', - // transports: ['websocket'], - // upgrade: false + path: '/socket' }); - io.attachApp(server); - io.on('connection', async (socket) => { debug && console.log(`WS started:${socket.id}, Query:${JSON.stringify(socket.handshake.query)}`); @@ -200,13 +207,10 @@ module.exports = { } socket.on('disconnect', async () => { - // console.log(`${socket.id} disconnected from ${socket.peerId}, waiting ${wsReconnectionTimeout / 1000}s before checking remaining`); debug && console.log(`${socket.id} disconnected from ${socket.peerId}`); if (socket.identity === IDENTITIES.agent) { socket.to(socket.peerId).emit(AGENT_DISCONNECT, socket.id); } - // wait a little bit before notifying everyone - // setTimeout(async () => { debug && console.log("checking for number of connected agents and sessions"); let {c_sessions, c_agents} = await sessions_agents_count(io, socket); if (c_sessions === -1 && c_agents === -1) { @@ -220,9 +224,6 @@ module.exports = { debug && console.log(`notifying everyone in ${socket.peerId} about no AGENTS`); socket.to(socket.peerId).emit(NO_AGENTS); } - - - // }, wsReconnectionTimeout); }); socket.onAny(async (eventName, ...args) => { @@ -244,9 +245,7 @@ module.exports = { }); }); - console.log("WS server started"); - debug ? console.log("Debugging enabled.") : console.log("Debugging disabled, set debug=\"1\" to enable debugging."); - + console.log("WS server started") setInterval((io) => { try { let count = 0; @@ -269,11 +268,5 @@ module.exports = { console.error(e); } }, 20000, io); - }, - handlers: { - socketsList, - socketsListByProject, - socketsLive, - socketsLiveByProject } }; \ No newline at end of file