feat(images): add proper observability

This commit is contained in:
Alexander 2025-03-10 14:14:43 +01:00
parent af4a344c85
commit 4b09213448
3 changed files with 203 additions and 150 deletions

View file

@ -14,7 +14,7 @@ import (
"openreplay/backend/pkg/logger" "openreplay/backend/pkg/logger"
"openreplay/backend/pkg/messages" "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics" "openreplay/backend/pkg/metrics"
storageMetrics "openreplay/backend/pkg/metrics/images" imagesMetrics "openreplay/backend/pkg/metrics/images"
"openreplay/backend/pkg/objectstorage/store" "openreplay/backend/pkg/objectstorage/store"
"openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue"
) )
@ -23,14 +23,15 @@ func main() {
ctx := context.Background() ctx := context.Background()
log := logger.New() log := logger.New()
cfg := config.New(log) cfg := config.New(log)
metrics.New(log, storageMetrics.List()) imageMetrics := imagesMetrics.New("images")
metrics.New(log, imageMetrics.List())
objStore, err := store.NewStore(&cfg.ObjectsConfig) objStore, err := store.NewStore(&cfg.ObjectsConfig)
if err != nil { if err != nil {
log.Fatal(ctx, "can't init object storage: %s", err) log.Fatal(ctx, "can't init object storage: %s", err)
} }
srv, err := images.New(cfg, log, objStore) srv, err := images.New(cfg, log, objStore, imageMetrics)
if err != nil { if err != nil {
log.Fatal(ctx, "can't init images service: %s", err) log.Fatal(ctx, "can't init images service: %s", err)
} }

View file

@ -15,6 +15,7 @@ import (
config "openreplay/backend/internal/config/images" config "openreplay/backend/internal/config/images"
"openreplay/backend/pkg/logger" "openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics/images"
"openreplay/backend/pkg/objectstorage" "openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/pool" "openreplay/backend/pkg/pool"
) )
@ -38,9 +39,10 @@ type ImageStorage struct {
objStorage objectstorage.ObjectStorage objStorage objectstorage.ObjectStorage
saverPool pool.WorkerPool saverPool pool.WorkerPool
uploaderPool pool.WorkerPool uploaderPool pool.WorkerPool
metrics images.Images
} }
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) { func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, metrics images.Images) (*ImageStorage, error) {
switch { switch {
case cfg == nil: case cfg == nil:
return nil, fmt.Errorf("config is empty") return nil, fmt.Errorf("config is empty")
@ -48,11 +50,14 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS
return nil, fmt.Errorf("logger is empty") return nil, fmt.Errorf("logger is empty")
case objStorage == nil: case objStorage == nil:
return nil, fmt.Errorf("objStorage is empty") return nil, fmt.Errorf("objStorage is empty")
case metrics == nil:
return nil, fmt.Errorf("metrics is empty")
} }
s := &ImageStorage{ s := &ImageStorage{
cfg: cfg, cfg: cfg,
log: log, log: log,
objStorage: objStorage, objStorage: objStorage,
metrics: metrics,
} }
s.saverPool = pool.NewPool(4, 8, s.writeToDisk) s.saverPool = pool.NewPool(4, 8, s.writeToDisk)
s.uploaderPool = pool.NewPool(8, 8, s.sendToS3) s.uploaderPool = pool.NewPool(8, 8, s.sendToS3)
@ -92,8 +97,11 @@ func (v *ImageStorage) Process(ctx context.Context, sessID uint64, data []byte)
v.log.Error(ctx, "ExtractTarGz: unknown type: %d in %s", header.Typeflag, header.Name) v.log.Error(ctx, "ExtractTarGz: unknown type: %d in %s", header.Typeflag, header.Name)
} }
} }
v.metrics.RecordOriginalArchiveExtractionDuration(time.Since(start).Seconds())
v.metrics.RecordOriginalArchiveSize(float64(len(images)))
v.metrics.IncreaseTotalSavedArchives()
v.log.Info(ctx, "arch size: %d, extracted archive in: %s", len(data), time.Since(start)) v.log.Debug(ctx, "arch size: %d, extracted archive in: %s", len(data), time.Since(start))
v.saverPool.Submit(&saveTask{ctx: ctx, sessionID: sessID, images: images}) v.saverPool.Submit(&saveTask{ctx: ctx, sessionID: sessID, images: images})
return nil return nil
} }
@ -115,6 +123,7 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
// Write images to disk // Write images to disk
saved := 0 saved := 0
for name, img := range task.images { for name, img := range task.images {
start := time.Now()
outFile, err := os.Create(path + name) // or open file in rewrite mode outFile, err := os.Create(path + name) // or open file in rewrite mode
if err != nil { if err != nil {
v.log.Error(task.ctx, "can't create file: %s", err.Error()) v.log.Error(task.ctx, "can't create file: %s", err.Error())
@ -128,9 +137,11 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
if err := outFile.Close(); err != nil { if err := outFile.Close(); err != nil {
v.log.Warn(task.ctx, "can't close file: %s", err.Error()) v.log.Warn(task.ctx, "can't close file: %s", err.Error())
} }
v.metrics.RecordSavingImageDuration(time.Since(start).Seconds())
v.metrics.IncreaseTotalSavedImages()
saved++ saved++
} }
v.log.Info(task.ctx, "saved %d images to disk", saved) v.log.Debug(task.ctx, "saved %d images to disk", saved)
return return
} }
@ -151,8 +162,10 @@ func (v *ImageStorage) PackScreenshots(ctx context.Context, sessID uint64, files
if err != nil { if err != nil {
return fmt.Errorf("failed to execute command: %v, stderr: %v", err, stderr.String()) return fmt.Errorf("failed to execute command: %v, stderr: %v", err, stderr.String())
} }
v.log.Info(ctx, "packed replay in %v", time.Since(start)) v.metrics.RecordArchivingDuration(time.Since(start).Seconds())
v.metrics.IncreaseTotalCreatedArchives()
v.log.Debug(ctx, "packed replay in %v", time.Since(start))
v.uploaderPool.Submit(&uploadTask{ctx: ctx, sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"}) v.uploaderPool.Submit(&uploadTask{ctx: ctx, sessionID: sessionID, path: archPath, name: sessionID + "/replay.tar.zst"})
return nil return nil
} }
@ -167,6 +180,9 @@ func (v *ImageStorage) sendToS3(payload interface{}) {
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil { if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
v.log.Fatal(task.ctx, "failed to upload replay file: %s", err) v.log.Fatal(task.ctx, "failed to upload replay file: %s", err)
} }
v.log.Info(task.ctx, "replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start)) v.metrics.RecordUploadingDuration(time.Since(start).Seconds())
v.metrics.RecordArchiveSize(float64(len(video)))
v.log.Debug(task.ctx, "replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
return return
} }

View file

@ -5,151 +5,187 @@ import (
"openreplay/backend/pkg/metrics/common" "openreplay/backend/pkg/metrics/common"
) )
var storageSessionSize = prometheus.NewHistogramVec( type Images interface {
prometheus.HistogramOpts{ RecordOriginalArchiveSize(size float64)
Namespace: "storage", RecordOriginalArchiveExtractionDuration(duration float64)
Name: "session_size_bytes", IncreaseTotalSavedArchives()
Help: "A histogram displaying the size of each session file in bytes prior to any manipulation.", RecordSavingImageDuration(duration float64)
Buckets: common.DefaultSizeBuckets, IncreaseTotalSavedImages()
}, IncreaseTotalCreatedArchives()
[]string{"file_type"}, RecordArchivingDuration(duration float64)
) RecordArchiveSize(size float64)
RecordUploadingDuration(duration float64)
func RecordSessionSize(fileSize float64, fileType string) { List() []prometheus.Collector
storageSessionSize.WithLabelValues(fileType).Observe(fileSize)
} }
var storageTotalSessions = prometheus.NewCounter( type imagesImpl struct {
prometheus.CounterOpts{ originalArchiveSize prometheus.Histogram
Namespace: "storage", originalArchiveExtractionDuration prometheus.Histogram
Name: "sessions_total", totalSavedArchives prometheus.Counter
Help: "A counter displaying the total number of all processed sessions.", savingImageDuration prometheus.Histogram
}, totalSavedImages prometheus.Counter
) totalCreatedArchives prometheus.Counter
archivingDuration prometheus.Histogram
func IncreaseStorageTotalSessions() { archiveSize prometheus.Histogram
storageTotalSessions.Inc() uploadingDuration prometheus.Histogram
} }
var storageSkippedSessionSize = prometheus.NewHistogramVec( func New(serviceName string) Images {
prometheus.HistogramOpts{ return &imagesImpl{
Namespace: "storage", originalArchiveSize: newOriginalArchiveSize(serviceName),
Name: "session_size_bytes", originalArchiveExtractionDuration: newOriginalArchiveExtractionDuration(serviceName),
Help: "A histogram displaying the size of each skipped session file in bytes.", totalSavedArchives: newTotalSavedArchives(serviceName),
Buckets: common.DefaultSizeBuckets, savingImageDuration: newSavingImageDuration(serviceName),
}, totalSavedImages: newTotalSavedImages(serviceName),
[]string{"file_type"}, totalCreatedArchives: newTotalCreatedArchives(serviceName),
) archivingDuration: newArchivingDuration(serviceName),
archiveSize: newArchiveSize(serviceName),
func RecordSkippedSessionSize(fileSize float64, fileType string) { uploadingDuration: newUploadingDuration(serviceName),
storageSkippedSessionSize.WithLabelValues(fileType).Observe(fileSize)
}
var storageTotalSkippedSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "storage",
Name: "sessions_skipped_total",
Help: "A counter displaying the total number of all skipped sessions because of the size limits.",
},
)
func IncreaseStorageTotalSkippedSessions() {
storageTotalSkippedSessions.Inc()
}
var storageSessionReadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "read_duration_seconds",
Help: "A histogram displaying the duration of reading for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionReadDuration(durMillis float64, fileType string) {
storageSessionReadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionSortDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "sort_duration_seconds",
Help: "A histogram displaying the duration of sorting for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionSortDuration(durMillis float64, fileType string) {
storageSessionSortDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionEncryptionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "encryption_duration_seconds",
Help: "A histogram displaying the duration of encoding for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionEncryptionDuration(durMillis float64, fileType string) {
storageSessionEncryptionDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionCompressDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "compress_duration_seconds",
Help: "A histogram displaying the duration of compressing for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionCompressDuration(durMillis float64, fileType string) {
storageSessionCompressDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionUploadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "upload_duration_seconds",
Help: "A histogram displaying the duration of uploading to s3 for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionUploadDuration(durMillis float64, fileType string) {
storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionCompressionRatio = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "compression_ratio",
Help: "A histogram displaying the compression ratio of mob files for each session.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionCompressionRatio(ratio float64, fileType string) {
storageSessionCompressionRatio.WithLabelValues(fileType).Observe(ratio)
}
func List() []prometheus.Collector {
return []prometheus.Collector{
storageSessionSize,
storageTotalSessions,
storageSessionReadDuration,
storageSessionSortDuration,
storageSessionEncryptionDuration,
storageSessionCompressDuration,
storageSessionUploadDuration,
storageSessionCompressionRatio,
} }
} }
// List returns every Prometheus collector owned by this metrics set, in a
// fixed order, so the caller can register them all with a single call.
func (i *imagesImpl) List() []prometheus.Collector {
	collectors := make([]prometheus.Collector, 0, 9)
	collectors = append(collectors,
		i.originalArchiveSize,
		i.originalArchiveExtractionDuration,
		i.totalSavedArchives,
		i.savingImageDuration,
		i.totalSavedImages,
		i.totalCreatedArchives,
		i.archivingDuration,
		i.archiveSize,
		i.uploadingDuration,
	)
	return collectors
}
// newOriginalArchiveSize builds the histogram tracking the byte size of
// incoming original (not-yet-extracted) archives, namespaced by serviceName.
func newOriginalArchiveSize(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "original_archive_size_bytes",
		Help:      "A histogram displaying the original archive size in bytes.",
		Buckets:   common.DefaultSizeBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordOriginalArchiveSize observes the size (in bytes) of an incoming
// original archive.
//
// Fix: the previous implementation observed i.archiveSize by mistake, which
// polluted the packed-archive size histogram and left the
// original_archive_size_bytes histogram permanently empty.
func (i *imagesImpl) RecordOriginalArchiveSize(size float64) {
	i.originalArchiveSize.Observe(size)
}
// newOriginalArchiveExtractionDuration builds the histogram tracking the
// time, in seconds, spent extracting an original archive.
func newOriginalArchiveExtractionDuration(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "original_archive_extraction_duration_seconds",
		Help:      "A histogram displaying the duration of extracting the original archive.",
		Buckets:   common.DefaultDurationBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordOriginalArchiveExtractionDuration observes how long, in seconds,
// extracting an original archive took.
func (i *imagesImpl) RecordOriginalArchiveExtractionDuration(duration float64) {
	i.originalArchiveExtractionDuration.Observe(duration)
}
// newTotalSavedArchives builds the counter of saved original archives.
//
// NOTE(review): Prometheus naming convention prefers a `_total` suffix
// (e.g. saved_archives_total); renaming would break existing dashboards,
// so confirm before changing.
func newTotalSavedArchives(serviceName string) prometheus.Counter {
	opts := prometheus.CounterOpts{
		Namespace: serviceName,
		Name:      "total_saved_archives",
		Help:      "A counter displaying the total number of saved original archives.",
	}
	return prometheus.NewCounter(opts)
}
// IncreaseTotalSavedArchives bumps the count of saved original archives by one.
func (i *imagesImpl) IncreaseTotalSavedArchives() {
	i.totalSavedArchives.Inc()
}
// newSavingImageDuration builds the histogram tracking the time, in seconds,
// spent writing a single extracted image to disk.
func newSavingImageDuration(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "saving_image_duration_seconds",
		Help:      "A histogram displaying the duration of saving each image in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordSavingImageDuration observes how long, in seconds, saving one image
// to disk took.
func (i *imagesImpl) RecordSavingImageDuration(duration float64) {
	i.savingImageDuration.Observe(duration)
}
// newTotalSavedImages builds the counter of images written to disk.
func newTotalSavedImages(serviceName string) prometheus.Counter {
	opts := prometheus.CounterOpts{
		Namespace: serviceName,
		Name:      "total_saved_images",
		Help:      "A counter displaying the total number of saved images.",
	}
	return prometheus.NewCounter(opts)
}
// IncreaseTotalSavedImages bumps the count of saved images by one.
func (i *imagesImpl) IncreaseTotalSavedImages() {
	i.totalSavedImages.Inc()
}
// newTotalCreatedArchives builds the counter of replay archives packed.
func newTotalCreatedArchives(serviceName string) prometheus.Counter {
	opts := prometheus.CounterOpts{
		Namespace: serviceName,
		Name:      "total_created_archives",
		Help:      "A counter displaying the total number of created archives.",
	}
	return prometheus.NewCounter(opts)
}
// IncreaseTotalCreatedArchives bumps the count of created archives by one.
func (i *imagesImpl) IncreaseTotalCreatedArchives() {
	i.totalCreatedArchives.Inc()
}
// newArchivingDuration builds the histogram tracking the time, in seconds,
// spent packing a session's screenshots into an archive.
func newArchivingDuration(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "archiving_duration_seconds",
		Help:      "A histogram displaying the duration of archiving each session in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordArchivingDuration observes how long, in seconds, packing one
// session's archive took.
func (i *imagesImpl) RecordArchivingDuration(duration float64) {
	i.archivingDuration.Observe(duration)
}
// newArchiveSize builds the histogram tracking the byte size of a session's
// packed replay archive.
func newArchiveSize(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "archive_size_bytes",
		Help:      "A histogram displaying the session's archive size in bytes.",
		Buckets:   common.DefaultSizeBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordArchiveSize observes the size (in bytes) of a session's packed
// replay archive.
func (i *imagesImpl) RecordArchiveSize(size float64) {
	i.archiveSize.Observe(size)
}
// newUploadingDuration builds the histogram tracking the time, in seconds,
// spent uploading a session's archive to object storage.
func newUploadingDuration(serviceName string) prometheus.Histogram {
	opts := prometheus.HistogramOpts{
		Namespace: serviceName,
		Name:      "uploading_duration_seconds",
		Help:      "A histogram displaying the duration of uploading each session's archive to S3 in seconds.",
		Buckets:   common.DefaultDurationBuckets,
	}
	return prometheus.NewHistogram(opts)
}
// RecordUploadingDuration observes how long, in seconds, uploading one
// session's archive took.
func (i *imagesImpl) RecordUploadingDuration(duration float64) {
	i.uploadingDuration.Observe(duration)
}