feat(backend): fixed storage metrics
This commit is contained in:
parent
d3ea2ce3d6
commit
1be50a4b5b
1 changed files with 56 additions and 18 deletions
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
|
|
@ -14,6 +15,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FileType string
|
||||
|
|
@ -35,12 +37,13 @@ type Storage struct {
|
|||
s3 *storage.S3
|
||||
startBytes []byte
|
||||
|
||||
totalSessions syncfloat64.Counter
|
||||
sessionDOMSize syncfloat64.Histogram
|
||||
sessionDevtoolsSize syncfloat64.Histogram
|
||||
readingDOMTime syncfloat64.Histogram
|
||||
readingTime syncfloat64.Histogram
|
||||
archivingTime syncfloat64.Histogram
|
||||
totalSessions syncfloat64.Counter
|
||||
sessionDOMSize syncfloat64.Histogram
|
||||
sessionDEVSize syncfloat64.Histogram
|
||||
readingDOMTime syncfloat64.Histogram
|
||||
readingDEVTime syncfloat64.Histogram
|
||||
archivingDOMTime syncfloat64.Histogram
|
||||
archivingDEVTime syncfloat64.Histogram
|
||||
|
||||
tasks chan *Task
|
||||
ready chan struct{}
|
||||
|
|
@ -66,25 +69,35 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
|
|||
if err != nil {
|
||||
log.Printf("can't create sessions_dt_size metric: %s", err)
|
||||
}
|
||||
readingTime, err := metrics.RegisterHistogram("reading_duration")
|
||||
readingDOMTime, err := metrics.RegisterHistogram("reading_duration")
|
||||
if err != nil {
|
||||
log.Printf("can't create reading_duration metric: %s", err)
|
||||
}
|
||||
archivingTime, err := metrics.RegisterHistogram("archiving_duration")
|
||||
readingDEVTime, err := metrics.RegisterHistogram("reading_dt_duration")
|
||||
if err != nil {
|
||||
log.Printf("can't create reading_duration metric: %s", err)
|
||||
}
|
||||
archivingDOMTime, err := metrics.RegisterHistogram("archiving_duration")
|
||||
if err != nil {
|
||||
log.Printf("can't create archiving_duration metric: %s", err)
|
||||
}
|
||||
archivingDEVTime, err := metrics.RegisterHistogram("archiving_dt_duration")
|
||||
if err != nil {
|
||||
log.Printf("can't create archiving_duration metric: %s", err)
|
||||
}
|
||||
newStorage := &Storage{
|
||||
cfg: cfg,
|
||||
s3: s3,
|
||||
startBytes: make([]byte, cfg.FileSplitSize),
|
||||
totalSessions: totalSessions,
|
||||
sessionDOMSize: sessionDOMSize,
|
||||
sessionDevtoolsSize: sessionDevtoolsSize,
|
||||
readingTime: readingTime,
|
||||
archivingTime: archivingTime,
|
||||
tasks: make(chan *Task, 1),
|
||||
ready: make(chan struct{}),
|
||||
cfg: cfg,
|
||||
s3: s3,
|
||||
startBytes: make([]byte, cfg.FileSplitSize),
|
||||
totalSessions: totalSessions,
|
||||
sessionDOMSize: sessionDOMSize,
|
||||
sessionDEVSize: sessionDevtoolsSize,
|
||||
readingDOMTime: readingDOMTime,
|
||||
readingDEVTime: readingDEVTime,
|
||||
archivingDOMTime: archivingDOMTime,
|
||||
archivingDEVTime: archivingDEVTime,
|
||||
tasks: make(chan *Task, 1),
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
go newStorage.worker()
|
||||
return newStorage, nil
|
||||
|
|
@ -146,28 +159,50 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
|||
if tp == DEV {
|
||||
path += "devtools"
|
||||
}
|
||||
startRead := time.Now()
|
||||
mob, err := s.openSession(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
durRead := time.Now().Sub(startRead).Milliseconds()
|
||||
// Send metrics
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
|
||||
if tp == DOM {
|
||||
s.sessionDOMSize.Record(ctx, float64(len(mob)))
|
||||
s.readingDOMTime.Record(ctx, float64(durRead))
|
||||
} else {
|
||||
s.sessionDEVSize.Record(ctx, float64(len(mob)))
|
||||
s.readingDEVTime.Record(ctx, float64(durRead))
|
||||
}
|
||||
// Encode and compress session
|
||||
if tp == DEV {
|
||||
startCompress := time.Now()
|
||||
task.dev = s.compressSession(mob)
|
||||
s.archivingDEVTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds()))
|
||||
} else {
|
||||
if len(mob) <= s.cfg.FileSplitSize {
|
||||
startCompress := time.Now()
|
||||
task.doms = s.compressSession(mob)
|
||||
s.archivingDOMTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
var firstPart, secondPart int64
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.doms = s.compressSession(mob[:s.cfg.FileSplitSize])
|
||||
firstPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.dome = s.compressSession(mob[s.cfg.FileSplitSize:])
|
||||
secondPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
s.archivingDOMTime.Record(ctx, float64(firstPart+secondPart))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -227,6 +262,9 @@ func (s *Storage) uploadSession(task *Task) {
|
|||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
// Record metrics
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
|
||||
s.totalSessions.Add(ctx, 1)
|
||||
}
|
||||
|
||||
func (s *Storage) worker() {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue