From bab5a819595ff7274d35bcf80b77d41003fe9f3f Mon Sep 17 00:00:00 2001
From: Alexander
Date: Tue, 13 Dec 2022 14:35:41 +0100
Subject: [PATCH] [Storage] add worker-based performance improvements (#877)

* feat(backend): add workers for storage service
---
 backend/cmd/storage/main.go         |   9 +-
 backend/internal/storage/storage.go | 306 ++++++++++++++--------------
 ee/backend/pkg/failover/failover.go |   2 +-
 3 files changed, 161 insertions(+), 156 deletions(-)

diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go
index 251ce82e2..267cb898d 100644
--- a/backend/cmd/storage/main.go
+++ b/backend/cmd/storage/main.go
@@ -44,7 +44,7 @@ func main() {
 		messages.NewMessageIterator(
 			func(msg messages.Message) {
 				sesEnd := msg.(*messages.SessionEnd)
-				if err := srv.UploadSessionFiles(sesEnd); err != nil {
+				if err := srv.Upload(sesEnd); err != nil {
 					log.Printf("can't find session: %d", msg.SessionID())
 					sessionFinder.Find(msg.SessionID(), sesEnd.Timestamp)
 				}
@@ -54,7 +54,7 @@ func main() {
 			[]int{messages.MsgSessionEnd},
 			true,
 		),
-		true,
+		false,
 		cfg.MessageSizeLimit,
 	)
@@ -69,10 +69,15 @@ func main() {
 		case sig := <-sigchan:
 			log.Printf("Caught signal %v: terminating\n", sig)
 			sessionFinder.Stop()
+			srv.Wait()
 			consumer.Close()
 			os.Exit(0)
 		case <-counterTick:
 			go counter.Print()
+			srv.Wait()
+			if err := consumer.Commit(); err != nil {
+				log.Printf("can't commit messages: %s", err)
+			}
 		case msg := <-consumer.Rebalanced():
 			log.Println(msg)
 		default:
diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go
index 12a37183f..594d97eea 100644
--- a/backend/internal/storage/storage.go
+++ b/backend/internal/storage/storage.go
@@ -2,20 +2,33 @@ package storage
 
 import (
 	"bytes"
-	"context"
 	"fmt"
+	gzip "github.com/klauspost/pgzip"
 	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
 	"log"
 	config "openreplay/backend/internal/config/storage"
-	"openreplay/backend/pkg/flakeid"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/monitoring"
 	"openreplay/backend/pkg/storage"
 	"os"
 	"strconv"
-	"time"
+	"sync"
 )
 
+type FileType string
+
+const (
+	DOM FileType = "/dom.mob"
+	DEV FileType = "/devtools.mob"
+)
+
+type Task struct {
+	id   string
+	doms *bytes.Buffer // compressed first part of the DOM file
+	dome *bytes.Buffer // compressed second part of the DOM file
+	dev  *bytes.Buffer // compressed devtools file
+}
+
 type Storage struct {
 	cfg *config.Config
 	s3  *storage.S3
@@ -27,6 +40,9 @@ type Storage struct {
 	readingDOMTime      syncfloat64.Histogram
 	readingTime         syncfloat64.Histogram
 	archivingTime       syncfloat64.Histogram
+
+	tasks chan *Task
+	ready chan struct{}
 }
 
 func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) {
@@ -57,7 +73,7 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
 	if err != nil {
 		log.Printf("can't create archiving_duration metric: %s", err)
 	}
-	return &Storage{
+	newStorage := &Storage{
 		cfg:        cfg,
 		s3:         s3,
 		startBytes: make([]byte, cfg.FileSplitSize),
@@ -66,169 +82,153 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
 		sessionDevtoolsSize: sessionDevtoolsSize,
 		readingTime:         readingTime,
 		archivingTime:       archivingTime,
-	}, nil
+		tasks: make(chan *Task, 1),
+		ready: make(chan struct{}),
+	}
+	go newStorage.worker()
+	return newStorage, nil
 }
 
-func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error {
-	if err := s.uploadKey(msg.SessionID(), "/dom.mob", true, 5, msg.EncryptionKey); err != nil {
-		return err
-	}
-	if err := s.uploadKey(msg.SessionID(), "/devtools.mob", false, 4, msg.EncryptionKey); err != nil {
-		log.Printf("can't find devtools for session: %d, err: %s",
-			msg.SessionID(), err)
-	}
-	return nil
+// Wait blocks until the upload worker's queue is drained
+func (s *Storage) Wait() {
+	<-s.ready
 }
 
-// TODO: make a bit cleaner.
-// TODO: Of course, I'll do!
-func (s *Storage) uploadKey(sessID uint64, suffix string, shouldSplit bool, retryCount int, encryptionKey string) error {
-	if retryCount <= 0 {
-		return nil
+func (s *Storage) Upload(msg *messages.SessionEnd) error {
+	// Generate the on-disk file path for this session
+	sessionID := strconv.FormatUint(msg.SessionID(), 10)
+	filePath := s.cfg.FSDir + "/" + sessionID
+	// Prepare a new upload task
+	newTask := &Task{
+		id: sessionID,
 	}
-	start := time.Now()
-	fileName := strconv.FormatUint(sessID, 10)
-	mobFileName := fileName
-	if suffix == "/devtools.mob" {
-		mobFileName += "devtools"
-	}
-	filePath := s.cfg.FSDir + "/" + mobFileName
+	// Read and compress the DOM and devtools files in parallel; one error
+	// variable per goroutine avoids a write race on a shared err
+	var domErr, devErr error
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		if prepErr := s.prepareSession(filePath, DOM, newTask); prepErr != nil {
+			domErr = fmt.Errorf("prepare dom session err: %s", prepErr)
+		}
+		wg.Done()
+	}()
+	go func() {
+		if prepErr := s.prepareSession(filePath, DEV, newTask); prepErr != nil {
+			devErr = fmt.Errorf("prepare devtools session err: %s", prepErr)
+		}
+		wg.Done()
+	}()
+	wg.Wait()
+	// Send the new task to the worker
+	s.tasks <- newTask
+	// Wait for the worker to signal that its queue is drained
+	<-s.ready
+	if domErr != nil {
+		return domErr
+	}
+	return devErr
+}
 
+func (s *Storage) openSession(filePath string) ([]byte, error) {
 	// Check file size before reading into memory
 	info, err := os.Stat(filePath)
-	if err == nil {
-		if info.Size() > s.cfg.MaxFileSize {
-			log.Printf("big file, size: %d, session: %d", info.Size(), sessID)
-			return nil
-		}
+	if err == nil && info.Size() > s.cfg.MaxFileSize {
+		return nil, fmt.Errorf("big file, size: %d", info.Size())
 	}
-	file, err := os.Open(filePath)
+	// Read the whole file into memory
+	return os.ReadFile(filePath)
+}
+
+func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
+	// Open the mob file (devtools files have a "devtools" suffix on disk)
+	if tp == DEV {
+		path += "devtools"
+	}
+	mob, err := s.openSession(path)
 	if err != nil {
-		return fmt.Errorf("File open error: %v; sessID: %s, part: %d, sessStart: %s\n",
-			err, fileName, sessID%16,
-			time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
-		)
+		return err
 	}
-	defer file.Close()
-
-	var fileSize int64 = 0
-	fileInfo, err := file.Stat()
-	if err != nil {
-		log.Printf("can't get file info: %s", err)
+	// Compress devtools files whole; split DOM files that exceed FileSplitSize
+	if tp == DEV {
+		task.dev = s.compressSession(mob)
 	} else {
-		fileSize = fileInfo.Size()
-	}
-
-	var encryptedData []byte
-	fileName += suffix
-	if shouldSplit {
-		nRead, err := file.Read(s.startBytes)
-		if err != nil {
-			log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
-				err,
-				fileName,
-				sessID%16,
-				time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
-			)
-			time.AfterFunc(s.cfg.RetryTimeout, func() {
-				s.uploadKey(sessID, suffix, shouldSplit, retryCount-1, encryptionKey)
-			})
+		if len(mob) <= s.cfg.FileSplitSize {
+			task.doms = s.compressSession(mob)
 			return nil
 		}
-		s.readingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
-
-		start = time.Now()
-		// Encrypt session file if we have encryption key
-		if encryptionKey != "" {
-			encryptedData, err = EncryptData(s.startBytes[:nRead], []byte(encryptionKey))
-			if err != nil {
-				log.Printf("can't encrypt data: %s", err)
-				encryptedData = s.startBytes[:nRead]
-			}
-		} else {
-			encryptedData = s.startBytes[:nRead]
-		}
-		// Compress and save to s3
-		startReader := bytes.NewBuffer(encryptedData)
-		if err := s.s3.Upload(s.gzipFile(startReader), fileName+"s", "application/octet-stream", true); err != nil {
-			log.Fatalf("Storage: start upload failed. %v\n", err)
%v\n", err) - } - // TODO: fix possible error (if we read less then FileSplitSize) - if nRead == s.cfg.FileSplitSize { - restPartSize := fileSize - int64(nRead) - fileData := make([]byte, restPartSize) - nRead, err = file.Read(fileData) - if err != nil { - log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", - err, - fileName, - sessID%16, - time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), - ) - return nil - } - if int64(nRead) != restPartSize { - log.Printf("can't read the rest part of file") - } - - // Encrypt session file if we have encryption key - if encryptionKey != "" { - encryptedData, err = EncryptData(fileData, []byte(encryptionKey)) - if err != nil { - log.Printf("can't encrypt data: %s", err) - encryptedData = fileData - } - } else { - encryptedData = fileData - } - // Compress and save to s3 - endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), fileName+"e", "application/octet-stream", true); err != nil { - log.Fatalf("Storage: end upload failed. %v\n", err) - } - } - s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) - } else { - start = time.Now() - fileData := make([]byte, fileSize) - nRead, err := file.Read(fileData) - if err != nil { - log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", - err, - fileName, - sessID%16, - time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), - ) - return nil - } - if int64(nRead) != fileSize { - log.Printf("can't read the rest part of file") - } - - // Encrypt session file if we have encryption key - if encryptionKey != "" { - encryptedData, err = EncryptData(fileData, []byte(encryptionKey)) - if err != nil { - log.Printf("can't encrypt data: %s", err) - encryptedData = fileData - } - } else { - encryptedData = fileData - } - endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), fileName, "application/octet-stream", true); err != nil { - log.Fatalf("Storage: end upload failed. 
%v\n", err) - } - s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + task.doms = s.compressSession(mob[:s.cfg.FileSplitSize]) + wg.Done() + }() + go func() { + task.dome = s.compressSession(mob[s.cfg.FileSplitSize:]) + wg.Done() + }() + wg.Wait() } - - // Save metrics - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200) - if shouldSplit { - s.totalSessions.Add(ctx, 1) - s.sessionDOMSize.Record(ctx, float64(fileSize)) - } else { - s.sessionDevtoolsSize.Record(ctx, float64(fileSize)) - } - return nil } + +func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte { + var encryptedData []byte + var err error + if encryptionKey != "" { + encryptedData, err = EncryptData(data, []byte(encryptionKey)) + if err != nil { + log.Printf("can't encrypt data: %s", err) + encryptedData = data + } + } else { + encryptedData = data + } + return encryptedData +} + +func (s *Storage) compressSession(data []byte) *bytes.Buffer { + zippedMob := new(bytes.Buffer) + z, _ := gzip.NewWriterLevel(zippedMob, gzip.BestSpeed) + if _, err := z.Write(data); err != nil { + log.Printf("can't write session data to compressor: %s", err) + } + if err := z.Close(); err != nil { + log.Printf("can't close compressor: %s", err) + } + return zippedMob +} + +func (s *Storage) uploadSession(task *Task) { + wg := &sync.WaitGroup{} + wg.Add(3) + go func() { + if task.doms != nil { + if err := s.s3.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", true); err != nil { + log.Fatalf("Storage: start upload failed. %s", err) + } + } + wg.Done() + }() + go func() { + if task.dome != nil { + if err := s.s3.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", true); err != nil { + log.Fatalf("Storage: start upload failed. %s", err) + } + } + wg.Done() + }() + go func() { + if task.dev != nil { + if err := s.s3.Upload(task.dev, task.id+string(DEV), "application/octet-stream", true); err != nil { + log.Fatalf("Storage: start upload failed. %s", err) + } + } + wg.Done() + }() + wg.Wait() +} + +func (s *Storage) worker() { + for { + select { + case task := <-s.tasks: + s.uploadSession(task) + default: + // Signal that worker finished all tasks + s.ready <- struct{}{} + } + } +} diff --git a/ee/backend/pkg/failover/failover.go b/ee/backend/pkg/failover/failover.go index 1b9321afc..11ff7e4be 100644 --- a/ee/backend/pkg/failover/failover.go +++ b/ee/backend/pkg/failover/failover.go @@ -91,7 +91,7 @@ func (s *sessionFinderImpl) worker() { func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) { sessEnd := &messages.SessionEnd{Timestamp: timestamp} sessEnd.SetSessionID(sessionID) - err := s.storage.UploadSessionFiles(sessEnd) + err := s.storage.Upload(sessEnd) if err == nil { log.Printf("found session: %d in partition: %d, original: %d", sessionID, partition, sessionID%numberOfPartitions)