feat(canvases): added a proper canvas observability
This commit is contained in:
parent
06937b305a
commit
5f3b3bb2ef
3 changed files with 232 additions and 14 deletions
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"openreplay/backend/pkg/logger"
|
"openreplay/backend/pkg/logger"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
"openreplay/backend/pkg/metrics"
|
"openreplay/backend/pkg/metrics"
|
||||||
storageMetrics "openreplay/backend/pkg/metrics/images"
|
canvasesMetrics "openreplay/backend/pkg/metrics/canvas"
|
||||||
"openreplay/backend/pkg/objectstorage/store"
|
"openreplay/backend/pkg/objectstorage/store"
|
||||||
"openreplay/backend/pkg/queue"
|
"openreplay/backend/pkg/queue"
|
||||||
)
|
)
|
||||||
|
|
@ -22,7 +22,8 @@ func main() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
log := logger.New()
|
log := logger.New()
|
||||||
cfg := config.New(log)
|
cfg := config.New(log)
|
||||||
metrics.New(log, storageMetrics.List())
|
canvasMetrics := canvasesMetrics.New("canvases")
|
||||||
|
metrics.New(log, canvasMetrics.List())
|
||||||
|
|
||||||
objStore, err := store.NewStore(&cfg.ObjectsConfig)
|
objStore, err := store.NewStore(&cfg.ObjectsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -32,7 +33,7 @@ func main() {
|
||||||
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
||||||
defer producer.Close(15000)
|
defer producer.Close(15000)
|
||||||
|
|
||||||
srv, err := canvases.New(cfg, log, objStore, producer)
|
srv, err := canvases.New(cfg, log, objStore, producer, canvasMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(ctx, "can't init canvases service: %s", err)
|
log.Fatal(ctx, "can't init canvases service: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
config "openreplay/backend/internal/config/canvases"
|
config "openreplay/backend/internal/config/canvases"
|
||||||
"openreplay/backend/pkg/logger"
|
"openreplay/backend/pkg/logger"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
|
"openreplay/backend/pkg/metrics/canvas"
|
||||||
"openreplay/backend/pkg/objectstorage"
|
"openreplay/backend/pkg/objectstorage"
|
||||||
"openreplay/backend/pkg/pool"
|
"openreplay/backend/pkg/pool"
|
||||||
"openreplay/backend/pkg/queue/types"
|
"openreplay/backend/pkg/queue/types"
|
||||||
|
|
@ -29,6 +30,7 @@ type ImageStorage struct {
|
||||||
uploaderPool pool.WorkerPool
|
uploaderPool pool.WorkerPool
|
||||||
objStorage objectstorage.ObjectStorage
|
objStorage objectstorage.ObjectStorage
|
||||||
producer types.Producer
|
producer types.Producer
|
||||||
|
metrics canvas.Canvas
|
||||||
}
|
}
|
||||||
|
|
||||||
type saveTask struct {
|
type saveTask struct {
|
||||||
|
|
@ -51,10 +53,18 @@ type uploadTask struct {
|
||||||
name string
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, producer types.Producer) (*ImageStorage, error) {
|
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, producer types.Producer, metrics canvas.Canvas) (*ImageStorage, error) {
|
||||||
switch {
|
switch {
|
||||||
case cfg == nil:
|
case cfg == nil:
|
||||||
return nil, fmt.Errorf("config is empty")
|
return nil, fmt.Errorf("config is empty")
|
||||||
|
case log == nil:
|
||||||
|
return nil, fmt.Errorf("logger is empty")
|
||||||
|
case objStorage == nil:
|
||||||
|
return nil, fmt.Errorf("objectStorage is empty")
|
||||||
|
case producer == nil:
|
||||||
|
return nil, fmt.Errorf("producer is empty")
|
||||||
|
case metrics == nil:
|
||||||
|
return nil, fmt.Errorf("metrics is empty")
|
||||||
}
|
}
|
||||||
path := cfg.FSDir + "/"
|
path := cfg.FSDir + "/"
|
||||||
if cfg.CanvasDir != "" {
|
if cfg.CanvasDir != "" {
|
||||||
|
|
@ -66,6 +76,7 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS
|
||||||
basePath: path,
|
basePath: path,
|
||||||
objStorage: objStorage,
|
objStorage: objStorage,
|
||||||
producer: producer,
|
producer: producer,
|
||||||
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
s.saverPool = pool.NewPool(2, 2, s.writeToDisk)
|
s.saverPool = pool.NewPool(2, 2, s.writeToDisk)
|
||||||
s.packerPool = pool.NewPool(8, 16, s.packCanvas)
|
s.packerPool = pool.NewPool(8, 16, s.packCanvas)
|
||||||
|
|
@ -108,7 +119,13 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
|
||||||
if _, err := io.Copy(outFile, task.image); err != nil {
|
if _, err := io.Copy(outFile, task.image); err != nil {
|
||||||
v.log.Fatal(task.ctx, "can't copy data to image: %s", err)
|
v.log.Fatal(task.ctx, "can't copy data to image: %s", err)
|
||||||
}
|
}
|
||||||
outFile.Close()
|
if outFile != nil {
|
||||||
|
if err := outFile.Close(); err != nil {
|
||||||
|
v.log.Warn(task.ctx, "can't close out file: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v.metrics.RecordCanvasImageSize(float64(task.image.Len()))
|
||||||
|
v.metrics.IncreaseTotalSavedImages()
|
||||||
|
|
||||||
v.log.Debug(task.ctx, "canvas image saved, name: %s, size: %3.3f mb", task.name, float64(task.image.Len())/1024.0/1024.0)
|
v.log.Debug(task.ctx, "canvas image saved, name: %s, size: %3.3f mb", task.name, float64(task.image.Len())/1024.0/1024.0)
|
||||||
return
|
return
|
||||||
|
|
@ -127,13 +144,11 @@ func (v *ImageStorage) PrepareSessionCanvases(ctx context.Context, sessID uint64
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
names := make(map[string]bool)
|
|
||||||
|
|
||||||
// Build the list of canvas images sets
|
// Build the list of canvas images sets
|
||||||
|
names := make(map[string]int)
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
// Skip already created archives
|
|
||||||
if strings.HasSuffix(file.Name(), ".tar.zst") {
|
if strings.HasSuffix(file.Name(), ".tar.zst") {
|
||||||
continue
|
continue // Skip already created archives
|
||||||
}
|
}
|
||||||
name := strings.Split(file.Name(), ".")
|
name := strings.Split(file.Name(), ".")
|
||||||
parts := strings.Split(name[0], "_")
|
parts := strings.Split(name[0], "_")
|
||||||
|
|
@ -142,10 +157,10 @@ func (v *ImageStorage) PrepareSessionCanvases(ctx context.Context, sessID uint64
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
|
||||||
names[canvasID] = true
|
names[canvasID]++
|
||||||
}
|
}
|
||||||
|
|
||||||
for name := range names {
|
for name, number := range names {
|
||||||
msg := &messages.CustomEvent{
|
msg := &messages.CustomEvent{
|
||||||
Name: name,
|
Name: name,
|
||||||
Payload: path,
|
Payload: path,
|
||||||
|
|
@ -153,8 +168,12 @@ func (v *ImageStorage) PrepareSessionCanvases(ctx context.Context, sessID uint64
|
||||||
if err := v.producer.Produce(v.cfg.TopicCanvasTrigger, sessID, msg.Encode()); err != nil {
|
if err := v.producer.Produce(v.cfg.TopicCanvasTrigger, sessID, msg.Encode()); err != nil {
|
||||||
v.log.Error(ctx, "can't send canvas trigger: %s", err)
|
v.log.Error(ctx, "can't send canvas trigger: %s", err)
|
||||||
}
|
}
|
||||||
|
v.metrics.RecordImagesPerCanvas(float64(number))
|
||||||
}
|
}
|
||||||
v.log.Info(ctx, "session canvases (%d) prepared in %.3fs, session: %d", len(names), time.Since(start).Seconds(), sessID)
|
v.metrics.RecordCanvasesPerSession(float64(len(names)))
|
||||||
|
v.metrics.RecordPreparingDuration(time.Since(start).Seconds())
|
||||||
|
|
||||||
|
v.log.Debug(ctx, "session canvases (%d) prepared in %.3fs, session: %d", len(names), time.Since(start).Seconds(), sessID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -181,7 +200,10 @@ func (v *ImageStorage) packCanvas(payload interface{}) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
v.log.Fatal(task.ctx, "failed to execute command, err: %s, stderr: %v", err, stderr.String())
|
v.log.Fatal(task.ctx, "failed to execute command, err: %s, stderr: %v", err, stderr.String())
|
||||||
}
|
}
|
||||||
v.log.Info(task.ctx, "canvas packed successfully in %.3fs, session: %d", time.Since(start).Seconds(), task.sessionID)
|
v.metrics.RecordArchivingDuration(time.Since(start).Seconds())
|
||||||
|
v.metrics.IncreaseTotalCreatedArchives()
|
||||||
|
|
||||||
|
v.log.Debug(task.ctx, "canvas packed successfully in %.3fs, session: %d", time.Since(start).Seconds(), task.sessionID)
|
||||||
v.uploaderPool.Submit(&uploadTask{ctx: task.ctx, path: archPath, name: sessionID + "/" + task.name + ".tar.zst"})
|
v.uploaderPool.Submit(&uploadTask{ctx: task.ctx, path: archPath, name: sessionID + "/" + task.name + ".tar.zst"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -195,5 +217,8 @@ func (v *ImageStorage) sendToS3(payload interface{}) {
|
||||||
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
|
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
|
||||||
v.log.Fatal(task.ctx, "failed to upload canvas to storage: %s", err)
|
v.log.Fatal(task.ctx, "failed to upload canvas to storage: %s", err)
|
||||||
}
|
}
|
||||||
v.log.Info(task.ctx, "replay file (size: %d) uploaded successfully in %.3fs", len(video), time.Since(start).Seconds())
|
v.metrics.RecordUploadingDuration(time.Since(start).Seconds())
|
||||||
|
v.metrics.RecordArchiveSize(float64(len(video)))
|
||||||
|
|
||||||
|
v.log.Debug(task.ctx, "replay file (size: %d) uploaded successfully in %.3fs", len(video), time.Since(start).Seconds())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
192
backend/pkg/metrics/canvas/metrics.go
Normal file
192
backend/pkg/metrics/canvas/metrics.go
Normal file
|
|
@ -0,0 +1,192 @@
|
||||||
|
package canvas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"openreplay/backend/pkg/metrics/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Canvas interface {
|
||||||
|
RecordCanvasImageSize(size float64)
|
||||||
|
IncreaseTotalSavedImages()
|
||||||
|
RecordImagesPerCanvas(number float64)
|
||||||
|
RecordCanvasesPerSession(number float64)
|
||||||
|
RecordPreparingDuration(duration float64)
|
||||||
|
IncreaseTotalCreatedArchives()
|
||||||
|
RecordArchivingDuration(duration float64)
|
||||||
|
RecordArchiveSize(size float64)
|
||||||
|
RecordUploadingDuration(duration float64)
|
||||||
|
List() []prometheus.Collector
|
||||||
|
}
|
||||||
|
|
||||||
|
type canvasImpl struct {
|
||||||
|
canvasesImageSize prometheus.Histogram
|
||||||
|
canvasesTotalSavedImages prometheus.Counter
|
||||||
|
canvasesImagesPerCanvas prometheus.Histogram
|
||||||
|
canvasesCanvasesPerSession prometheus.Histogram
|
||||||
|
canvasesPreparingDuration prometheus.Histogram
|
||||||
|
canvasesTotalCreatedArchives prometheus.Counter
|
||||||
|
canvasesArchivingDuration prometheus.Histogram
|
||||||
|
canvasesArchiveSize prometheus.Histogram
|
||||||
|
canvasesUploadingDuration prometheus.Histogram
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(serviceName string) Canvas {
|
||||||
|
return &canvasImpl{
|
||||||
|
canvasesImageSize: newImageSizeMetric(serviceName),
|
||||||
|
canvasesTotalSavedImages: newTotalSavedImages(serviceName),
|
||||||
|
canvasesImagesPerCanvas: newImagesPerCanvas(serviceName),
|
||||||
|
canvasesCanvasesPerSession: newCanvasesPerSession(serviceName),
|
||||||
|
canvasesPreparingDuration: newPreparingDuration(serviceName),
|
||||||
|
canvasesTotalCreatedArchives: newTotalCreatedArchives(serviceName),
|
||||||
|
canvasesArchivingDuration: newArchivingDuration(serviceName),
|
||||||
|
canvasesArchiveSize: newArchiveSize(serviceName),
|
||||||
|
canvasesUploadingDuration: newUploadingDuration(serviceName),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) List() []prometheus.Collector {
|
||||||
|
return []prometheus.Collector{
|
||||||
|
c.canvasesImageSize,
|
||||||
|
c.canvasesTotalSavedImages,
|
||||||
|
c.canvasesImagesPerCanvas,
|
||||||
|
c.canvasesCanvasesPerSession,
|
||||||
|
c.canvasesPreparingDuration,
|
||||||
|
c.canvasesTotalCreatedArchives,
|
||||||
|
c.canvasesArchivingDuration,
|
||||||
|
c.canvasesArchiveSize,
|
||||||
|
c.canvasesUploadingDuration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newImageSizeMetric(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "image_size_bytes",
|
||||||
|
Help: "A histogram displaying the size of each canvas image in bytes.",
|
||||||
|
Buckets: common.DefaultSizeBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordCanvasImageSize(size float64) {
|
||||||
|
c.canvasesImageSize.Observe(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTotalSavedImages(serviceName string) prometheus.Counter {
|
||||||
|
return prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "total_saved_images",
|
||||||
|
Help: "A counter displaying the total number of saved images.",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) IncreaseTotalSavedImages() {
|
||||||
|
c.canvasesTotalSavedImages.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newImagesPerCanvas(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "images_per_canvas",
|
||||||
|
Help: "A histogram displaying the number of images per canvas.",
|
||||||
|
Buckets: common.DefaultBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordImagesPerCanvas(number float64) {
|
||||||
|
c.canvasesImagesPerCanvas.Observe(number)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCanvasesPerSession(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "canvases_per_session",
|
||||||
|
Help: "A histogram displaying the number of canvases per session.",
|
||||||
|
Buckets: common.DefaultBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordCanvasesPerSession(number float64) {
|
||||||
|
c.canvasesCanvasesPerSession.Observe(number)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPreparingDuration(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "preparing_duration_seconds",
|
||||||
|
Help: "A histogram displaying the duration of preparing the list of canvases for each session in seconds.",
|
||||||
|
Buckets: common.DefaultDurationBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordPreparingDuration(duration float64) {
|
||||||
|
c.canvasesPreparingDuration.Observe(duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTotalCreatedArchives(serviceName string) prometheus.Counter {
|
||||||
|
return prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "total_created_archives",
|
||||||
|
Help: "A counter displaying the total number of created canvas archives.",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) IncreaseTotalCreatedArchives() {
|
||||||
|
c.canvasesTotalCreatedArchives.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newArchivingDuration(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "archiving_duration_seconds",
|
||||||
|
Help: "A histogram displaying the duration of archiving for each canvas in seconds.",
|
||||||
|
Buckets: common.DefaultDurationBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordArchivingDuration(duration float64) {
|
||||||
|
c.canvasesArchivingDuration.Observe(duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newArchiveSize(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "archive_size_bytes",
|
||||||
|
Help: "A histogram displaying the size of each canvas archive in bytes.",
|
||||||
|
Buckets: common.DefaultSizeBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordArchiveSize(size float64) {
|
||||||
|
c.canvasesArchiveSize.Observe(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newUploadingDuration(serviceName string) prometheus.Histogram {
|
||||||
|
return prometheus.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: serviceName,
|
||||||
|
Name: "uploading_duration_seconds",
|
||||||
|
Help: "A histogram displaying the duration of uploading for each canvas in seconds.",
|
||||||
|
Buckets: common.DefaultDurationBuckets,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *canvasImpl) RecordUploadingDuration(duration float64) {
|
||||||
|
c.canvasesUploadingDuration.Observe(duration)
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue