From 4b09213448604dead854742c44933521ec4634c3 Mon Sep 17 00:00:00 2001
From: Alexander
Date: Mon, 10 Mar 2025 14:14:43 +0100
Subject: [PATCH] feat(images): add proper observability

---
 backend/cmd/images/main.go            |   7 +-
 backend/internal/images/service.go    |  26 ++-
 backend/pkg/metrics/images/metrics.go | 320 ++++++++++++++------------
 3 files changed, 203 insertions(+), 150 deletions(-)

diff --git a/backend/cmd/images/main.go b/backend/cmd/images/main.go
index dda4a270e..10e5140fc 100644
--- a/backend/cmd/images/main.go
+++ b/backend/cmd/images/main.go
@@ -14,7 +14,7 @@ import (
 	"openreplay/backend/pkg/logger"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/metrics"
-	storageMetrics "openreplay/backend/pkg/metrics/images"
+	imagesMetrics "openreplay/backend/pkg/metrics/images"
 	"openreplay/backend/pkg/objectstorage/store"
 	"openreplay/backend/pkg/queue"
 )
@@ -23,14 +23,15 @@ func main() {
 	ctx := context.Background()
 	log := logger.New()
 	cfg := config.New(log)
-	metrics.New(log, storageMetrics.List())
+	imageMetrics := imagesMetrics.New("images")
+	metrics.New(log, imageMetrics.List())
 
 	objStore, err := store.NewStore(&cfg.ObjectsConfig)
 	if err != nil {
 		log.Fatal(ctx, "can't init object storage: %s", err)
 	}
 
-	srv, err := images.New(cfg, log, objStore)
+	srv, err := images.New(cfg, log, objStore, imageMetrics)
 	if err != nil {
 		log.Fatal(ctx, "can't init images service: %s", err)
 	}
diff --git a/backend/internal/images/service.go b/backend/internal/images/service.go
index 0b914ea24..11e9c9986 100644
--- a/backend/internal/images/service.go
+++ b/backend/internal/images/service.go
@@ -15,6 +15,7 @@ import (
 
 	config "openreplay/backend/internal/config/images"
 	"openreplay/backend/pkg/logger"
+	"openreplay/backend/pkg/metrics/images"
 	"openreplay/backend/pkg/objectstorage"
 	"openreplay/backend/pkg/pool"
 )
@@ -38,9 +39,10 @@ type ImageStorage struct {
 	objStorage   objectstorage.ObjectStorage
 	saverPool    pool.WorkerPool
 	uploaderPool pool.WorkerPool
+	metrics      images.Images
 }
 
-func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) {
+func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, metrics images.Images) (*ImageStorage, error) {
 	switch {
 	case cfg == nil:
 		return nil, fmt.Errorf("config is empty")
@@ -48,11 +50,14 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS
 		return nil, fmt.Errorf("logger is empty")
 	case objStorage == nil:
 		return nil, fmt.Errorf("objStorage is empty")
+	case metrics == nil:
+		return nil, fmt.Errorf("metrics is empty")
 	}
 	s := &ImageStorage{
 		cfg:        cfg,
 		log:        log,
 		objStorage: objStorage,
+		metrics:    metrics,
 	}
 	s.saverPool = pool.NewPool(4, 8, s.writeToDisk)
 	s.uploaderPool = pool.NewPool(8, 8, s.sendToS3)
@@ -92,8 +97,11 @@ func (v *ImageStorage) Process(ctx context.Context, sessID uint64, data []byte)
 			v.log.Error(ctx, "ExtractTarGz: unknown type: %d in %s", header.Typeflag, header.Name)
 		}
 	}
+	v.metrics.RecordOriginalArchiveExtractionDuration(time.Since(start).Seconds())
+	v.metrics.RecordOriginalArchiveSize(float64(len(data)))
+	v.metrics.IncreaseTotalSavedArchives()
 
-	v.log.Info(ctx, "arch size: %d, extracted archive in: %s", len(data), time.Since(start))
+	v.log.Debug(ctx, "arch size: %d, extracted archive in: %s", len(data), time.Since(start))
 	v.saverPool.Submit(&saveTask{ctx: ctx, sessionID: sessID, images: images})
 	return nil
 }
@@ -115,6 +123,7 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
 	// Write images to disk
 	saved := 0
 	for name, img := range task.images {
+		start := time.Now()
 		outFile, err := os.Create(path + name) // or open file in rewrite mode
 		if err != nil {
 			v.log.Error(task.ctx, "can't create file: %s", err.Error())
@@ -128,9 +137,11 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
 		if err := outFile.Close(); err != nil {
 			v.log.Warn(task.ctx, "can't close file: %s", err.Error())
 		}
+		v.metrics.RecordSavingImageDuration(time.Since(start).Seconds())
+		v.metrics.IncreaseTotalSavedImages()
 		saved++
 	}
 
-	v.log.Info(task.ctx, "saved %d images to disk", saved)
+	v.log.Debug(task.ctx, "saved %d images to disk", saved)
 	return
 }
@@ -151,8 +162,10 @@ func (v *ImageStorage) PackScreenshots(ctx context.Context, sessID uint64, files
 	if err != nil {
 		return fmt.Errorf("failed to execute command: %v, stderr: %v", err, stderr.String())
 	}
 
-	v.log.Info(ctx, "packed replay in %v", time.Since(start))
+	v.metrics.RecordArchivingDuration(time.Since(start).Seconds())
+	v.metrics.IncreaseTotalCreatedArchives()
+	v.log.Debug(ctx, "packed replay in %v", time.Since(start))
 	v.uploaderPool.Submit(&uploadTask{ctx: ctx, sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"})
 	return nil
 }
@@ -167,6 +180,9 @@ func (v *ImageStorage) sendToS3(payload interface{}) {
 	if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
 		v.log.Fatal(task.ctx, "failed to upload replay file: %s", err)
 	}
-	v.log.Info(task.ctx, "replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
+	v.metrics.RecordUploadingDuration(time.Since(start).Seconds())
+	v.metrics.RecordArchiveSize(float64(len(video)))
+
+	v.log.Debug(task.ctx, "replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
 	return
 }
diff --git a/backend/pkg/metrics/images/metrics.go b/backend/pkg/metrics/images/metrics.go
index f29b4b134..255a4e8f8 100644
--- a/backend/pkg/metrics/images/metrics.go
+++ b/backend/pkg/metrics/images/metrics.go
@@ -5,151 +5,187 @@ import (
 	"openreplay/backend/pkg/metrics/common"
 )
 
-var storageSessionSize = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "session_size_bytes",
-		Help:      "A histogram displaying the size of each session file in bytes prior to any manipulation.",
-		Buckets:   common.DefaultSizeBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionSize(fileSize float64, fileType string) {
-	storageSessionSize.WithLabelValues(fileType).Observe(fileSize)
+type Images interface {
+	RecordOriginalArchiveSize(size float64)
+	RecordOriginalArchiveExtractionDuration(duration float64)
+	IncreaseTotalSavedArchives()
+	RecordSavingImageDuration(duration float64)
+	IncreaseTotalSavedImages()
+	IncreaseTotalCreatedArchives()
+	RecordArchivingDuration(duration float64)
+	RecordArchiveSize(size float64)
+	RecordUploadingDuration(duration float64)
+	List() []prometheus.Collector
 }
 
-var storageTotalSessions = prometheus.NewCounter(
-	prometheus.CounterOpts{
-		Namespace: "storage",
-		Name:      "sessions_total",
-		Help:      "A counter displaying the total number of all processed sessions.",
-	},
-)
-
-func IncreaseStorageTotalSessions() {
-	storageTotalSessions.Inc()
+type imagesImpl struct {
+	originalArchiveSize               prometheus.Histogram
+	originalArchiveExtractionDuration prometheus.Histogram
+	totalSavedArchives                prometheus.Counter
+	savingImageDuration               prometheus.Histogram
+	totalSavedImages                  prometheus.Counter
+	totalCreatedArchives              prometheus.Counter
+	archivingDuration                 prometheus.Histogram
+	archiveSize                       prometheus.Histogram
+	uploadingDuration                 prometheus.Histogram
 }
 
-var storageSkippedSessionSize = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "session_size_bytes",
-		Help:      "A histogram displaying the size of each skipped session file in bytes.",
-		Buckets:   common.DefaultSizeBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSkippedSessionSize(fileSize float64, fileType string) {
-	storageSkippedSessionSize.WithLabelValues(fileType).Observe(fileSize)
-}
-
-var storageTotalSkippedSessions = prometheus.NewCounter(
-	prometheus.CounterOpts{
-		Namespace: "storage",
-		Name:      "sessions_skipped_total",
-		Help:      "A counter displaying the total number of all skipped sessions because of the size limits.",
-	},
-)
-
-func IncreaseStorageTotalSkippedSessions() {
-	storageTotalSkippedSessions.Inc()
-}
-
-var storageSessionReadDuration = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "read_duration_seconds",
-		Help:      "A histogram displaying the duration of reading for each session in seconds.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionReadDuration(durMillis float64, fileType string) {
-	storageSessionReadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
-}
-
-var storageSessionSortDuration = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "sort_duration_seconds",
-		Help:      "A histogram displaying the duration of sorting for each session in seconds.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionSortDuration(durMillis float64, fileType string) {
-	storageSessionSortDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
-}
-
-var storageSessionEncryptionDuration = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "encryption_duration_seconds",
-		Help:      "A histogram displaying the duration of encoding for each session in seconds.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionEncryptionDuration(durMillis float64, fileType string) {
-	storageSessionEncryptionDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
-}
-
-var storageSessionCompressDuration = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "compress_duration_seconds",
-		Help:      "A histogram displaying the duration of compressing for each session in seconds.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionCompressDuration(durMillis float64, fileType string) {
-	storageSessionCompressDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
-}
-
-var storageSessionUploadDuration = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "upload_duration_seconds",
-		Help:      "A histogram displaying the duration of uploading to s3 for each session in seconds.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionUploadDuration(durMillis float64, fileType string) {
-	storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
-}
-
-var storageSessionCompressionRatio = prometheus.NewHistogramVec(
-	prometheus.HistogramOpts{
-		Namespace: "storage",
-		Name:      "compression_ratio",
-		Help:      "A histogram displaying the compression ratio of mob files for each session.",
-		Buckets:   common.DefaultDurationBuckets,
-	},
-	[]string{"file_type"},
-)
-
-func RecordSessionCompressionRatio(ratio float64, fileType string) {
-	storageSessionCompressionRatio.WithLabelValues(fileType).Observe(ratio)
-}
-
-func List() []prometheus.Collector {
-	return []prometheus.Collector{
-		storageSessionSize,
-		storageTotalSessions,
-		storageSessionReadDuration,
-		storageSessionSortDuration,
-		storageSessionEncryptionDuration,
-		storageSessionCompressDuration,
-		storageSessionUploadDuration,
-		storageSessionCompressionRatio,
+func New(serviceName string) Images {
+	return &imagesImpl{
+		originalArchiveSize:               newOriginalArchiveSize(serviceName),
+		originalArchiveExtractionDuration: newOriginalArchiveExtractionDuration(serviceName),
+		totalSavedArchives:                newTotalSavedArchives(serviceName),
+		savingImageDuration:               newSavingImageDuration(serviceName),
+		totalSavedImages:                  newTotalSavedImages(serviceName),
+		totalCreatedArchives:              newTotalCreatedArchives(serviceName),
+		archivingDuration:                 newArchivingDuration(serviceName),
+		archiveSize:                       newArchiveSize(serviceName),
+		uploadingDuration:                 newUploadingDuration(serviceName),
 	}
 }
+
+func (i *imagesImpl) List() []prometheus.Collector {
+	return []prometheus.Collector{
+		i.originalArchiveSize,
+		i.originalArchiveExtractionDuration,
+		i.totalSavedArchives,
+		i.savingImageDuration,
+		i.totalSavedImages,
+		i.totalCreatedArchives,
+		i.archivingDuration,
+		i.archiveSize,
+		i.uploadingDuration,
+	}
+}
+
+func newOriginalArchiveSize(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "original_archive_size_bytes",
+			Help:      "A histogram displaying the original archive size in bytes.",
+			Buckets:   common.DefaultSizeBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordOriginalArchiveSize(size float64) {
+	i.originalArchiveSize.Observe(size)
+}
+
+func newOriginalArchiveExtractionDuration(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "original_archive_extraction_duration_seconds",
+			Help:      "A histogram displaying the duration of extracting the original archive.",
+			Buckets:   common.DefaultDurationBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordOriginalArchiveExtractionDuration(duration float64) {
+	i.originalArchiveExtractionDuration.Observe(duration)
+}
+
+func newTotalSavedArchives(serviceName string) prometheus.Counter {
+	return prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: serviceName,
+			Name:      "total_saved_archives",
+			Help:      "A counter displaying the total number of saved original archives.",
+		},
+	)
+}
+
+func (i *imagesImpl) IncreaseTotalSavedArchives() {
+	i.totalSavedArchives.Inc()
+}
+
+func newSavingImageDuration(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "saving_image_duration_seconds",
+			Help:      "A histogram displaying the duration of saving each image in seconds.",
+			Buckets:   common.DefaultDurationBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordSavingImageDuration(duration float64) {
+	i.savingImageDuration.Observe(duration)
+}
+
+func newTotalSavedImages(serviceName string) prometheus.Counter {
+	return prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: serviceName,
+			Name:      "total_saved_images",
+			Help:      "A counter displaying the total number of saved images.",
+		},
+	)
+}
+
+func (i *imagesImpl) IncreaseTotalSavedImages() {
+	i.totalSavedImages.Inc()
+}
+
+func newTotalCreatedArchives(serviceName string) prometheus.Counter {
+	return prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: serviceName,
+			Name:      "total_created_archives",
+			Help:      "A counter displaying the total number of created archives.",
+		},
+	)
+}
+
+func (i *imagesImpl) IncreaseTotalCreatedArchives() {
+	i.totalCreatedArchives.Inc()
+}
+
+func newArchivingDuration(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "archiving_duration_seconds",
+			Help:      "A histogram displaying the duration of archiving each session in seconds.",
+			Buckets:   common.DefaultDurationBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordArchivingDuration(duration float64) {
+	i.archivingDuration.Observe(duration)
+}
+
+func newArchiveSize(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "archive_size_bytes",
+			Help:      "A histogram displaying the session's archive size in bytes.",
+			Buckets:   common.DefaultSizeBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordArchiveSize(size float64) {
+	i.archiveSize.Observe(size)
+}
+
+func newUploadingDuration(serviceName string) prometheus.Histogram {
+	return prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: serviceName,
+			Name:      "uploading_duration_seconds",
+			Help:      "A histogram displaying the duration of uploading each session's archive to S3 in seconds.",
+			Buckets:   common.DefaultDurationBuckets,
+		},
+	)
+}
+
+func (i *imagesImpl) RecordUploadingDuration(duration float64) {
+	i.uploadingDuration.Observe(duration)
+}
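
Below is a minimal, self-contained sketch of how the new Images metrics set can be wired up and exercised on its own. It relies only on the New/List API introduced in this patch; the bare Prometheus registry, the /metrics endpoint on port 9100, and the sample values are illustrative assumptions (the images service itself registers the collectors through pkg/metrics.New, as shown in cmd/images/main.go above).

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	imagesMetrics "openreplay/backend/pkg/metrics/images"
)

func main() {
	// Build the metrics set with the "images" namespace, mirroring cmd/images/main.go.
	m := imagesMetrics.New("images")

	// Register every collector on a standalone registry (assumed setup for this sketch).
	reg := prometheus.NewRegistry()
	reg.MustRegister(m.List()...)

	// Record a few arbitrary sample observations.
	m.IncreaseTotalSavedArchives()
	m.RecordOriginalArchiveSize(2 << 20) // archive of ~2 MiB
	m.RecordUploadingDuration(0.42)      // seconds

	// Expose the images_* metrics for scraping.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9100", nil)
}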