diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go index b41dedd87..b05ecbe52 100644 --- a/backend/cmd/assets/main.go +++ b/backend/cmd/assets/main.go @@ -1,9 +1,7 @@ package main import ( - "context" "log" - "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" @@ -13,12 +11,16 @@ import ( "openreplay/backend/internal/assets/cacher" config "openreplay/backend/internal/config/assets" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics" + assetsMetrics "openreplay/backend/pkg/metrics/assets" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" ) func main() { - metrics := monitoring.New("assets") + m := metrics.New() + m.Register(assetsMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := config.New() @@ -26,18 +28,13 @@ func main() { pprof.StartProfilingServer() } - cacher := cacher.NewCacher(cfg, metrics) - - totalAssets, err := metrics.RegisterCounter("assets_total") - if err != nil { - log.Printf("can't create assets_total metric: %s", err) - } + cacher := cacher.NewCacher(cfg) msgHandler := func(msg messages.Message) { switch m := msg.(type) { case *messages.AssetCache: cacher.CacheURL(m.SessionID(), m.URL) - totalAssets.Add(context.Background(), 1) + assetsMetrics.IncreaseProcessesSessions() // TODO: connect to "raw" topic in order to listen for JSException case *messages.JSException: sourceList, err := assets.ExtractJSExceptionSources(&m.Payload) diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index f9440a908..84b0d81ed 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -3,8 +3,6 @@ package main import ( "errors" "log" - types2 "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" @@ -14,16 +12,21 @@ import ( "openreplay/backend/internal/db/datasaver" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" + types2 "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/handlers" custom2 "openreplay/backend/pkg/handlers/custom" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/sessions" ) func main() { - metrics := monitoring.New("db") + m := metrics.New() + m.Register(databaseMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := db.New() @@ -33,7 +36,7 @@ func main() { // Init database pg := cache.NewPGCache( - postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit, metrics), cfg.ProjectExpirationTimeoutMs) + postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. 
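Every `cmd/*/main.go` in this change follows one pattern: `monitoring.New("<service>")` becomes `m := metrics.New()` plus `m.Register(<service>Metrics.List())`, and the per-call-site `RegisterCounter`/`RegisterHistogram` boilerplate disappears into per-service metric packages. The diff only shows the call sites; the sketch below is one plausible implementation of that contract, assuming a Prometheus registry underneath — the names `New`, `Register`, `List`, and `IncreaseProcessesSessions` mirror the diff, while the bodies, metric names, and the scrape endpoint are illustrative guesses, not the project's actual code.

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// What a pkg/metrics/assets-style package plausibly contains: package-level
// collectors plus tiny helpers, so call sites never handle registration errors.
var processedSessions = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "assets",
	Name:      "processed_sessions", // assumed metric name
	Help:      "Processed AssetCache messages.",
})

// List exposes every collector the package wants registered.
func List() []prometheus.Collector { return []prometheus.Collector{processedSessions} }

// IncreaseProcessesSessions mirrors the helper name used in the diff.
func IncreaseProcessesSessions() { processedSessions.Inc() }

// What pkg/metrics itself plausibly contains.
type Metrics struct{ registry *prometheus.Registry }

func New() *Metrics { return &Metrics{registry: prometheus.NewRegistry()} }

// Register adds a service's collector list; logging instead of failing keeps
// startup alive if two lists ever share a collector.
func (m *Metrics) Register(collectors []prometheus.Collector) {
	for _, c := range collectors {
		if err := m.registry.Register(c); err != nil {
			log.Printf("metrics: %s", err)
		}
	}
}

func main() {
	m := New()
	m.Register(List()) // same shape as m.Register(assetsMetrics.List()) above
	IncreaseProcessesSessions()
	// Expose the registry for scraping (endpoint and port are assumptions).
	http.Handle("/metrics", promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8888", nil))
}
```

The practical win is visible in the hunks that follow: ender and http each register `databaseMetrics.List()` alongside their own list, so shared database instrumentation no longer needs a `*monitoring.Metrics` handle threaded through `postgres.NewConn`, `sessionender.New`, and friends.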
diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 74b0b8bd2..da7ca9b89 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -2,8 +2,6 @@ package main import ( "log" - "openreplay/backend/internal/storage" - "openreplay/backend/pkg/pprof" "os" "os/signal" "strings" @@ -12,16 +10,23 @@ import ( "openreplay/backend/internal/config/ender" "openreplay/backend/internal/sessionender" + "openreplay/backend/internal/storage" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/intervals" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + enderMetrics "openreplay/backend/pkg/metrics/ender" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" ) func main() { - metrics := monitoring.New("ender") + m := metrics.New() + m.Register(enderMetrics.List()) + m.Register(databaseMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := ender.New() @@ -29,10 +34,10 @@ func main() { pprof.StartProfilingServer() } - pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics), cfg.ProjectExpirationTimeoutMs) + pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpirationTimeoutMs) defer pg.Close() - sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) + sessions, err := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) if err != nil { log.Printf("can't init ender service: %s", err) return diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 4fb82b635..83eedaf29 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -2,23 +2,28 @@ package main import ( "log" - "openreplay/backend/internal/config/http" - "openreplay/backend/internal/http/router" - "openreplay/backend/internal/http/server" - "openreplay/backend/internal/http/services" - "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" + "openreplay/backend/internal/config/http" + "openreplay/backend/internal/http/router" + "openreplay/backend/internal/http/server" + "openreplay/backend/internal/http/services" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/metrics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + httpMetrics "openreplay/backend/pkg/metrics/http" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" ) func main() { - metrics := monitoring.New("http") + m := metrics.New() + m.Register(httpMetrics.List()) + m.Register(databaseMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := http.New() @@ -31,14 +36,14 @@ func main() { defer producer.Close(15000) // Connect to database - dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics), 1000*60*20) + dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), 1000*60*20) defer dbConn.Close() // Build all services services := services.New(cfg, producer, dbConn) // Init server's routes - router, err := router.NewRouter(cfg, services, metrics) + router, err := router.NewRouter(cfg, services) if err != nil { log.Fatalf("failed while creating engine: %s", err) } diff --git a/backend/cmd/integrations/main.go b/backend/cmd/integrations/main.go index 8c6d56966..3fa07ee9c 100644 --- a/backend/cmd/integrations/main.go +++ 
b/backend/cmd/integrations/main.go @@ -2,24 +2,26 @@ package main import ( "log" - config "openreplay/backend/internal/config/integrations" - "openreplay/backend/internal/integrations/clientManager" - "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/pprof" - "time" - "os" "os/signal" "syscall" + "time" + config "openreplay/backend/internal/config/integrations" + "openreplay/backend/internal/integrations/clientManager" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/intervals" + "openreplay/backend/pkg/metrics" + databaseMetrics "openreplay/backend/pkg/metrics/database" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/token" ) func main() { - metrics := monitoring.New("integrations") + m := metrics.New() + m.Register(databaseMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := config.New() @@ -27,7 +29,7 @@ func main() { pprof.StartProfilingServer() } - pg := postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics) + pg := postgres.NewConn(cfg.Postgres.String(), 0, 0) defer pg.Close() tokenizer := token.NewTokenizer(cfg.TokenSecret) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 74e0b1db1..4bbaeeee4 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -2,10 +2,8 @@ package main import ( "bytes" - "context" "encoding/binary" "log" - "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" @@ -16,13 +14,16 @@ import ( "openreplay/backend/internal/sink/sessionwriter" "openreplay/backend/internal/storage" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics" + sinkMetrics "openreplay/backend/pkg/metrics/sink" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/url/assets" ) func main() { - metrics := monitoring.New("sink") + m := metrics.New() + m.Register(sinkMetrics.List()) log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := sink.New() @@ -39,22 +40,8 @@ func main() { producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(cfg.ProducerCloseTimeout) rewriter := assets.NewRewriter(cfg.AssetsOrigin) - assetMessageHandler := assetscache.New(cfg, rewriter, producer, metrics) - + assetMessageHandler := assetscache.New(cfg, rewriter, producer) counter := storage.NewLogCounter() - // Session message metrics - totalMessages, err := metrics.RegisterCounter("messages_total") - if err != nil { - log.Printf("can't create messages_total metric: %s", err) - } - savedMessages, err := metrics.RegisterCounter("messages_saved") - if err != nil { - log.Printf("can't create messages_saved metric: %s", err) - } - messageSize, err := metrics.RegisterHistogram("messages_size") - if err != nil { - log.Printf("can't create messages_size metric: %s", err) - } var ( sessionID uint64 @@ -74,11 +61,12 @@ func main() { if domBuffer.Len() <= 0 && devBuffer.Len() <= 0 { return } + sinkMetrics.RecordWrittenBytes(float64(domBuffer.Len()), "dom") + sinkMetrics.RecordWrittenBytes(float64(devBuffer.Len()), "devtools") // Write buffered batches to the session if err := writer.Write(sessionID, domBuffer.Bytes(), devBuffer.Bytes()); err != nil { log.Printf("writer error: %s", err) - return } // Prepare buffer for the next batch @@ -88,8 +76,7 @@ func main() { return } - // [METRICS] Increase the number of processed messages - totalMessages.Add(context.Background(), 1) + sinkMetrics.IncreaseTotalMessages() // Send SessionEnd trigger to storage service if msg.TypeID() == 
messages.MsgSessionEnd { @@ -187,9 +174,8 @@ func main() { } } - // [METRICS] Increase the number of written to the files messages and the message size - messageSize.Record(context.Background(), float64(len(msg.Encode()))) - savedMessages.Add(context.Background(), 1) + sinkMetrics.IncreaseWrittenMessages() + sinkMetrics.RecordMessageSize(float64(len(msg.Encode()))) } consumer := queue.NewConsumer( diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index dcb1b53ed..472324b95 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -2,7 +2,6 @@ package main import ( "log" - "openreplay/backend/pkg/pprof" "os" "os/signal" "syscall" @@ -12,13 +11,17 @@ import ( "openreplay/backend/internal/storage" "openreplay/backend/pkg/failover" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics" + storageMetrics "openreplay/backend/pkg/metrics/storage" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" - s3storage "openreplay/backend/pkg/storage" + cloud "openreplay/backend/pkg/storage" ) func main() { - metrics := monitoring.New("storage") + m := metrics.New() + m.Register(storageMetrics.List()) + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := config.New() @@ -26,8 +29,8 @@ func main() { pprof.StartProfilingServer() } - s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket) - srv, err := storage.New(cfg, s3, metrics) + s3 := cloud.NewS3(cfg.S3Region, cfg.S3Bucket) + srv, err := storage.New(cfg, s3) if err != nil { log.Printf("can't init storage service: %s", err) return diff --git a/backend/internal/assets/cacher/cacher.go b/backend/internal/assets/cacher/cacher.go index 8bbee092f..4b0353a9a 100644 --- a/backend/internal/assets/cacher/cacher.go +++ b/backend/internal/assets/cacher/cacher.go @@ -1,16 +1,13 @@ package cacher import ( - "context" "crypto/tls" "fmt" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "io" "io/ioutil" - "log" "mime" "net/http" - "openreplay/backend/pkg/monitoring" + metrics "openreplay/backend/pkg/metrics/assets" "path/filepath" "strings" "time" @@ -25,30 +22,22 @@ import ( const MAX_CACHE_DEPTH = 5 type cacher struct { - timeoutMap *timeoutMap // Concurrency implemented - s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently." - httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines." - rewriter *assets.Rewriter // Read only - Errors chan error - sizeLimit int - downloadedAssets syncfloat64.Counter - requestHeaders map[string]string - workers *WorkerPool + timeoutMap *timeoutMap // Concurrency implemented + s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently." + httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines." 
+ rewriter *assets.Rewriter // Read only + Errors chan error + sizeLimit int + requestHeaders map[string]string + workers *WorkerPool } func (c *cacher) CanCache() bool { return c.workers.CanAddTask() } -func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher { +func NewCacher(cfg *config.Config) *cacher { rewriter := assets.NewRewriter(cfg.AssetsOrigin) - if metrics == nil { - log.Fatalf("metrics are empty") - } - downloadedAssets, err := metrics.RegisterCounter("assets_downloaded") - if err != nil { - log.Printf("can't create downloaded_assets metric: %s", err) - } c := &cacher{ timeoutMap: newTimeoutMap(), s3: storage.NewS3(cfg.AWSRegion, cfg.S3BucketAssets), @@ -59,11 +48,10 @@ func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher { TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, }, - rewriter: rewriter, - Errors: make(chan error), - sizeLimit: cfg.AssetsSizeLimit, - downloadedAssets: downloadedAssets, - requestHeaders: cfg.AssetsRequestHeaders, + rewriter: rewriter, + Errors: make(chan error), + sizeLimit: cfg.AssetsSizeLimit, + requestHeaders: cfg.AssetsRequestHeaders, } c.workers = NewPool(64, c.CacheFile) return c @@ -75,6 +63,7 @@ func (c *cacher) CacheFile(task *Task) { func (c *cacher) cacheURL(t *Task) { t.retries-- + start := time.Now() req, _ := http.NewRequest("GET", t.requestURL, nil) if t.retries%2 == 0 { req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0") @@ -87,6 +76,7 @@ func (c *cacher) cacheURL(t *Task) { c.Errors <- errors.Wrap(err, t.urlContext) return } + metrics.RecordDownloadDuration(float64(time.Now().Sub(start).Milliseconds()), res.StatusCode) defer res.Body.Close() if res.StatusCode >= 400 { printErr := true @@ -122,12 +112,15 @@ func (c *cacher) cacheURL(t *Task) { } // TODO: implement in streams + start = time.Now() err = c.s3.Upload(strings.NewReader(strData), t.cachePath, contentType, false) if err != nil { + metrics.RecordUploadDuration(float64(time.Now().Sub(start).Milliseconds()), true) c.Errors <- errors.Wrap(err, t.urlContext) return } - c.downloadedAssets.Add(context.Background(), 1) + metrics.RecordUploadDuration(float64(time.Now().Sub(start).Milliseconds()), false) + metrics.IncreaseSavedSessions() if isCSS { if t.depth > 0 { diff --git a/backend/internal/http/router/handlers-ios.go b/backend/internal/http/router/handlers-ios.go index e0fc73b6f..b11918d54 100644 --- a/backend/internal/http/router/handlers-ios.go +++ b/backend/internal/http/router/handlers-ios.go @@ -22,28 +22,28 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) req := &StartIOSSessionRequest{} if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0) return } body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) defer body.Close() if err := json.NewDecoder(body).Decode(req); err != nil { - ResponseWithError(w, http.StatusBadRequest, err) + ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, 0) return } if req.ProjectKey == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"), startTime, r.URL.Path, 0) return } p, err := e.services.Database.GetProjectByKey(*req.ProjectKey) if err != nil { if postgres.IsNoRowsErr(err) { - ResponseWithError(w, http.StatusNotFound, 
errors.New("Project doesn't exist or is not active")) + ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"), startTime, r.URL.Path, 0) } else { - ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging } return } @@ -53,18 +53,18 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) if err != nil { // Starting the new one dice := byte(rand.Intn(100)) // [0, 100) if dice >= p.SampleRate { - ResponseWithError(w, http.StatusForbidden, errors.New("cancel")) + ResponseWithError(w, http.StatusForbidden, errors.New("cancel"), startTime, r.URL.Path, 0) return } ua := e.services.UaParser.ParseFromHTTPRequest(r) if ua == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, 0) return } sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli())) if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err) + ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) return } // TODO: if EXPIRED => send message for two sessions association @@ -94,22 +94,24 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) UserUUID: userUUID, SessionID: strconv.FormatUint(tokenData.ID, 10), BeaconSizeLimit: e.cfg.BeaconSizeLimit, - }) + }, startTime, r.URL.Path, 0) } func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) if err != nil { - ResponseWithError(w, http.StatusUnauthorized, err) + ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0) return } e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS) } func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) if err != nil && err != token.EXPIRED { - ResponseWithError(w, http.StatusUnauthorized, err) + ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0) return } // Check timestamps here? @@ -117,16 +119,17 @@ func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Reque } func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() log.Printf("recieved imagerequest") sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) if err != nil { // Should accept expired token? 
- ResponseWithError(w, http.StatusUnauthorized, err) + ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0) return } if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0) return } r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit) @@ -134,21 +137,21 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) err = r.ParseMultipartForm(1e6) // ~1Mb if err == http.ErrNotMultipart || err == http.ErrMissingBoundary { - ResponseWithError(w, http.StatusUnsupportedMediaType, err) + ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0) return // } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB } else if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging return } if r.MultipartForm == nil { - ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed")) + ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"), startTime, r.URL.Path, 0) return } if len(r.MultipartForm.Value["projectKey"]) == 0 { - ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter? + ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing"), startTime, r.URL.Path, 0) // status for missing/wrong parameter? return } diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index 7afd184e5..52a37b7f0 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -3,18 +3,17 @@ package router import ( "encoding/json" "errors" - "github.com/Masterminds/semver" - "go.opentelemetry.io/otel/attribute" "io" "log" "math/rand" "net/http" - "openreplay/backend/internal/http/uuid" - "openreplay/backend/pkg/flakeid" "strconv" "time" + "github.com/Masterminds/semver" + "openreplay/backend/internal/http/uuid" "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/flakeid" . 
"openreplay/backend/pkg/messages" "openreplay/backend/pkg/token" ) @@ -28,13 +27,6 @@ func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) ( if err != nil { return nil, err } - - reqSize := len(bodyBytes) - e.requestSize.Record( - r.Context(), - float64(reqSize), - []attribute.KeyValue{attribute.String("method", r.URL.Path)}..., - ) return bodyBytes, nil } @@ -56,40 +48,43 @@ func getSessionTimestamp(req *StartSessionRequest, startTimeMili int64) (ts uint func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { startTime := time.Now() + bodySize := 0 // Check request body if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize) return } bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) if err != nil { log.Printf("error while reading request body: %s", err) - ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) return } + bodySize = len(bodyBytes) // Parse request body req := &StartSessionRequest{} if err := json.Unmarshal(bodyBytes, req); err != nil { - ResponseWithError(w, http.StatusBadRequest, err) + ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) return } // Handler's logic if req.ProjectKey == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"), startTime, r.URL.Path, bodySize) return } p, err := e.services.Database.GetProjectByKey(*req.ProjectKey) if err != nil { if postgres.IsNoRowsErr(err) { - ResponseWithError(w, http.StatusNotFound, errors.New("project doesn't exist or capture limit has been reached")) + ResponseWithError(w, http.StatusNotFound, + errors.New("project doesn't exist or capture limit has been reached"), startTime, r.URL.Path, bodySize) } else { log.Printf("can't get project by key: %s", err) - ResponseWithError(w, http.StatusInternalServerError, errors.New("can't get project by key")) + ResponseWithError(w, http.StatusInternalServerError, errors.New("can't get project by key"), startTime, r.URL.Path, bodySize) } return } @@ -99,19 +94,19 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) if err != nil || req.Reset { // Starting the new one dice := byte(rand.Intn(100)) // [0, 100) if dice >= p.SampleRate { - ResponseWithError(w, http.StatusForbidden, errors.New("cancel")) + ResponseWithError(w, http.StatusForbidden, errors.New("cancel"), startTime, r.URL.Path, bodySize) return } ua := e.services.UaParser.ParseFromHTTPRequest(r) if ua == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, bodySize) return } startTimeMili := startTime.UnixMilli() sessionID, err := e.services.Flaker.Compose(uint64(startTimeMili)) if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err) + ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize) return } // TODO: if EXPIRED => send message for two sessions association @@ -163,29 +158,33 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) BeaconSizeLimit: e.getBeaconSize(tokenData.ID), StartTimestamp: 
int64(flakeid.ExtractTimestamp(tokenData.ID)), Delay: tokenData.Delay, - }) + }, startTime, r.URL.Path, bodySize) } func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + // Check authorization sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) if err != nil { - ResponseWithError(w, http.StatusUnauthorized, err) + ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize) return } // Check request body if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize) return } bodyBytes, err := e.readBody(w, r, e.getBeaconSize(sessionData.ID)) if err != nil { log.Printf("error while reading request body: %s", err) - ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) return } + bodySize = len(bodyBytes) // Send processed messages to queue as array of bytes // TODO: check bytes for nonsense crap @@ -194,39 +193,43 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) log.Printf("can't send processed messages to queue: %s", err) } - w.WriteHeader(http.StatusOK) + ResponseOK(w, startTime, r.URL.Path, bodySize) } func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + bodySize := 0 + // Check request body if r.Body == nil { - ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize) return } bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit) if err != nil { log.Printf("error while reading request body: %s", err) - ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize) return } + bodySize = len(bodyBytes) // Parse request body req := &NotStartedRequest{} if err := json.Unmarshal(bodyBytes, req); err != nil { - ResponseWithError(w, http.StatusBadRequest, err) + ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize) return } // Handler's logic if req.ProjectKey == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("projectKey value required")) + ResponseWithError(w, http.StatusForbidden, errors.New("projectKey value required"), startTime, r.URL.Path, bodySize) return } ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway if ua == nil { - ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, bodySize) return } country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r) @@ -248,5 +251,5 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { log.Printf("Unable to insert Unstarted Session: %v\n", err) } - w.WriteHeader(http.StatusOK) + ResponseOK(w, startTime, r.URL.Path, bodySize) } diff --git a/backend/internal/http/router/handlers.go b/backend/internal/http/router/handlers.go index c36fdd668..425177341 100644 --- a/backend/internal/http/router/handlers.go +++ b/backend/internal/http/router/handlers.go @@ -6,9 +6,11 @@ import ( "io/ioutil" "log" "net/http" + "time" ) func (e *Router) pushMessages(w 
http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) { + start := time.Now() body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) defer body.Close() @@ -21,7 +23,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID reader, err = gzip.NewReader(body) if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response + ResponseWithError(w, http.StatusInternalServerError, err, start, r.URL.Path, 0) // TODO: stage-dependent response return } //log.Println("Gzip reader init", reader) @@ -32,7 +34,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID //log.Println("Reader after switch:", reader) buf, err := ioutil.ReadAll(reader) if err != nil { - ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + ResponseWithError(w, http.StatusInternalServerError, err, start, r.URL.Path, 0) // TODO: send error here only on staging return } e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send? diff --git a/backend/internal/http/router/response.go b/backend/internal/http/router/response.go index 0b4725419..b66b7c563 100644 --- a/backend/internal/http/router/response.go +++ b/backend/internal/http/router/response.go @@ -4,21 +4,44 @@ import ( "encoding/json" "log" "net/http" + "time" + + metrics "openreplay/backend/pkg/metrics/http" ) -func ResponseWithJSON(w http.ResponseWriter, res interface{}) { +func recordMetrics(requestStart time.Time, url string, code, bodySize int) { + if bodySize > 0 { + metrics.RecordRequestSize(float64(bodySize), url, code) + } + metrics.IncreaseTotalRequests() + metrics.RecordRequestDuration(float64(time.Now().Sub(requestStart).Milliseconds()), url, code) +} + +func ResponseOK(w http.ResponseWriter, requestStart time.Time, url string, bodySize int) { + w.WriteHeader(http.StatusOK) + recordMetrics(requestStart, url, http.StatusOK, bodySize) +} + +func ResponseWithJSON(w http.ResponseWriter, res interface{}, requestStart time.Time, url string, bodySize int) { body, err := json.Marshal(res) if err != nil { log.Println(err) } w.Header().Set("Content-Type", "application/json") w.Write(body) + recordMetrics(requestStart, url, http.StatusOK, bodySize) } -func ResponseWithError(w http.ResponseWriter, code int, err error) { - type response struct { - Error string `json:"error"` +type response struct { + Error string `json:"error"` +} + +func ResponseWithError(w http.ResponseWriter, code int, err error, requestStart time.Time, url string, bodySize int) { + body, err := json.Marshal(&response{err.Error()}) + if err != nil { + log.Println(err) } w.WriteHeader(code) - ResponseWithJSON(w, &response{err.Error()}) + w.Write(body) + recordMetrics(requestStart, url, code, bodySize) } diff --git a/backend/internal/http/router/router.go b/backend/internal/http/router/router.go index 964016dfd..6cd7efe79 100644 --- a/backend/internal/http/router/router.go +++ b/backend/internal/http/router/router.go @@ -1,19 +1,16 @@ package router import ( - "context" "fmt" - "github.com/gorilla/mux" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" "net/http" + "sync" + "time" + + "github.com/gorilla/mux" http3 "openreplay/backend/internal/config/http" http2 "openreplay/backend/internal/http/services" "openreplay/backend/internal/http/util" - "openreplay/backend/pkg/monitoring" - "sync" - "time" ) type BeaconSize struct { @@ -25,21 +22,16 @@ type Router 
struct { router *mux.Router cfg *http3.Config services *http2.ServicesBuilder - requestSize syncfloat64.Histogram - requestDuration syncfloat64.Histogram - totalRequests syncfloat64.Counter mutex *sync.RWMutex beaconSizeCache map[uint64]*BeaconSize // Cache for session's beaconSize } -func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *monitoring.Metrics) (*Router, error) { +func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder) (*Router, error) { switch { case cfg == nil: return nil, fmt.Errorf("config is empty") case services == nil: return nil, fmt.Errorf("services is empty") - case metrics == nil: - return nil, fmt.Errorf("metrics is empty") } e := &Router{ cfg: cfg, @@ -47,7 +39,6 @@ func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *moni mutex: &sync.RWMutex{}, beaconSizeCache: make(map[uint64]*BeaconSize), } - e.initMetrics(metrics) e.init() go e.clearBeaconSizes() return e, nil @@ -115,22 +106,6 @@ func (e *Router) init() { e.router.Use(e.corsMiddleware) } -func (e *Router) initMetrics(metrics *monitoring.Metrics) { - var err error - e.requestSize, err = metrics.RegisterHistogram("requests_body_size") - if err != nil { - log.Printf("can't create requests_body_size metric: %s", err) - } - e.requestDuration, err = metrics.RegisterHistogram("requests_duration") - if err != nil { - log.Printf("can't create requests_duration metric: %s", err) - } - e.totalRequests, err = metrics.RegisterCounter("requests_total") - if err != nil { - log.Printf("can't create requests_total metric: %s", err) - } -} - func (e *Router) root(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } @@ -149,17 +124,8 @@ func (e *Router) corsMiddleware(next http.Handler) http.Handler { log.Printf("Request: %v - %v ", r.Method, util.SafeString(r.URL.Path)) - requestStart := time.Now() - // Serve request next.ServeHTTP(w, r) - - metricsContext, _ := context.WithTimeout(context.Background(), time.Millisecond*100) - e.totalRequests.Add(metricsContext, 1) - e.requestDuration.Record(metricsContext, - float64(time.Now().Sub(requestStart).Milliseconds()), - []attribute.KeyValue{attribute.String("method", r.URL.Path)}..., - ) }) } diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index c1c2c9b7f..e1ddb0ffe 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -1,13 +1,11 @@ package sessionender import ( - "context" - "fmt" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" "time" + + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/metrics/ender" ) // EndedSessionHandler handler for ended sessions @@ -23,32 +21,16 @@ type session struct { // SessionEnder updates timestamp of last message for each session type SessionEnder struct { - timeout int64 - sessions map[uint64]*session // map[sessionID]session - timeCtrl *timeController - activeSessions syncfloat64.UpDownCounter - totalSessions syncfloat64.Counter + timeout int64 + sessions map[uint64]*session // map[sessionID]session + timeCtrl *timeController } -func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) { - if metrics == nil { - return nil, fmt.Errorf("metrics module is empty") - } - activeSessions, err := metrics.RegisterUpDownCounter("sessions_active") - if err != nil { - return nil, fmt.Errorf("can't register session.active metric: %s", err) - } - totalSessions, err := 
metrics.RegisterCounter("sessions_total") - if err != nil { - return nil, fmt.Errorf("can't register session.total metric: %s", err) - } - +func New(timeout int64, parts int) (*SessionEnder, error) { return &SessionEnder{ - timeout: timeout, - sessions: make(map[uint64]*session), - timeCtrl: NewTimeController(parts), - activeSessions: activeSessions, - totalSessions: totalSessions, + timeout: timeout, + sessions: make(map[uint64]*session), + timeCtrl: NewTimeController(parts), }, nil } @@ -74,8 +56,8 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { lastUserTime: msgTimestamp, // last timestamp from user's machine isEnded: false, } - se.activeSessions.Add(context.Background(), 1) - se.totalSessions.Add(context.Background(), 1) + ender.IncreaseActiveSessions() + ender.IncreaseTotalSessions() return } // Keep the highest user's timestamp for correct session duration value @@ -100,7 +82,8 @@ func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { sess.isEnded = true if handler(sessID, sess.lastUserTime) { delete(se.sessions, sessID) - se.activeSessions.Add(context.Background(), -1) + ender.DecreaseActiveSessions() + ender.IncreaseClosedSessions() removedSessions++ } else { log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime) diff --git a/backend/internal/sink/assetscache/assets.go b/backend/internal/sink/assetscache/assets.go index 4c63f6897..387ee5c92 100644 --- a/backend/internal/sink/assetscache/assets.go +++ b/backend/internal/sink/assetscache/assets.go @@ -1,20 +1,19 @@ package assetscache import ( - "context" "crypto/md5" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "io" "log" "net/url" - "openreplay/backend/internal/config/sink" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/queue/types" - "openreplay/backend/pkg/url/assets" + metrics "openreplay/backend/pkg/metrics/sink" "strings" "sync" "time" + + "openreplay/backend/internal/config/sink" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/url/assets" ) type CachedAsset struct { @@ -23,52 +22,21 @@ type CachedAsset struct { } type AssetsCache struct { - mutex sync.RWMutex - cfg *sink.Config - rewriter *assets.Rewriter - producer types.Producer - cache map[string]*CachedAsset - blackList []string // use "example.com" to filter all domains or ".example.com" to filter only third-level domain - totalAssets syncfloat64.Counter - cachedAssets syncfloat64.Counter - skippedAssets syncfloat64.Counter - assetSize syncfloat64.Histogram - assetDuration syncfloat64.Histogram + mutex sync.RWMutex + cfg *sink.Config + rewriter *assets.Rewriter + producer types.Producer + cache map[string]*CachedAsset + blackList []string // use "example.com" to filter all domains or ".example.com" to filter only third-level domain } -func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, metrics *monitoring.Metrics) *AssetsCache { - // Assets metrics - totalAssets, err := metrics.RegisterCounter("assets_total") - if err != nil { - log.Printf("can't create assets_total metric: %s", err) - } - cachedAssets, err := metrics.RegisterCounter("assets_cached") - if err != nil { - log.Printf("can't create assets_cached metric: %s", err) - } - skippedAssets, err := metrics.RegisterCounter("assets_skipped") - if err != nil { - log.Printf("can't create assets_skipped metric: %s", err) - } - assetSize, err := metrics.RegisterHistogram("asset_size") - if err != nil { - log.Printf("can't create 
asset_size metric: %s", err) - } - assetDuration, err := metrics.RegisterHistogram("asset_duration") - if err != nil { - log.Printf("can't create asset_duration metric: %s", err) - } +func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { assetsCache := &AssetsCache{ - cfg: cfg, - rewriter: rewriter, - producer: producer, - cache: make(map[string]*CachedAsset, 64), - blackList: make([]string, 0), - totalAssets: totalAssets, - cachedAssets: cachedAssets, - skippedAssets: skippedAssets, - assetSize: assetSize, - assetDuration: assetDuration, + cfg: cfg, + rewriter: rewriter, + producer: producer, + cache: make(map[string]*CachedAsset, 64), + blackList: make([]string, 0), } // Parse black list for cache layer if len(cfg.CacheBlackList) > 0 { @@ -84,7 +52,7 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m } func (e *AssetsCache) cleaner() { - cleanTick := time.Tick(time.Minute * 30) + cleanTick := time.Tick(time.Minute * 3) for { select { case <-cleanTick: @@ -105,6 +73,7 @@ func (e *AssetsCache) clearCache() { if int64(now.Sub(cache.ts).Minutes()) > e.cfg.CacheExpiration { deleted++ delete(e.cache, id) + metrics.DecreaseCachedAssets() } } log.Printf("cache cleaner: deleted %d/%d assets", deleted, cacheSize) @@ -232,8 +201,7 @@ func parseHost(baseURL string) (string, error) { } func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string { - ctx := context.Background() - e.totalAssets.Add(ctx, 1) + metrics.IncreaseTotalAssets() // Try to find asset in cache h := md5.New() // Cut first part of url (scheme + host) @@ -255,7 +223,7 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st e.mutex.RUnlock() if ok { if int64(time.Now().Sub(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration { - e.skippedAssets.Add(ctx, 1) + metrics.IncreaseSkippedAssets() return cachedAsset.msg } } @@ -267,8 +235,8 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st start := time.Now() res := e.getRewrittenCSS(sessionID, baseURL, css) duration := time.Now().Sub(start).Milliseconds() - e.assetSize.Record(ctx, float64(len(res))) - e.assetDuration.Record(ctx, float64(duration)) + metrics.RecordAssetSize(float64(len(res))) + metrics.RecordProcessAssetDuration(float64(duration)) // Save asset to cache if we spent more than threshold if duration > e.cfg.CacheThreshold { e.mutex.Lock() @@ -277,7 +245,7 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st ts: time.Now(), } e.mutex.Unlock() - e.cachedAssets.Add(ctx, 1) + metrics.IncreaseCachedAssets() } // Return rewritten asset return res diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index fbe9e2228..1e2507163 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -2,20 +2,20 @@ package storage import ( "bytes" - "context" "fmt" - gzip "github.com/klauspost/pgzip" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" - config "openreplay/backend/internal/config/storage" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/storage" "os" "strconv" "strings" "sync" "time" + + config "openreplay/backend/internal/config/storage" + "openreplay/backend/pkg/messages" + metrics "openreplay/backend/pkg/metrics/storage" + "openreplay/backend/pkg/storage" + + gzip "github.com/klauspost/pgzip" ) type FileType string @@ -25,6 +25,13 @@ const ( DEV FileType = 
"/devtools.mob" ) +func (t FileType) String() string { + if t == DOM { + return "dom" + } + return "devtools" +} + type Task struct { id string doms *bytes.Buffer @@ -36,92 +43,23 @@ type Storage struct { cfg *config.Config s3 *storage.S3 startBytes []byte - - totalSessions syncfloat64.Counter - sessionDOMSize syncfloat64.Histogram - sessionDEVSize syncfloat64.Histogram - readingDOMTime syncfloat64.Histogram - readingDEVTime syncfloat64.Histogram - sortingDOMTime syncfloat64.Histogram - sortingDEVTime syncfloat64.Histogram - archivingDOMTime syncfloat64.Histogram - archivingDEVTime syncfloat64.Histogram - uploadingDOMTime syncfloat64.Histogram - uploadingDEVTime syncfloat64.Histogram - - tasks chan *Task - ready chan struct{} + tasks chan *Task + ready chan struct{} } -func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) { +func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) { switch { case cfg == nil: return nil, fmt.Errorf("config is empty") case s3 == nil: return nil, fmt.Errorf("s3 storage is empty") } - // Create metrics - totalSessions, err := metrics.RegisterCounter("sessions_total") - if err != nil { - log.Printf("can't create sessions_total metric: %s", err) - } - sessionDOMSize, err := metrics.RegisterHistogram("sessions_size") - if err != nil { - log.Printf("can't create session_size metric: %s", err) - } - sessionDevtoolsSize, err := metrics.RegisterHistogram("sessions_dt_size") - if err != nil { - log.Printf("can't create sessions_dt_size metric: %s", err) - } - readingDOMTime, err := metrics.RegisterHistogram("reading_duration") - if err != nil { - log.Printf("can't create reading_duration metric: %s", err) - } - readingDEVTime, err := metrics.RegisterHistogram("reading_dt_duration") - if err != nil { - log.Printf("can't create reading_duration metric: %s", err) - } - sortingDOMTime, err := metrics.RegisterHistogram("sorting_duration") - if err != nil { - log.Printf("can't create reading_duration metric: %s", err) - } - sortingDEVTime, err := metrics.RegisterHistogram("sorting_dt_duration") - if err != nil { - log.Printf("can't create reading_duration metric: %s", err) - } - archivingDOMTime, err := metrics.RegisterHistogram("archiving_duration") - if err != nil { - log.Printf("can't create archiving_duration metric: %s", err) - } - archivingDEVTime, err := metrics.RegisterHistogram("archiving_dt_duration") - if err != nil { - log.Printf("can't create archiving_duration metric: %s", err) - } - uploadingDOMTime, err := metrics.RegisterHistogram("uploading_duration") - if err != nil { - log.Printf("can't create uploading_duration metric: %s", err) - } - uploadingDEVTime, err := metrics.RegisterHistogram("uploading_dt_duration") - if err != nil { - log.Printf("can't create uploading_duration metric: %s", err) - } newStorage := &Storage{ - cfg: cfg, - s3: s3, - startBytes: make([]byte, cfg.FileSplitSize), - totalSessions: totalSessions, - sessionDOMSize: sessionDOMSize, - sessionDEVSize: sessionDevtoolsSize, - readingDOMTime: readingDOMTime, - readingDEVTime: readingDEVTime, - sortingDOMTime: sortingDOMTime, - sortingDEVTime: sortingDEVTime, - archivingDOMTime: archivingDOMTime, - archivingDEVTime: archivingDEVTime, - uploadingDOMTime: uploadingDOMTime, - uploadingDEVTime: uploadingDEVTime, - tasks: make(chan *Task, 1), - ready: make(chan struct{}), + cfg: cfg, + s3: s3, + startBytes: make([]byte, cfg.FileSplitSize), + tasks: make(chan *Task, 1), + ready: make(chan struct{}), } go newStorage.worker() return newStorage, nil @@ 
-187,11 +125,7 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) { if err != nil { return nil, fmt.Errorf("can't sort session, err: %s", err) } - if tp == DOM { - s.sortingDOMTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) - } else { - s.sortingDEVTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) - } + metrics.RecordSessionSortDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) return res, nil } @@ -215,26 +149,19 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { if err != nil { return err } - durRead := time.Now().Sub(startRead).Milliseconds() - // Send metrics - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - if tp == DOM { - s.sessionDOMSize.Record(ctx, float64(len(mob))) - s.readingDOMTime.Record(ctx, float64(durRead)) - } else { - s.sessionDEVSize.Record(ctx, float64(len(mob))) - s.readingDEVTime.Record(ctx, float64(durRead)) - } + metrics.RecordSessionSize(float64(len(mob)), tp.String()) + metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String()) + // Encode and compress session if tp == DEV { - startCompress := time.Now() + start := time.Now() task.dev = s.compressSession(mob) - s.archivingDEVTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds())) + metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) } else { if len(mob) <= s.cfg.FileSplitSize { - startCompress := time.Now() + start := time.Now() task.doms = s.compressSession(mob) - s.archivingDOMTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds())) + metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) return nil } wg := &sync.WaitGroup{} @@ -253,7 +180,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { wg.Done() }() wg.Wait() - s.archivingDOMTime.Record(ctx, float64(firstPart+secondPart)) + metrics.RecordSessionCompressDuration(float64(firstPart+secondPart), tp.String()) } return nil } @@ -324,11 +251,9 @@ func (s *Storage) uploadSession(task *Task) { wg.Done() }() wg.Wait() - // Record metrics - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - s.uploadingDOMTime.Record(ctx, float64(uploadDoms+uploadDome)) - s.uploadingDEVTime.Record(ctx, float64(uploadDev)) - s.totalSessions.Add(ctx, 1) + metrics.RecordSessionUploadDuration(float64(uploadDoms+uploadDome), DOM.String()) + metrics.RecordSessionUploadDuration(float64(uploadDev), DEV.String()) + metrics.IncreaseStorageTotalSessions() } func (s *Storage) worker() { diff --git a/backend/pkg/db/postgres/batches.go b/backend/pkg/db/postgres/batches.go index c1283da10..abdee36f2 100644 --- a/backend/pkg/db/postgres/batches.go +++ b/backend/pkg/db/postgres/batches.go @@ -1,14 +1,13 @@ package postgres import ( - "context" - "github.com/jackc/pgx/v4" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" - "openreplay/backend/pkg/monitoring" "strings" "time" + + "openreplay/backend/pkg/metrics/database" + + "github.com/jackc/pgx/v4" ) type batchItem struct { @@ -78,21 +77,17 @@ func NewBatchesTask(size int) *batchesTask { } type BatchSet struct { - c Pool - batches map[uint64]*SessionBatch - batchQueueLimit int - batchSizeLimit int - batchSizeBytes syncfloat64.Histogram - batchSizeLines syncfloat64.Histogram - sqlRequestTime syncfloat64.Histogram - 
sqlRequestCounter syncfloat64.Counter - updates map[uint64]*sessionUpdates - workerTask chan *batchesTask - done chan struct{} - finished chan struct{} + c Pool + batches map[uint64]*SessionBatch + batchQueueLimit int + batchSizeLimit int + updates map[uint64]*sessionUpdates + workerTask chan *batchesTask + done chan struct{} + finished chan struct{} } -func NewBatchSet(c Pool, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *BatchSet { +func NewBatchSet(c Pool, queueLimit, sizeLimit int) *BatchSet { bs := &BatchSet{ c: c, batches: make(map[uint64]*SessionBatch), @@ -103,31 +98,10 @@ func NewBatchSet(c Pool, queueLimit, sizeLimit int, metrics *monitoring.Metrics) finished: make(chan struct{}), updates: make(map[uint64]*sessionUpdates), } - bs.initMetrics(metrics) go bs.worker() return bs } -func (conn *BatchSet) initMetrics(metrics *monitoring.Metrics) { - var err error - conn.batchSizeBytes, err = metrics.RegisterHistogram("batch_size_bytes") - if err != nil { - log.Printf("can't create batchSizeBytes metric: %s", err) - } - conn.batchSizeLines, err = metrics.RegisterHistogram("batch_size_lines") - if err != nil { - log.Printf("can't create batchSizeLines metric: %s", err) - } - conn.sqlRequestTime, err = metrics.RegisterHistogram("sql_request_time") - if err != nil { - log.Printf("can't create sqlRequestTime metric: %s", err) - } - conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number") - if err != nil { - log.Printf("can't create sqlRequestNumber metric: %s", err) - } -} - func (conn *BatchSet) getBatch(sessionID uint64) *SessionBatch { sessionID = sessionID % 10 if _, ok := conn.batches[sessionID]; !ok { @@ -194,11 +168,10 @@ func (conn *BatchSet) sendBatches(t *batchesTask) { // Append session update sql request to the end of batch batch.Prepare() // Record batch size in bytes and number of lines - conn.batchSizeBytes.Record(context.Background(), float64(batch.Size())) - conn.batchSizeLines.Record(context.Background(), float64(batch.Len())) + database.RecordBatchSize(float64(batch.Size())) + database.RecordBatchElements(float64(batch.Len())) start := time.Now() - isFailed := false // Send batch to db and execute br := conn.c.SendBatch(batch.batch) @@ -209,15 +182,11 @@ func (conn *BatchSet) sendBatches(t *batchesTask) { failedSql := batch.items[i] query := strings.ReplaceAll(failedSql.query, "\n", " ") log.Println("failed sql req:", query, failedSql.arguments) - isFailed = true } } br.Close() // returns err - dur := time.Now().Sub(start).Milliseconds() - conn.sqlRequestTime.Record(context.Background(), float64(dur), - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) - conn.sqlRequestCounter.Add(context.Background(), 1, - attribute.String("method", "batch"), attribute.Bool("failed", isFailed)) + database.RecordBatchInsertDuration(float64(time.Now().Sub(start).Milliseconds())) + database.IncreaseTotalBatches() } } diff --git a/backend/pkg/db/postgres/bulk.go b/backend/pkg/db/postgres/bulk.go index 8c6c42f78..b6a2ddd35 100644 --- a/backend/pkg/db/postgres/bulk.go +++ b/backend/pkg/db/postgres/bulk.go @@ -2,13 +2,9 @@ package postgres import ( "bytes" - "context" "errors" "fmt" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" - "log" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/metrics/database" "time" ) @@ -25,15 +21,13 @@ type Bulk interface { } type bulkImpl struct { - conn Pool - table string - columns string - template string - setSize int - sizeLimit int - values 
[]interface{} - bulkSize syncfloat64.Histogram - bulkDuration syncfloat64.Histogram + conn Pool + table string + columns string + template string + setSize int + sizeLimit int + values []interface{} } func (b *bulkImpl) Append(args ...interface{}) error { @@ -79,18 +73,15 @@ func (b *bulkImpl) send() error { return fmt.Errorf("send bulk err: %s", err) } // Save bulk metrics - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - b.bulkDuration.Record(ctx, float64(time.Now().Sub(start).Milliseconds()), attribute.String("table", b.table)) - b.bulkSize.Record(ctx, float64(size), attribute.String("table", b.table)) + database.RecordBulkElements(float64(size), "pg", b.table) + database.RecordBulkInsertDuration(float64(time.Now().Sub(start).Milliseconds()), "pg", b.table) return nil } -func NewBulk(conn Pool, metrics *monitoring.Metrics, table, columns, template string, setSize, sizeLimit int) (Bulk, error) { +func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) { switch { case conn == nil: return nil, errors.New("db conn is empty") - case metrics == nil: - return nil, errors.New("metrics is empty") case table == "": return nil, errors.New("table is empty") case columns == "": @@ -102,23 +93,13 @@ func NewBulk(conn Pool, metrics *monitoring.Metrics, table, columns, template st case sizeLimit <= 0: return nil, errors.New("size limit is wrong") } - messagesInBulk, err := metrics.RegisterHistogram("messages_in_bulk") - if err != nil { - log.Printf("can't create messages_size metric: %s", err) - } - bulkInsertDuration, err := metrics.RegisterHistogram("bulk_insert_duration") - if err != nil { - log.Printf("can't create messages_size metric: %s", err) - } return &bulkImpl{ - conn: conn, - table: table, - columns: columns, - template: template, - setSize: setSize, - sizeLimit: sizeLimit, - values: make([]interface{}, 0, setSize*sizeLimit), - bulkSize: messagesInBulk, - bulkDuration: bulkInsertDuration, + conn: conn, + table: table, + columns: columns, + template: template, + setSize: setSize, + sizeLimit: sizeLimit, + values: make([]interface{}, 0, setSize*sizeLimit), }, nil } diff --git a/backend/pkg/db/postgres/bulks.go b/backend/pkg/db/postgres/bulks.go index 5774ba184..f3e9e95c9 100644 --- a/backend/pkg/db/postgres/bulks.go +++ b/backend/pkg/db/postgres/bulks.go @@ -2,7 +2,6 @@ package postgres import ( "log" - "openreplay/backend/pkg/monitoring" "time" ) @@ -30,16 +29,14 @@ type BulkSet struct { webCustomEvents Bulk webClickEvents Bulk webNetworkRequest Bulk - metrics *monitoring.Metrics workerTask chan *bulksTask done chan struct{} finished chan struct{} } -func NewBulkSet(c Pool, metrics *monitoring.Metrics) *BulkSet { +func NewBulkSet(c Pool) *BulkSet { bs := &BulkSet{ c: c, - metrics: metrics, workerTask: make(chan *bulksTask, 1), done: make(chan struct{}), finished: make(chan struct{}), @@ -86,7 +83,7 @@ func (conn *BulkSet) Get(name string) Bulk { func (conn *BulkSet) initBulks() { var err error - conn.autocompletes, err = NewBulk(conn.c, conn.metrics, + conn.autocompletes, err = NewBulk(conn.c, "autocomplete", "(value, type, project_id)", "($%d, $%d, $%d)", @@ -94,7 +91,7 @@ func (conn *BulkSet) initBulks() { if err != nil { log.Fatalf("can't create autocomplete bulk: %s", err) } - conn.requests, err = NewBulk(conn.c, conn.metrics, + conn.requests, err = NewBulk(conn.c, "events_common.requests", "(session_id, timestamp, seq_index, url, duration, success)", "($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)", @@ -102,7 +99,7 @@ func 
diff --git a/backend/pkg/db/postgres/bulks.go b/backend/pkg/db/postgres/bulks.go
index 5774ba184..f3e9e95c9 100644
--- a/backend/pkg/db/postgres/bulks.go
+++ b/backend/pkg/db/postgres/bulks.go
@@ -2,7 +2,6 @@ package postgres
 
 import (
 	"log"
-	"openreplay/backend/pkg/monitoring"
 	"time"
 )
 
@@ -30,16 +29,14 @@ type BulkSet struct {
 	webCustomEvents   Bulk
 	webClickEvents    Bulk
 	webNetworkRequest Bulk
-	metrics           *monitoring.Metrics
 	workerTask        chan *bulksTask
 	done              chan struct{}
 	finished          chan struct{}
 }
 
-func NewBulkSet(c Pool, metrics *monitoring.Metrics) *BulkSet {
+func NewBulkSet(c Pool) *BulkSet {
 	bs := &BulkSet{
 		c:          c,
-		metrics:    metrics,
 		workerTask: make(chan *bulksTask, 1),
 		done:       make(chan struct{}),
 		finished:   make(chan struct{}),
@@ -86,7 +83,7 @@ func (conn *BulkSet) Get(name string) Bulk {
 
 func (conn *BulkSet) initBulks() {
 	var err error
-	conn.autocompletes, err = NewBulk(conn.c, conn.metrics,
+	conn.autocompletes, err = NewBulk(conn.c,
 		"autocomplete",
 		"(value, type, project_id)",
 		"($%d, $%d, $%d)",
@@ -94,7 +91,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create autocomplete bulk: %s", err)
 	}
-	conn.requests, err = NewBulk(conn.c, conn.metrics,
+	conn.requests, err = NewBulk(conn.c,
 		"events_common.requests",
 		"(session_id, timestamp, seq_index, url, duration, success)",
 		"($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)",
@@ -102,7 +99,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create requests bulk: %s", err)
 	}
-	conn.customEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.customEvents, err = NewBulk(conn.c,
 		"events_common.customs",
 		"(session_id, timestamp, seq_index, name, payload)",
 		"($%d, $%d, $%d, LEFT($%d, 2000), $%d)",
@@ -110,7 +107,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create customEvents bulk: %s", err)
 	}
-	conn.webPageEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webPageEvents, err = NewBulk(conn.c,
 		"events.pages",
 		"(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+
			"load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+
@@ -122,7 +119,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webPageEvents bulk: %s", err)
 	}
-	conn.webInputEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webInputEvents, err = NewBulk(conn.c,
 		"events.inputs",
 		"(session_id, message_id, timestamp, value, label)",
 		"($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''))",
@@ -130,7 +127,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webPageEvents bulk: %s", err)
 	}
-	conn.webGraphQL, err = NewBulk(conn.c, conn.metrics,
+	conn.webGraphQL, err = NewBulk(conn.c,
 		"events.graphql",
 		"(session_id, timestamp, message_id, name, request_body, response_body)",
 		"($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)",
@@ -138,7 +135,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webPageEvents bulk: %s", err)
 	}
-	conn.webErrors, err = NewBulk(conn.c, conn.metrics,
+	conn.webErrors, err = NewBulk(conn.c,
 		"errors",
 		"(error_id, project_id, source, name, message, payload)",
 		"($%d, $%d, $%d, $%d, $%d, $%d::jsonb)",
@@ -146,7 +143,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webErrors bulk: %s", err)
 	}
-	conn.webErrorEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webErrorEvents, err = NewBulk(conn.c,
 		"events.errors",
 		"(session_id, message_id, timestamp, error_id)",
 		"($%d, $%d, $%d, $%d)",
@@ -154,7 +151,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webErrorEvents bulk: %s", err)
 	}
-	conn.webErrorTags, err = NewBulk(conn.c, conn.metrics,
+	conn.webErrorTags, err = NewBulk(conn.c,
 		"public.errors_tags",
 		"(session_id, message_id, error_id, key, value)",
 		"($%d, $%d, $%d, $%d, $%d)",
@@ -162,7 +159,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webErrorEvents bulk: %s", err)
 	}
-	conn.webIssues, err = NewBulk(conn.c, conn.metrics,
+	conn.webIssues, err = NewBulk(conn.c,
 		"issues",
 		"(project_id, issue_id, type, context_string)",
 		"($%d, $%d, $%d, $%d)",
@@ -170,7 +167,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webIssues bulk: %s", err)
 	}
-	conn.webIssueEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webIssueEvents, err = NewBulk(conn.c,
 		"events_common.issues",
 		"(session_id, issue_id, timestamp, seq_index, payload)",
 		"($%d, $%d, $%d, $%d, CAST($%d AS jsonb))",
@@ -178,7 +175,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webIssueEvents bulk: %s", err)
 	}
-	conn.webCustomEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webCustomEvents, err = NewBulk(conn.c,
 		"events_common.customs",
 		"(session_id, seq_index, timestamp, name, payload, level)",
 		"($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)",
@@ -186,7 +183,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webCustomEvents bulk: %s", err)
 	}
-	conn.webClickEvents, err = NewBulk(conn.c, conn.metrics,
+	conn.webClickEvents, err = NewBulk(conn.c,
 		"events.clicks",
 		"(session_id, message_id, timestamp, label, selector, url, path)",
 		"($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000))",
@@ -194,7 +191,7 @@ func (conn *BulkSet) initBulks() {
 	if err != nil {
 		log.Fatalf("can't create webClickEvents bulk: %s", err)
 	}
-	conn.webNetworkRequest, err = NewBulk(conn.c, conn.metrics,
+	conn.webNetworkRequest, err = NewBulk(conn.c,
 		"events_common.requests",
 		"(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)",
 		"($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)",
diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go
index 2e8f3d425..6904dc135 100644
--- a/backend/pkg/db/postgres/connector.go
+++ b/backend/pkg/db/postgres/connector.go
@@ -2,11 +2,10 @@ package postgres
 
 import (
 	"context"
-	"github.com/jackc/pgx/v4/pgxpool"
-	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"log"
+
+	"github.com/jackc/pgx/v4/pgxpool"
 	"openreplay/backend/pkg/db/types"
-	"openreplay/backend/pkg/monitoring"
 )
 
 type CH interface {
@@ -15,36 +14,28 @@
 
 // Conn contains batches, bulks and cache for all sessions
 type Conn struct {
-	c                 Pool
-	batches           *BatchSet
-	bulks             *BulkSet
-	batchSizeBytes    syncfloat64.Histogram
-	batchSizeLines    syncfloat64.Histogram
-	sqlRequestTime    syncfloat64.Histogram
-	sqlRequestCounter syncfloat64.Counter
-	chConn            CH
+	c       Pool
+	batches *BatchSet
+	bulks   *BulkSet
+	chConn  CH
 }
 
 func (conn *Conn) SetClickHouse(ch CH) {
 	conn.chConn = ch
 }
 
-func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn {
-	if metrics == nil {
-		log.Fatalf("metrics is nil")
-	}
+func NewConn(url string, queueLimit, sizeLimit int) *Conn {
 	c, err := pgxpool.Connect(context.Background(), url)
 	if err != nil {
 		log.Fatalf("pgxpool.Connect err: %s", err)
 	}
 	conn := &Conn{}
-	conn.initMetrics(metrics)
-	conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter)
+	conn.c, err = NewPool(c)
 	if err != nil {
 		log.Fatalf("can't create new pool wrapper: %s", err)
 	}
-	conn.bulks = NewBulkSet(conn.c, metrics)
-	conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit, metrics)
+	conn.bulks = NewBulkSet(conn.c)
+	conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit)
 	return conn
 }
 
@@ -55,26 +46,6 @@ func (conn *Conn) Close() error {
 	return nil
 }
 
-func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
-	var err error
-	conn.batchSizeBytes, err = metrics.RegisterHistogram("batch_size_bytes")
-	if err != nil {
-		log.Printf("can't create batchSizeBytes metric: %s", err)
-	}
-	conn.batchSizeLines, err = metrics.RegisterHistogram("batch_size_lines")
-	if err != nil {
-		log.Printf("can't create batchSizeLines metric: %s", err)
-	}
-	conn.sqlRequestTime, err = metrics.RegisterHistogram("sql_request_time")
-	if err != nil {
-		log.Printf("can't create sqlRequestTime metric: %s", err)
-	}
-	conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number")
-	if err != nil {
-		log.Printf("can't create sqlRequestNumber metric: %s", err)
-	}
-}
-
 func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
 	if len(value) == 0 {
 		return
diff --git a/backend/pkg/db/postgres/pool.go b/backend/pkg/db/postgres/pool.go
index 5f9cbaa29..5214be8d0 100644
--- a/backend/pkg/db/postgres/pool.go
+++ b/backend/pkg/db/postgres/pool.go
@@ -3,12 +3,12 @@ package postgres
 import (
 	"context"
 	"errors"
-	"github.com/jackc/pgx/v4"
-	"github.com/jackc/pgx/v4/pgxpool"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"strings"
 	"time"
+
+	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v4/pgxpool"
+	"openreplay/backend/pkg/metrics/database"
 )
 
 // Pool is a pgx.Pool wrapper with metrics integration
@@ -22,19 +22,15 @@ type Pool interface {
 }
 
 type poolImpl struct {
-	conn              *pgxpool.Pool
-	sqlRequestTime    syncfloat64.Histogram
-	sqlRequestCounter syncfloat64.Counter
+	conn *pgxpool.Pool
 }
 
 func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) {
 	start := time.Now()
 	res, err := p.conn.Query(getTimeoutContext(), sql, args...)
 	method, table := methodName(sql)
-	p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", method), attribute.String("table", table))
-	p.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", method), attribute.String("table", table))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
+	database.IncreaseTotalRequests(method, table)
 	return res, err
 }
 
@@ -42,10 +38,8 @@ func (p *poolImpl) QueryRow(sql string, args ...interface{}) pgx.Row {
 	start := time.Now()
 	res := p.conn.QueryRow(getTimeoutContext(), sql, args...)
 	method, table := methodName(sql)
-	p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", method), attribute.String("table", table))
-	p.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", method), attribute.String("table", table))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
+	database.IncreaseTotalRequests(method, table)
 	return res
 }
 
@@ -53,45 +47,37 @@
 func (p *poolImpl) Exec(sql string, arguments ...interface{}) error {
 	start := time.Now()
 	_, err := p.conn.Exec(getTimeoutContext(), sql, arguments...)
 	method, table := methodName(sql)
-	p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", method), attribute.String("table", table))
-	p.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", method), attribute.String("table", table))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
+	database.IncreaseTotalRequests(method, table)
 	return err
 }
 
 func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults {
 	start := time.Now()
 	res := p.conn.SendBatch(getTimeoutContext(), b)
-	p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", "sendBatch"))
-	p.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", "sendBatch"))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "sendBatch", "")
+	database.IncreaseTotalRequests("sendBatch", "")
 	return res
 }
 
 func (p *poolImpl) Begin() (*_Tx, error) {
 	start := time.Now()
 	tx, err := p.conn.Begin(context.Background())
-	p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", "begin"))
-	p.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", "begin"))
-	return &_Tx{tx, p.sqlRequestTime, p.sqlRequestCounter}, err
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "begin", "")
+	database.IncreaseTotalRequests("begin", "")
+	return &_Tx{tx}, err
 }
 
 func (p *poolImpl) Close() {
 	p.conn.Close()
 }
 
-func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlRequestCounter syncfloat64.Counter) (Pool, error) {
+func NewPool(conn *pgxpool.Pool) (Pool, error) {
 	if conn == nil {
 		return nil, errors.New("conn is empty")
 	}
 	return &poolImpl{
-		conn:              conn,
-		sqlRequestTime:    sqlRequestTime,
-		sqlRequestCounter: sqlRequestCounter,
+		conn: conn,
 	}, nil
 }
 
@@ -99,38 +85,30 @@ func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlReques
 
 type _Tx struct {
 	pgx.Tx
-	sqlRequestTime    syncfloat64.Histogram
-	sqlRequestCounter syncfloat64.Counter
 }
 
 func (tx *_Tx) exec(sql string, args ...interface{}) error {
 	start := time.Now()
 	_, err := tx.Exec(context.Background(), sql, args...)
 	method, table := methodName(sql)
-	tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", method), attribute.String("table", table))
-	tx.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", method), attribute.String("table", table))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
+	database.IncreaseTotalRequests(method, table)
 	return err
 }
 
 func (tx *_Tx) rollback() error {
 	start := time.Now()
 	err := tx.Rollback(context.Background())
-	tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", "rollback"))
-	tx.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", "rollback"))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "rollback", "")
+	database.IncreaseTotalRequests("rollback", "")
 	return err
 }
 
 func (tx *_Tx) commit() error {
 	start := time.Now()
 	err := tx.Commit(context.Background())
-	tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
-		attribute.String("method", "commit"))
-	tx.sqlRequestCounter.Add(context.Background(), 1,
-		attribute.String("method", "commit"))
+	database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "commit", "")
+	database.IncreaseTotalRequests("commit", "")
 	return err
 }
 
@@ -169,7 +147,8 @@ func methodName(sql string) (string, string) {
 	case "update":
 		table = strings.TrimSpace(parts[1])
 	case "insert":
-		table = strings.TrimSpace(parts[2])
+		tableNameParts := strings.Split(strings.TrimSpace(parts[2]), "(")
+		table = tableNameParts[0]
 	}
 	return cmd, table
 }
diff --git a/backend/pkg/messages/iterator-sink.go b/backend/pkg/messages/iterator-sink.go
index a5897c3b7..be12b63eb 100644
--- a/backend/pkg/messages/iterator-sink.go
+++ b/backend/pkg/messages/iterator-sink.go
@@ -3,6 +3,7 @@ package messages
 import (
 	"fmt"
 	"log"
+	"openreplay/backend/pkg/metrics/sink"
 )
 
 type sinkMessageIteratorImpl struct {
@@ -53,6 +54,8 @@ func (i *sinkMessageIteratorImpl) sendBatchEnd() {
 }
 
 func (i *sinkMessageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
+	sink.RecordBatchSize(float64(len(batchData)))
+	sink.IncreaseTotalBatches()
 	// Create new message reader
 	reader := NewMessageReader(batchData)
diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go
index a6717257e..f7b014d30 100644
--- a/backend/pkg/messages/iterator.go
+++ b/backend/pkg/messages/iterator.go
@@ -74,12 +74,13 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
 		i.messageInfo.Index++
 
 		msg := reader.Message()
+		msgType := msg.TypeID()
 
 		// Preprocess "system" messages
 		if _, ok := i.preFilter[msg.TypeID()]; ok {
 			msg = msg.Decode()
 			if msg == nil {
-				log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
+				log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
 				return
 			}
 			msg = transformDeprecated(msg)
@@ -99,7 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
 		if i.autoDecode {
 			msg = msg.Decode()
 			if msg == nil {
-				log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
+				log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
 				return
 			}
 		}
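Reviewer note: a small behavioral fix rides along in methodName: for INSERT statements the table label used to be the raw third token, which drags the opening column list along when the statement is written as INSERT INTO table(col, ...). Splitting on "(" keeps the metric label clean. A standalone illustration (the Fields-based tokenization is an assumption, since only the insert branch appears in the hunk):

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        sql := "INSERT INTO events.pages(session_id, message_id) VALUES ($1, $2)"
        parts := strings.Fields(sql) // assumed tokenization
        // Old behavior: the "(...)" tail leaks into the label.
        fmt.Println(strings.TrimSpace(parts[2])) // events.pages(session_id,
        // New behavior: cut at the first "(".
        fmt.Println(strings.Split(strings.TrimSpace(parts[2]), "(")[0]) // events.pages
    }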
diff --git a/backend/pkg/metrics/assets/metrics.go b/backend/pkg/metrics/assets/metrics.go
new file mode 100644
index 000000000..44af0dfa9
--- /dev/null
+++ b/backend/pkg/metrics/assets/metrics.go
@@ -0,0 +1,72 @@
+package assets
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"openreplay/backend/pkg/metrics/common"
+	"strconv"
+)
+
+var assetsProcessedSessions = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "assets",
+		Name:      "processed_total",
+		Help:      "A counter displaying the total count of processed assets.",
+	},
+)
+
+func IncreaseProcessesSessions() {
+	assetsProcessedSessions.Inc()
+}
+
+var assetsSavedSessions = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "assets",
+		Name:      "saved_total",
+		Help:      "A counter displaying the total number of cached assets.",
+	},
+)
+
+func IncreaseSavedSessions() {
+	assetsSavedSessions.Inc()
+}
+
+var assetsDownloadDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "assets",
+		Name:      "download_duration_seconds",
+		Help:      "A histogram displaying the duration of downloading for each asset in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"response_code"},
+)
+
+func RecordDownloadDuration(durMillis float64, code int) {
+	assetsDownloadDuration.WithLabelValues(strconv.Itoa(code)).Observe(durMillis / 1000.0)
+}
+
+var assetsUploadDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "assets",
+		Name:      "upload_s3_duration_seconds",
+		Help:      "A histogram displaying the duration of uploading to s3 for each asset in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"failed"},
+)
+
+func RecordUploadDuration(durMillis float64, isFailed bool) {
+	failed := "false"
+	if isFailed {
+		failed = "true"
+	}
+	assetsUploadDuration.WithLabelValues(failed).Observe(durMillis / 1000.0)
+}
+
+func List() []prometheus.Collector {
+	return []prometheus.Collector{
+		assetsProcessedSessions,
+		assetsSavedSessions,
+		assetsDownloadDuration,
+		assetsUploadDuration,
+	}
+}
diff --git a/backend/pkg/metrics/common/metrics.go b/backend/pkg/metrics/common/metrics.go
new file mode 100644
index 000000000..85b66c713
--- /dev/null
+++ b/backend/pkg/metrics/common/metrics.go
@@ -0,0 +1,11 @@
+package common
+
+// DefaultDurationBuckets is a set of buckets from 5 milliseconds to 1000 seconds (16.6667 minutes)
+var DefaultDurationBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000}
+
+// DefaultSizeBuckets is a set of buckets from 1 byte to 1_000_000_000 bytes (~1 GB)
+var DefaultSizeBuckets = []float64{1, 10, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 25000, 50000, 100_000, 250_000,
+	500_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}
+
+// DefaultBuckets is a set of buckets from 1 to 1_000_000 elements
+var DefaultBuckets = []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10_000, 50_000, 100_000, 1_000_000}
diff --git a/backend/pkg/metrics/database/metrics.go b/backend/pkg/metrics/database/metrics.go
new file mode 100644
index 000000000..a9f3990cd
--- /dev/null
+++ b/backend/pkg/metrics/database/metrics.go
@@ -0,0 +1,127 @@
+package database
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"openreplay/backend/pkg/metrics/common"
+)
+
+var dbBatchSize = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "batch_size_bytes",
+		Help:      "A histogram displaying the batch size in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+)
+
+func RecordBatchSize(size float64) {
+	dbBatchSize.Observe(size)
+}
+
+var dbBatchElements = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "batch_size_elements",
+		Help:      "A histogram displaying the number of SQL commands in each batch.",
+		Buckets:   common.DefaultBuckets,
+	},
+)
+
+func RecordBatchElements(number float64) {
+	dbBatchElements.Observe(number)
+}
+
+var dbBatchInsertDuration = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "batch_insert_duration_seconds",
+		Help:      "A histogram displaying the duration of batch inserts in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+)
+
+func RecordBatchInsertDuration(durMillis float64) {
+	dbBatchInsertDuration.Observe(durMillis / 1000.0)
+}
+
+var dbBulkSize = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "bulk_size_bytes",
+		Help:      "A histogram displaying the bulk size in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+	[]string{"db", "table"},
+)
+
+func RecordBulkSize(size float64, db, table string) {
+	dbBulkSize.WithLabelValues(db, table).Observe(size)
+}
+
+var dbBulkElements = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "bulk_size_elements",
+		Help:      "A histogram displaying the number of elements in each bulk.",
+		Buckets:   common.DefaultBuckets,
+	},
+	[]string{"db", "table"},
+)
+
+func RecordBulkElements(size float64, db, table string) {
+	dbBulkElements.WithLabelValues(db, table).Observe(size)
+}
+
+var dbBulkInsertDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "bulk_insert_duration_seconds",
+		Help:      "A histogram displaying the duration of bulk inserts in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"db", "table"},
+)
+
+func RecordBulkInsertDuration(durMillis float64, db, table string) {
+	dbBulkInsertDuration.WithLabelValues(db, table).Observe(durMillis / 1000.0)
+}
+
+var dbRequestDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "db",
+		Name:      "request_duration_seconds",
+		Help:      "A histogram displaying the duration of each sql request in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"method", "table"},
+)
+
+func RecordRequestDuration(durMillis float64, method, table string) {
+	dbRequestDuration.WithLabelValues(method, table).Observe(durMillis / 1000.0)
+}
+
+var dbTotalRequests = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: "db",
+		Name:      "requests_total",
+		Help:      "A counter showing the total number of all SQL requests.",
+	},
+	[]string{"method", "table"},
+)
+
+func IncreaseTotalRequests(method, table string) {
+	dbTotalRequests.WithLabelValues(method, table).Inc()
+}
+
+func List() []prometheus.Collector {
+	return []prometheus.Collector{
+		dbBatchSize,
+		dbBatchElements,
+		dbBatchInsertDuration,
+		dbBulkSize,
+		dbBulkElements,
+		dbBulkInsertDuration,
+		dbRequestDuration,
+		dbTotalRequests,
+	}
+}
Namespace: "ender", + Name: "sessions_closed", + Help: "A counter displaying the number of closed sessions (sent SessionEnd).", + }, +) + +func IncreaseClosedSessions() { + enderClosedSessions.Inc() +} + +var enderTotalSessions = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "ender", + Name: "sessions_total", + Help: "A counter displaying the number of all processed sessions.", + }, +) + +func IncreaseTotalSessions() { + enderTotalSessions.Inc() +} + +func List() []prometheus.Collector { + return []prometheus.Collector{ + enderActiveSessions, + enderClosedSessions, + enderTotalSessions, + } +} diff --git a/backend/pkg/metrics/http/metrics.go b/backend/pkg/metrics/http/metrics.go new file mode 100644 index 000000000..7a835d7f6 --- /dev/null +++ b/backend/pkg/metrics/http/metrics.go @@ -0,0 +1,55 @@ +package http + +import ( + "github.com/prometheus/client_golang/prometheus" + "openreplay/backend/pkg/metrics/common" + "strconv" +) + +var httpRequestSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "http", + Name: "request_size_bytes", + Help: "A histogram displaying the size of each HTTP request in bytes.", + Buckets: common.DefaultSizeBuckets, + }, + []string{"url", "response_code"}, +) + +func RecordRequestSize(size float64, url string, code int) { + httpRequestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size) +} + +var httpRequestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "http", + Name: "request_duration_seconds", + Help: "A histogram displaying the duration of each HTTP request in seconds.", + Buckets: common.DefaultDurationBuckets, + }, + []string{"url", "response_code"}, +) + +func RecordRequestDuration(durMillis float64, url string, code int) { + httpRequestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis / 1000.0) +} + +var httpTotalRequests = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "http", + Name: "requests_total", + Help: "A counter displaying the number all HTTP requests.", + }, +) + +func IncreaseTotalRequests() { + httpTotalRequests.Inc() +} + +func List() []prometheus.Collector { + return []prometheus.Collector{ + httpRequestSize, + httpRequestDuration, + httpTotalRequests, + } +} diff --git a/backend/pkg/metrics/server.go b/backend/pkg/metrics/server.go new file mode 100644 index 000000000..fb3be5afc --- /dev/null +++ b/backend/pkg/metrics/server.go @@ -0,0 +1,40 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "log" + "net/http" +) + +type MetricServer struct { + registry *prometheus.Registry +} + +func New() *MetricServer { + registry := prometheus.NewRegistry() + // Add go runtime metrics and process collectors. + registry.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + // Expose /metrics HTTP endpoint using the created custom registry. + http.Handle( + "/metrics", promhttp.HandlerFor( + registry, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }), + ) + go func() { + log.Println(http.ListenAndServe(":8888", nil)) + }() + return &MetricServer{ + registry: registry, + } +} + +func (s *MetricServer) Register(cs []prometheus.Collector) { + s.registry.MustRegister(cs...) 
diff --git a/backend/pkg/metrics/sink/metrics.go b/backend/pkg/metrics/sink/metrics.go
new file mode 100644
index 000000000..52cb73ba1
--- /dev/null
+++ b/backend/pkg/metrics/sink/metrics.go
@@ -0,0 +1,185 @@
+package sink
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"openreplay/backend/pkg/metrics/common"
+)
+
+var sinkMessageSize = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "sink",
+		Name:      "message_size_bytes",
+		Help:      "A histogram displaying the size of each message in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+)
+
+func RecordMessageSize(size float64) {
+	sinkMessageSize.Observe(size)
+}
+
+var sinkWrittenMessages = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "messages_written",
+		Help:      "A counter displaying the total number of all written messages.",
+	},
+)
+
+func IncreaseWrittenMessages() {
+	sinkWrittenMessages.Inc()
+}
+
+var sinkTotalMessages = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "messages_total",
+		Help:      "A counter displaying the total number of all processed messages.",
+	},
+)
+
+func IncreaseTotalMessages() {
+	sinkTotalMessages.Inc()
+}
+
+var sinkBatchSize = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "sink",
+		Name:      "batch_size_bytes",
+		Help:      "A histogram displaying the size of each batch in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+)
+
+func RecordBatchSize(size float64) {
+	sinkBatchSize.Observe(size)
+}
+
+var sinkTotalBatches = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "batches_total",
+		Help:      "A counter displaying the total number of all written batches.",
+	},
+)
+
+func IncreaseTotalBatches() {
+	sinkTotalBatches.Inc()
+}
+
+var sinkWrittenBytes = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "sink",
+		Name:      "written_bytes",
+		Help:      "A histogram displaying the size of the buffer written to the session file, in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordWrittenBytes(size float64, fileType string) {
+	if size == 0 {
+		return
+	}
+	sinkWrittenBytes.WithLabelValues(fileType).Observe(size)
+	IncreaseTotalWrittenBytes(size, fileType)
+}
+
+var sinkTotalWrittenBytes = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "written_bytes_total",
+		Help:      "A counter displaying the total number of bytes written to all session files.",
+	},
+	[]string{"file_type"},
+)
+
+func IncreaseTotalWrittenBytes(size float64, fileType string) {
+	if size == 0 {
+		return
+	}
+	sinkTotalWrittenBytes.WithLabelValues(fileType).Add(size)
+}
+
+var sinkCachedAssets = prometheus.NewGauge(
+	prometheus.GaugeOpts{
+		Namespace: "sink",
+		Name:      "assets_cached",
+		Help:      "A gauge displaying the current number of cached assets.",
+	},
+)
+
+func IncreaseCachedAssets() {
+	sinkCachedAssets.Inc()
+}
+
+func DecreaseCachedAssets() {
+	sinkCachedAssets.Dec()
+}
+
+var sinkSkippedAssets = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "assets_skipped",
+		Help:      "A counter displaying the total number of all skipped assets.",
+	},
+)
+
+func IncreaseSkippedAssets() {
+	sinkSkippedAssets.Inc()
+}
+
+var sinkTotalAssets = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "sink",
+		Name:      "assets_total",
+		Help:      "A counter displaying the total number of all processed assets.",
+	},
+)
+
+func IncreaseTotalAssets() {
+	sinkTotalAssets.Inc()
+}
+
+var sinkAssetSize = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "sink",
+		Name:      "asset_size_bytes",
+		Help:      "A histogram displaying the size of each asset in bytes.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+)
+
+func RecordAssetSize(size float64) {
+	sinkAssetSize.Observe(size)
+}
+
+var sinkProcessAssetDuration = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Namespace: "sink",
+		Name:      "asset_process_duration_seconds",
+		Help:      "A histogram displaying the duration of processing for each asset in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+)
+
+func RecordProcessAssetDuration(durMillis float64) {
+	sinkProcessAssetDuration.Observe(durMillis / 1000.0)
+}
+
+func List() []prometheus.Collector {
+	return []prometheus.Collector{
+		sinkMessageSize,
+		sinkWrittenMessages,
+		sinkTotalMessages,
+		sinkBatchSize,
+		sinkTotalBatches,
+		sinkWrittenBytes,
+		sinkTotalWrittenBytes,
+		sinkCachedAssets,
+		sinkSkippedAssets,
+		sinkTotalAssets,
+		sinkAssetSize,
+		sinkProcessAssetDuration,
+	}
+}
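Reviewer note: two details of the written-bytes pair are worth calling out: RecordWrittenBytes forwards into IncreaseTotalWrittenBytes, so one call feeds both the histogram and the counter, and both helpers drop zero-size writes so empty flushes do not skew the distribution. A tiny sketch (the "dom"/"devtools" file_type values are illustrative assumptions):

    package main

    import "openreplay/backend/pkg/metrics/sink"

    func main() {
        // One call observes sink_written_bytes and adds to sink_written_bytes_total.
        sink.RecordWrittenBytes(4096, "dom")
        // No-op: zero-size writes are filtered out by both helpers.
        sink.RecordWrittenBytes(0, "devtools")
    }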
diff --git a/backend/pkg/metrics/storage/metrics.go b/backend/pkg/metrics/storage/metrics.go
new file mode 100644
index 000000000..26459c90d
--- /dev/null
+++ b/backend/pkg/metrics/storage/metrics.go
@@ -0,0 +1,114 @@
+package storage
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"openreplay/backend/pkg/metrics/common"
+)
+
+var storageSessionSize = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "session_size_bytes",
+		Help:      "A histogram displaying the size of each session file in bytes prior to any manipulation.",
+		Buckets:   common.DefaultSizeBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionSize(fileSize float64, fileType string) {
+	storageSessionSize.WithLabelValues(fileType).Observe(fileSize)
+}
+
+var storageTotalSessions = prometheus.NewCounter(
+	prometheus.CounterOpts{
+		Namespace: "storage",
+		Name:      "sessions_total",
+		Help:      "A counter displaying the total number of all processed sessions.",
+	},
+)
+
+func IncreaseStorageTotalSessions() {
+	storageTotalSessions.Inc()
+}
+
+var storageSessionReadDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "read_duration_seconds",
+		Help:      "A histogram displaying the duration of reading for each session in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionReadDuration(durMillis float64, fileType string) {
+	storageSessionReadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
+}
+
+var storageSessionSortDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "sort_duration_seconds",
+		Help:      "A histogram displaying the duration of sorting for each session in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionSortDuration(durMillis float64, fileType string) {
+	storageSessionSortDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
+}
+
+var storageSessionEncodeDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "encode_duration_seconds",
+		Help:      "A histogram displaying the duration of encoding for each session in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionEncodeDuration(durMillis float64, fileType string) {
+	storageSessionEncodeDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
+}
+
+var storageSessionCompressDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "compress_duration_seconds",
+		Help:      "A histogram displaying the duration of compressing for each session in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionCompressDuration(durMillis float64, fileType string) {
+	storageSessionCompressDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
+}
+
+var storageSessionUploadDuration = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "upload_duration_seconds",
+		Help:      "A histogram displaying the duration of uploading to s3 for each session in seconds.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionUploadDuration(durMillis float64, fileType string) {
+	storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
+}
+
+func List() []prometheus.Collector {
+	return []prometheus.Collector{
+		storageSessionSize,
+		storageTotalSessions,
+		storageSessionReadDuration,
+		storageSessionSortDuration,
+		storageSessionEncodeDuration,
+		storageSessionCompressDuration,
+		storageSessionUploadDuration,
+	}
+}
diff --git a/ee/backend/pkg/db/clickhouse/bulk.go b/ee/backend/pkg/db/clickhouse/bulk.go
index 706b66f68..6eb8d98fd 100644
--- a/ee/backend/pkg/db/clickhouse/bulk.go
+++ b/ee/backend/pkg/db/clickhouse/bulk.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"openreplay/backend/pkg/metrics/database"
+	"time"
 
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 )
@@ -16,19 +18,23 @@ type Bulk interface {
 
 type bulkImpl struct {
 	conn   driver.Conn
+	table  string
 	query  string
 	values [][]interface{}
 }
 
-func NewBulk(conn driver.Conn, query string) (Bulk, error) {
+func NewBulk(conn driver.Conn, table, query string) (Bulk, error) {
 	switch {
 	case conn == nil:
 		return nil, errors.New("clickhouse connection is empty")
+	case table == "":
+		return nil, errors.New("table is empty")
 	case query == "":
 		return nil, errors.New("query is empty")
 	}
 	return &bulkImpl{
 		conn:   conn,
+		table:  table,
 		query:  query,
 		values: make([][]interface{}, 0),
 	}, nil
@@ -40,6 +46,7 @@ func (b *bulkImpl) Append(args ...interface{}) error {
 }
 
 func (b *bulkImpl) Send() error {
+	start := time.Now()
 	batch, err := b.conn.PrepareBatch(context.Background(), b.query)
 	if err != nil {
 		return fmt.Errorf("can't create new batch: %s", err)
@@ -50,6 +57,11 @@ func (b *bulkImpl) Send() error {
 			log.Printf("failed query: %s", b.query)
 		}
 	}
+	err = batch.Send()
+	// Save bulk metrics
+	database.RecordBulkElements(float64(len(b.values)), "ch", b.table)
+	database.RecordBulkInsertDuration(float64(time.Now().Sub(start).Milliseconds()), "ch", b.table)
+	// Prepare values slice for new data
 	b.values = make([][]interface{}, 0)
-	return batch.Send()
+	return err
 }
diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go
index 157d384b9..b872adcc2 100644
--- a/ee/backend/pkg/db/clickhouse/connector.go
+++ b/ee/backend/pkg/db/clickhouse/connector.go
@@ -3,18 +3,16 @@ package clickhouse
 import (
 	"errors"
 	"fmt"
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"log"
 	"openreplay/backend/pkg/db/types"
 	"openreplay/backend/pkg/hashid"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/url"
-	"os"
 	"strings"
 	"time"
 
-	"github.com/ClickHouse/clickhouse-go/v2"
-	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
-
 	"openreplay/backend/pkg/license"
 )
 
@@ -52,28 +50,14 @@ type connectorImpl struct {
 	finished chan struct{}
 }
 
-// Check env variables. If not present, return default value.
-func getEnv(key, fallback string) string {
-	if value, ok := os.LookupEnv(key); ok {
-		return value
-	}
-	return fallback
-}
-
 func NewConnector(url string) Connector {
 	license.CheckLicense()
-	// Check username, password, database
-	userName := getEnv("CH_USERNAME", "default")
-	password := getEnv("CH_PASSWORD", "")
-	database := getEnv("CH_DATABASE", "default")
 	url = strings.TrimPrefix(url, "tcp://")
-	url = strings.TrimSuffix(url, "/"+database)
+	url = strings.TrimSuffix(url, "/default")
 	conn, err := clickhouse.Open(&clickhouse.Options{
 		Addr: []string{url},
 		Auth: clickhouse.Auth{
-			Database: database,
-			Username: userName,
-			Password: password,
+			Database: "default",
 		},
 		MaxOpenConns: 20,
 		MaxIdleConns: 15,
@@ -99,7 +83,7 @@ func NewConnector(url string) Connector {
 }
 
 func (c *connectorImpl) newBatch(name, query string) error {
-	batch, err := NewBulk(c.conn, query)
+	batch, err := NewBulk(c.conn, name, query)
 	if err != nil {
 		return fmt.Errorf("can't create new batch: %s", err)
 	}
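Reviewer note: with the table name threaded through newBatch, ClickHouse and Postgres bulks now land in the same database metrics, distinguished by the db label ("ch" vs "pg"). Also worth noting: Send records elements and duration before returning the error from batch.Send(), so failed inserts still appear in the series. A usage sketch (the connection value, table name, query, and appended arguments are illustrative):

    // Sketch: building and flushing a ClickHouse bulk under the new signature.
    bulk, err := clickhouse.NewBulk(conn, "experimental.sessions",
        "INSERT INTO experimental.sessions")
    if err != nil {
        log.Fatalf("can't create bulk: %s", err)
    }
    if err := bulk.Append(sessionID, projectID); err != nil {
        log.Printf("append error: %s", err)
    }
    if err := bulk.Send(); err != nil {
        // Metrics were already recorded for this attempt.
        log.Printf("send error: %s", err)
    }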