From 528d1af1735d2d564cd12172ec1ba4cb883d0030 Mon Sep 17 00:00:00 2001
From: Alexander
Date: Thu, 4 May 2023 17:46:43 +0200
Subject: [PATCH] Compression worker (#1233)

* feat(backend): added extra worker for session compression
* feat(backend): debug logs
* feat(backend): added compression ratio metric
* feat(backend): reduced number of duplicate logs
* feat(backend): rewrote worker management
---
 backend/internal/storage/storage.go      | 135 ++++++++++++++++-------
 backend/pkg/messages/session-iterator.go |   9 +++++++--
 backend/pkg/metrics/storage/metrics.go   |  15 +++
 3 files changed, 118 insertions(+), 41 deletions(-)

diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go
index d62b06b76..74372d5a9 100644
--- a/backend/internal/storage/storage.go
+++ b/backend/internal/storage/storage.go
@@ -35,13 +35,17 @@ func (t FileType) String() string {
 }
 
 type Task struct {
-	id     string
-	key    string
-	domRaw []byte
-	devRaw []byte
-	doms   *bytes.Buffer
-	dome   *bytes.Buffer
-	dev    *bytes.Buffer
+	id          string
+	key         string
+	domRaw      []byte
+	devRaw      []byte
+	domsRawSize float64
+	domeRawSize float64
+	devRawSize  float64
+	doms        *bytes.Buffer
+	dome        *bytes.Buffer
+	dev         *bytes.Buffer
+	isBreakTask bool
 }
 
 func (t *Task) SetMob(mob []byte, tp FileType) {
@@ -59,12 +63,19 @@ func (t *Task) Mob(tp FileType) []byte {
 	return t.devRaw
 }
 
+func NewBreakTask() *Task {
+	return &Task{
+		isBreakTask: true,
+	}
+}
+
 type Storage struct {
-	cfg        *config.Config
-	s3         *storage.S3
-	startBytes []byte
-	tasks      chan *Task
-	ready      chan struct{}
+	cfg              *config.Config
+	s3               *storage.S3
+	startBytes       []byte
+	compressionTasks chan *Task // brotli compression or gzip compression with encryption
+	uploadingTasks   chan *Task // upload to s3
+	workersStopped   chan struct{}
 }
 
 func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
@@ -75,24 +86,30 @@ func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
 		return nil, fmt.Errorf("s3 storage is empty")
 	}
 	newStorage := &Storage{
-		cfg:        cfg,
-		s3:         s3,
-		startBytes: make([]byte, cfg.FileSplitSize),
-		tasks:      make(chan *Task, 1),
-		ready:      make(chan struct{}),
+		cfg:              cfg,
+		s3:               s3,
+		startBytes:       make([]byte, cfg.FileSplitSize),
+		compressionTasks: make(chan *Task, 1),
+		uploadingTasks:   make(chan *Task, 1),
+		workersStopped:   make(chan struct{}),
 	}
-	go newStorage.worker()
+	go newStorage.compressionWorker()
+	go newStorage.uploadingWorker()
 	return newStorage, nil
 }
 
 func (s *Storage) Wait() {
-	<-s.ready
+	// Send a stop signal to the first worker
+	s.compressionTasks <- NewBreakTask()
+	// Wait for the stopped signal from the last worker
+	<-s.workersStopped
 }
 
 func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
 	// Generate file path
 	sessionID := strconv.FormatUint(msg.SessionID(), 10)
 	filePath := s.cfg.FSDir + "/" + sessionID
+
 	// Prepare sessions
 	newTask := &Task{
 		id: sessionID,
@@ -121,14 +138,13 @@ func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
 		}
 		return err
 	}
-	// Send new task to worker
-	s.tasks <- newTask
-	// Unload worker
-	<-s.ready
+
+	// Send new task to compression worker
+	s.compressionTasks <- newTask
 	return nil
 }
 
-func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
+func (s *Storage) openSession(sessID, filePath string, tp FileType) ([]byte, error) {
 	if tp == DEV {
 		filePath += "devtools"
 	}
@@ -147,7 +163,7 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
 		return raw, nil
 	}
 	start := time.Now()
-	res, err := s.sortSessionMessages(raw)
+	res, err := s.sortSessionMessages(sessID, raw)
 	if err != nil {
 		return nil, fmt.Errorf("can't sort session, err: %s", err)
 	}
@@ -155,9 +171,9 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
 	return res, nil
 }
 
-func (s *Storage) sortSessionMessages(raw []byte) ([]byte, error) {
+func (s *Storage) sortSessionMessages(sessID string, raw []byte) ([]byte, error) {
 	// Parse messages, sort by index and save result into slice of bytes
-	unsortedMessages, err := messages.SplitMessages(raw)
+	unsortedMessages, err := messages.SplitMessages(sessID, raw)
 	if err != nil {
 		log.Printf("can't sort session, err: %s", err)
 		return raw, nil
 	}
@@ -168,7 +184,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
 	// Open session file
 	startRead := time.Now()
-	mob, err := s.openSession(path, tp)
+	mob, err := s.openSession(task.id, path, tp)
 	if err != nil {
 		return err
 	}
@@ -177,9 +193,6 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
 
 	// Put opened session file into task struct
 	task.SetMob(mob, tp)
-
-	// Encrypt and compress session
-	s.packSession(task, tp)
 	return nil
 }
 
@@ -267,8 +280,12 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
 
 		if tp == DOM {
 			task.doms = result
+			// Record full dom (start) raw size
+			task.domsRawSize = float64(len(mob))
 		} else {
 			task.dev = result
+			// Record dev raw size
+			task.devRawSize = float64(len(mob))
 		}
 		return
 	}
@@ -284,7 +301,8 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
 		start := time.Now()
 		task.doms = s.compressSessionBetter(mob[:s.cfg.FileSplitSize])
 		firstPart = time.Since(start).Milliseconds()
-
+		// Record dom start raw size
+		task.domsRawSize = float64(s.cfg.FileSplitSize)
 		// Finish task
 		wg.Done()
 	}()
@@ -294,7 +312,8 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
 		start := time.Now()
 		task.dome = s.compressSessionBetter(mob[s.cfg.FileSplitSize:])
 		secondPart = time.Since(start).Milliseconds()
-
+		// Record dom end raw size
+		task.domeRawSize = float64(len(mob) - s.cfg.FileSplitSize)
 		// Finish task
 		wg.Done()
 	}()
@@ -365,6 +384,9 @@ func (s *Storage) uploadSession(task *Task) {
 	}
 	go func() {
 		if task.doms != nil {
+			// Record compression ratio
+			metrics.RecordSessionCompressionRatio(float64(task.doms.Len())/task.domsRawSize, DOM.String())
+			// Upload session to s3
 			start := time.Now()
 			if err := s.s3.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", compression); err != nil {
 				log.Fatalf("Storage: start upload failed. %s", err)
@@ -375,6 +397,9 @@ func (s *Storage) uploadSession(task *Task) {
 	}()
 	go func() {
 		if task.dome != nil {
+			// Record compression ratio
+			metrics.RecordSessionCompressionRatio(float64(task.dome.Len())/task.domeRawSize, DOM.String())
+			// Upload session to s3
 			start := time.Now()
 			if err := s.s3.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", compression); err != nil {
 				log.Fatalf("Storage: start upload failed. %s", err)
@@ -385,6 +410,9 @@ func (s *Storage) uploadSession(task *Task) {
 	}()
 	go func() {
 		if task.dev != nil {
+			// Record compression ratio
+			metrics.RecordSessionCompressionRatio(float64(task.dev.Len())/task.devRawSize, DEV.String())
+			// Upload session to s3
 			start := time.Now()
 			if err := s.s3.Upload(task.dev, task.id+string(DEV), "application/octet-stream", compression); err != nil {
 				log.Fatalf("Storage: start upload failed. %s", err)
@@ -399,14 +427,43 @@ func (s *Storage) uploadSession(task *Task) {
 	metrics.IncreaseStorageTotalSessions()
 }
 
-func (s *Storage) worker() {
+func (s *Storage) doCompression(task *Task) {
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		s.packSession(task, DOM)
+		wg.Done()
+	}()
+	go func() {
+		s.packSession(task, DEV)
+		wg.Done()
+	}()
+	wg.Wait()
+	s.uploadingTasks <- task
+}
+
+func (s *Storage) compressionWorker() {
 	for {
 		select {
-		case task := <-s.tasks:
-			s.uploadSession(task)
-		default:
-			// Signal that worker finished all tasks
-			s.ready <- struct{}{}
+		case task := <-s.compressionTasks:
+			if task.isBreakTask {
+				s.uploadingTasks <- task
+				continue
+			}
+			s.doCompression(task)
 		}
 	}
 }
+
+func (s *Storage) uploadingWorker() {
+	for {
+		select {
+		case task := <-s.uploadingTasks:
+			if task.isBreakTask {
+				s.workersStopped <- struct{}{}
+				continue
+			}
+			s.uploadSession(task)
+		}
+	}
+}
diff --git a/backend/pkg/messages/session-iterator.go b/backend/pkg/messages/session-iterator.go
index e008fa62a..dae3c3233 100644
--- a/backend/pkg/messages/session-iterator.go
+++ b/backend/pkg/messages/session-iterator.go
@@ -21,9 +21,10 @@ func (m *msgInfo) Print() string {
 	return fmt.Sprintf("index: %d, start: %d, end: %d, type: %d, body: %s", m.index, m.start, m.end, m.msgType, m.body)
 }
 
-func SplitMessages(data []byte) ([]*msgInfo, error) {
+func SplitMessages(sessID string, data []byte) ([]*msgInfo, error) {
 	messages := make([]*msgInfo, 0)
 	indexes := make(map[uint64]bool)
+	hadDuplicates := false
 	var lastTimestamp uint64
 	reader := NewBytesReader(data)
 	for {
@@ -55,8 +56,12 @@ func SplitMessages(data []byte) ([]*msgInfo, error) {
 			return messages, fmt.Errorf("read message body err: %s", err)
 		}
 
 		if _, ok := indexes[msgIndex]; ok {
-			log.Printf("duplicate message index: %d", msgIndex)
+			// Log once per session, but still skip every duplicate
+			if !hadDuplicates {
+				hadDuplicates = true
+				log.Printf("Session %s has duplicate messages", sessID)
+			}
 			continue
 		}
 		indexes[msgIndex] = true
diff --git a/backend/pkg/metrics/storage/metrics.go b/backend/pkg/metrics/storage/metrics.go
index 455316a85..347224f7b 100644
--- a/backend/pkg/metrics/storage/metrics.go
+++ b/backend/pkg/metrics/storage/metrics.go
@@ -127,6 +127,20 @@ func RecordSessionUploadDuration(durMillis float64, fileType string) {
 	storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
 }
 
+var storageSessionCompressionRatio = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "storage",
+		Name:      "compression_ratio",
+		Help:      "A histogram displaying the compression ratio of mob files for each session.",
+		Buckets:   common.DefaultDurationBuckets,
+	},
+	[]string{"file_type"},
+)
+
+func RecordSessionCompressionRatio(ratio float64, fileType string) {
+	storageSessionCompressionRatio.WithLabelValues(fileType).Observe(ratio)
+}
+
 func List() []prometheus.Collector {
 	return []prometheus.Collector{
 		storageSessionSize,
@@ -136,5 +150,6 @@ func List() []prometheus.Collector {
 		storageSessionEncryptionDuration,
 		storageSessionCompressDuration,
 		storageSessionUploadDuration,
+		storageSessionCompressionRatio,
 	}
 }
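
Reviewer note (not part of the patch): the graceful shutdown in this PR works because the break task drains both single-worker stages in FIFO order, so Wait() returns only after every task queued before it has been compressed and uploaded. Below is a minimal, self-contained Go sketch of that pattern; the pipeline/task names and the fmt.Println stand-ins are illustrative assumptions, only the break-task flow mirrors the PR.

package main

import "fmt"

type task struct {
	id          string
	isBreakTask bool
}

type pipeline struct {
	compressionTasks chan *task
	uploadingTasks   chan *task
	workersStopped   chan struct{}
}

func newPipeline() *pipeline {
	p := &pipeline{
		compressionTasks: make(chan *task, 1),
		uploadingTasks:   make(chan *task, 1),
		workersStopped:   make(chan struct{}),
	}
	go p.compressionWorker()
	go p.uploadingWorker()
	return p
}

// Stage 1: compress, then hand the task to stage 2. A break task is
// forwarded untouched, so it leaves this stage only after every task
// queued ahead of it has been processed.
func (p *pipeline) compressionWorker() {
	for t := range p.compressionTasks {
		if t.isBreakTask {
			p.uploadingTasks <- t
			continue
		}
		fmt.Println("compress", t.id) // stand-in for packSession
		p.uploadingTasks <- t
	}
}

// Stage 2: upload. When the break task arrives here, every earlier task
// has already passed through both stages.
func (p *pipeline) uploadingWorker() {
	for t := range p.uploadingTasks {
		if t.isBreakTask {
			p.workersStopped <- struct{}{}
			continue
		}
		fmt.Println("upload", t.id) // stand-in for uploadSession
	}
}

// wait mirrors Storage.Wait: enqueue a break task and block until it has
// traveled through the whole pipeline.
func (p *pipeline) wait() {
	p.compressionTasks <- &task{isBreakTask: true}
	<-p.workersStopped
}

func main() {
	p := newPipeline()
	for i := 0; i < 3; i++ {
		p.compressionTasks <- &task{id: fmt.Sprint(i)}
	}
	p.wait() // returns only after tasks 0-2 were compressed and uploaded
}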