From 78cf538b6b101e39e5c6a063a2f192345c68e1a3 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Tue, 7 Jun 2022 10:12:42 +0200 Subject: [PATCH] feat(backend): added metrics to storage and sink services --- backend/cmd/http/main.go | 2 +- backend/cmd/sink/main.go | 21 ++++++++++++++ backend/cmd/storage/main.go | 5 +++- backend/internal/storage/storage.go | 45 ++++++++++++++++++++++++----- 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 0ead20855..97d49b93e 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -18,7 +18,7 @@ import ( ) func main() { - metrics := monitoring.New("ender") + metrics := monitoring.New("http") log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) pprof.StartProfilingServer() diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index a4356b65e..d6edbb22c 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -1,11 +1,13 @@ package main import ( + "context" "encoding/binary" "log" "openreplay/backend/internal/sink/assetscache" "openreplay/backend/internal/sink/oswriter" "openreplay/backend/internal/storage" + "openreplay/backend/pkg/monitoring" "time" "os" @@ -20,6 +22,8 @@ import ( ) func main() { + metrics := monitoring.New("sink") + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := sink.New() @@ -36,6 +40,18 @@ func main() { assetMessageHandler := assetscache.New(cfg, rewriter, producer) counter := storage.NewLogCounter() + totalMessages, err := metrics.RegisterCounter("messages_total") + if err != nil { + log.Printf("can't create messages_total metric: %s", err) + } + savedMessages, err := metrics.RegisterCounter("messages_saved") + if err != nil { + log.Printf("can't create messages_saved metric: %s", err) + } + messageSize, err := metrics.RegisterHistogram("messages_size") + if err != nil { + log.Printf("can't create messages_size metric: %s", err) + } consumer := queue.NewMessageConsumer( cfg.GroupSink, @@ -47,6 +63,8 @@ func main() { // Process assets message = assetMessageHandler.ParseAssets(sessionID, message) + totalMessages.Add(context.Background(), 1) + // Filter message typeID := message.TypeID() if !IsReplayerType(typeID) { @@ -74,6 +92,9 @@ func main() { if err := writer.Write(sessionID, data); err != nil { log.Printf("Writer error: %v\n", err) } + + messageSize.Record(context.Background(), float64(len(data))) + savedMessages.Add(context.Background(), 1) }, false, ) diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index 7916cd2ad..541993df6 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "openreplay/backend/pkg/monitoring" "os" "os/signal" "strconv" @@ -17,12 +18,14 @@ import ( ) func main() { + metrics := monitoring.New("storage") + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) cfg := config.New() s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket) - srv, err := storage.New(cfg, s3) + srv, err := storage.New(cfg, s3, metrics) if err != nil { log.Printf("can't init storage service: %s", err) return diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index b83ff1577..b48448117 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -2,10 +2,13 @@ package storage import ( "bytes" + "context" "fmt" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" config "openreplay/backend/internal/config/storage" "openreplay/backend/pkg/flakeid" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/storage" "os" "strconv" @@ -13,26 +16,41 @@ import ( ) type Storage struct { - cfg *config.Config - s3 *storage.S3 - startBytes []byte + cfg *config.Config + s3 *storage.S3 + startBytes []byte + totalSessions syncfloat64.Counter + sessionSize syncfloat64.Histogram + archivingTime syncfloat64.Histogram } -func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) { +func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) { switch { case cfg == nil: return nil, fmt.Errorf("config is empty") case s3 == nil: return nil, fmt.Errorf("s3 storage is empty") } + // Create metrics + totalSessions, err := metrics.RegisterCounter("sessions_total") + if err != nil { + log.Printf("can't create sessions_total metric: %s", err) + } + sessionSize, err := metrics.RegisterHistogram("sessions_size") + if err != nil { + log.Printf("can't create session_size metric: %s", err) + } return &Storage{ - cfg: cfg, - s3: s3, - startBytes: make([]byte, cfg.FileSplitSize), + cfg: cfg, + s3: s3, + startBytes: make([]byte, cfg.FileSplitSize), + totalSessions: totalSessions, + sessionSize: sessionSize, }, nil } func (s *Storage) UploadKey(key string, retryCount int) { + start := time.Now() if retryCount <= 0 { return } @@ -77,4 +95,17 @@ func (s *Storage) UploadKey(key string, retryCount int) { log.Fatalf("Storage: end upload failed. %v\n", err) } } + + // Save metrics + var fileSize float64 = 0 + fileInfo, err := file.Stat() + if err != nil { + log.Printf("can't get file info: %s", err) + } else { + fileSize = float64(fileInfo.Size()) + } + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) + s.archivingTime.Record(ctx, float64(time.Now().Sub(start).Milliseconds())) + s.sessionSize.Record(ctx, fileSize) + s.totalSessions.Add(ctx, 1) }