From 91709ac90981916cdcbc61a93852ffb34348417a Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Nov 2022 16:15:25 +0100 Subject: [PATCH] [Sink] splitting mob files without folders (#824) * feat(backend): split mob file into 2 without folders --- backend/cmd/sink/main.go | 41 ++--- backend/internal/config/storage/config.go | 2 +- backend/internal/sink/oswriter/oswriter.go | 170 ++++++++++----------- backend/internal/storage/storage.go | 65 ++++---- 4 files changed, 130 insertions(+), 148 deletions(-) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index d3cc99e40..675d965c9 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -3,10 +3,8 @@ package main import ( "context" "log" - "openreplay/backend/pkg/pprof" "os" "os/signal" - "strings" "syscall" "time" @@ -16,6 +14,7 @@ import ( "openreplay/backend/internal/storage" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/url/assets" ) @@ -64,6 +63,9 @@ func main() { if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil { log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID()) } + if err := writer.Close(msg.SessionID()); err != nil { + log.Printf("can't close session file: %s", err) + } return } @@ -98,39 +100,18 @@ func main() { // Write encoded message with index to session file data := msg.EncodeWithIndex() if data == nil { - log.Printf("can't encode with index, err: %s", err) return } - wasWritten := false // To avoid timestamp duplicates in original mob file + + // Write message to file if messages.IsDOMType(msg.TypeID()) { if err := writer.WriteDOM(msg.SessionID(), data); err != nil { - if strings.Contains(err.Error(), "not a directory") { - // Trying to write data to mob file by original path - oldErr := writer.WriteMOB(msg.SessionID(), data) - if oldErr != nil { - log.Printf("MOB Writeer error: %s, prev DOM error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) - } else { - wasWritten = true - } - } else { - log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) - } + log.Printf("Writer error: %v\n", err) } } if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { - // TODO: write only necessary timestamps if err := writer.WriteDEV(msg.SessionID(), data); err != nil { - if strings.Contains(err.Error(), "not a directory") { - if !wasWritten { - // Trying to write data to mob file by original path - oldErr := writer.WriteMOB(msg.SessionID(), data) - if oldErr != nil { - log.Printf("MOB Writeer error: %s, prev DEV error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) - } - } - } else { - log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) - } + log.Printf("Writer error: %v\n", err) } } @@ -167,13 +148,17 @@ func main() { consumer.Close() os.Exit(0) case <-tick: + counter.Print() + s := time.Now() if err := writer.SyncAll(); err != nil { log.Fatalf("sync error: %v\n", err) } - counter.Print() + dur := time.Now().Sub(s).Milliseconds() + s = time.Now() if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } + log.Printf("sync: %d, commit: %d, writer: %s", dur, time.Now().Sub(s).Milliseconds(), writer.Info()) default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/internal/config/storage/config.go b/backend/internal/config/storage/config.go index fdf29b7db..6083f0249 100644 --- a/backend/internal/config/storage/config.go +++ b/backend/internal/config/storage/config.go @@ -11,7 +11,6 @@ type Config struct { S3Region string `env:"AWS_REGION_WEB,required"` S3Bucket string `env:"S3_BUCKET_WEB,required"` FSDir string `env:"FS_DIR,required"` - FSCleanHRS int `env:"FS_CLEAN_HRS,required"` FileSplitSize int `env:"FILE_SPLIT_SIZE,required"` RetryTimeout time.Duration `env:"RETRY_TIMEOUT,default=2m"` GroupStorage string `env:"GROUP_STORAGE,required"` @@ -21,6 +20,7 @@ type Config struct { DeleteTimeout time.Duration `env:"DELETE_TIMEOUT,default=48h"` ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` UseFailover bool `env:"USE_FAILOVER,default=false"` + MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"` } func New() *Config { diff --git a/backend/internal/sink/oswriter/oswriter.go b/backend/internal/sink/oswriter/oswriter.go index 070540b1d..ec42d7668 100644 --- a/backend/internal/sink/oswriter/oswriter.go +++ b/backend/internal/sink/oswriter/oswriter.go @@ -1,38 +1,53 @@ package oswriter import ( - "errors" - "log" + "fmt" "math" "os" - "path/filepath" "strconv" "time" ) +type FileType int + +const ( + DOM FileType = 1 + DEV FileType = 2 +) + type Writer struct { - ulimit int - dir string - files map[string]*os.File - atimes map[string]int64 + ulimit int + dir string + files map[uint64]*os.File + devtools map[uint64]*os.File + atimes map[uint64]int64 } func NewWriter(ulimit uint16, dir string) *Writer { return &Writer{ - ulimit: int(ulimit), - dir: dir + "/", - files: make(map[string]*os.File), - atimes: make(map[string]int64), + ulimit: int(ulimit), + dir: dir + "/", + files: make(map[uint64]*os.File, 1024), + devtools: make(map[uint64]*os.File, 1024), + atimes: make(map[uint64]int64, 1024), } } -func (w *Writer) open(fname string) (*os.File, error) { - file, ok := w.files[fname] - if ok { - return file, nil +func (w *Writer) open(key uint64, mode FileType) (*os.File, error) { + if mode == DOM { + file, ok := w.files[key] + if ok { + return file, nil + } + } else { + file, ok := w.devtools[key] + if ok { + return file, nil + } } - if len(w.atimes) == w.ulimit { - var m_k string + + if len(w.atimes) >= w.ulimit { + var m_k uint64 var m_t int64 = math.MaxInt64 for k, t := range w.atimes { if t < m_t { @@ -40,38 +55,30 @@ func (w *Writer) open(fname string) (*os.File, error) { m_t = t } } - if err := w.close(m_k); err != nil { + if err := w.Close(m_k); err != nil { return nil, err } } - - // mkdir if not exist - pathTo := w.dir + filepath.Dir(fname) - if info, err := os.Stat(pathTo); os.IsNotExist(err) { - if err := os.MkdirAll(pathTo, 0755); err != nil { - log.Printf("os.MkdirAll error: %s", err) - } - } else { - if err != nil { - return nil, err - } - if !info.IsDir() { - return nil, errors.New("not a directory") - } + filePath := w.dir + strconv.FormatUint(key, 10) + if mode == DEV { + filePath += "devtools" } - - file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { - log.Printf("os.OpenFile error: %s", err) return nil, err } - w.files[fname] = file - w.atimes[fname] = time.Now().Unix() + if mode == DOM { + w.files[key] = file + } else { + w.devtools[key] = file + } + w.atimes[key] = time.Now().Unix() return file, nil } -func (w *Writer) close(fname string) error { - file := w.files[fname] +func (w *Writer) Close(key uint64) error { + // Close dom file + file := w.files[key] if file == nil { return nil } @@ -81,23 +88,33 @@ func (w *Writer) close(fname string) error { if err := file.Close(); err != nil { return err } - delete(w.files, fname) - delete(w.atimes, fname) + delete(w.files, key) + delete(w.atimes, key) + // Close dev file + file = w.devtools[key] + if file == nil { + return nil + } + if err := file.Sync(); err != nil { + return err + } + if err := file.Close(); err != nil { + return err + } + delete(w.devtools, key) return nil } -func (w *Writer) WriteDOM(sid uint64, data []byte) error { - return w.write(strconv.FormatUint(sid, 10)+"/dom.mob", data) +func (w *Writer) WriteDOM(key uint64, data []byte) error { + return w.Write(key, DOM, data) } -func (w *Writer) WriteDEV(sid uint64, data []byte) error { - return w.write(strconv.FormatUint(sid, 10)+"/devtools.mob", data) +func (w *Writer) WriteDEV(key uint64, data []byte) error { + return w.Write(key, DEV, data) } -func (w *Writer) WriteMOB(sid uint64, data []byte) error { - // Use session id as a file name without directory - fname := strconv.FormatUint(sid, 10) - file, err := w.openWithoutDir(fname) +func (w *Writer) Write(key uint64, mode FileType, data []byte) error { + file, err := w.open(key, mode) if err != nil { return err } @@ -105,49 +122,17 @@ func (w *Writer) WriteMOB(sid uint64, data []byte) error { return err } -func (w *Writer) write(fname string, data []byte) error { - file, err := w.open(fname) - if err != nil { - return err - } - _, err = file.Write(data) - return err -} - -func (w *Writer) openWithoutDir(fname string) (*os.File, error) { - file, ok := w.files[fname] - if ok { - return file, nil - } - if len(w.atimes) == w.ulimit { - var m_k string - var m_t int64 = math.MaxInt64 - for k, t := range w.atimes { - if t < m_t { - m_k = k - m_t = t - } - } - if err := w.close(m_k); err != nil { - return nil, err - } - } - - file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return nil, err - } - w.files[fname] = file - w.atimes[fname] = time.Now().Unix() - return file, nil -} - func (w *Writer) SyncAll() error { for _, file := range w.files { if err := file.Sync(); err != nil { return err } } + for _, file := range w.devtools { + if err := file.Sync(); err != nil { + return err + } + } return nil } @@ -161,6 +146,19 @@ func (w *Writer) CloseAll() error { } } w.files = nil + for _, file := range w.devtools { + if err := file.Sync(); err != nil { + return err + } + if err := file.Close(); err != nil { + return err + } + } + w.devtools = nil w.atimes = nil return nil } + +func (w *Writer) Info() string { + return fmt.Sprintf("dom: %d, dev: %d", len(w.files), len(w.devtools)) +} diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index 7fdc06c4f..9959cc4dd 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -13,7 +13,6 @@ import ( "openreplay/backend/pkg/storage" "os" "strconv" - "strings" "time" ) @@ -71,43 +70,46 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor } func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error { - sessionDir := strconv.FormatUint(msg.SessionID(), 10) - if err := s.uploadKey(msg.SessionID(), sessionDir+"/dom.mob", true, 5, msg.EncryptionKey); err != nil { - oldErr := s.uploadKey(msg.SessionID(), sessionDir, true, 5, msg.EncryptionKey) - if oldErr != nil { - return fmt.Errorf("upload file error: %s. failed checking mob file using old path: %s", err, oldErr) - } - // Exit method anyway because we don't have dev tools separation in prev version - return nil - } - if err := s.uploadKey(msg.SessionID(), sessionDir+"/devtools.mob", false, 4, msg.EncryptionKey); err != nil { + if err := s.uploadKey(msg.SessionID(), "/dom.mob", true, 5, msg.EncryptionKey); err != nil { return err } + if err := s.uploadKey(msg.SessionID(), "/devtools.mob", false, 4, msg.EncryptionKey); err != nil { + log.Printf("can't find devtools for session: %d, err: %s", msg.SessionID(), err) + } return nil } -// TODO: make a bit cleaner -func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCount int, encryptionKey string) error { +// TODO: make a bit cleaner. +// TODO: Of course, I'll do! +func (s *Storage) uploadKey(sessID uint64, suffix string, shouldSplit bool, retryCount int, encryptionKey string) error { if retryCount <= 0 { return nil } - start := time.Now() - file, err := os.Open(s.cfg.FSDir + "/" + key) + fileName := strconv.FormatUint(sessID, 10) + mobFileName := fileName + if suffix == "/devtools.mob" { + mobFileName += "devtools" + } + filePath := s.cfg.FSDir + "/" + mobFileName + + // Check file size before download into memory + info, err := os.Stat(filePath) + if err != nil { + if info.Size() > s.cfg.MaxFileSize { + log.Printf("big file, size: %d, session: %d", info.Size(), sessID) + return nil + } + } + file, err := os.Open(filePath) if err != nil { return fmt.Errorf("File open error: %v; sessID: %s, part: %d, sessStart: %s\n", - err, key, sessID%16, + err, fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) } defer file.Close() - // Ignore "s" at the end of mob file name for "old" sessions - newVers := false - if strings.Contains(key, "/") { - newVers = true - } - var fileSize int64 = 0 fileInfo, err := file.Stat() if err != nil { @@ -117,17 +119,18 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } var encryptedData []byte + fileName += suffix if shouldSplit { nRead, err := file.Read(s.startBytes) if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) time.AfterFunc(s.cfg.RetryTimeout, func() { - s.uploadKey(sessID, key, shouldSplit, retryCount-1, encryptionKey) + s.uploadKey(sessID, suffix, shouldSplit, retryCount-1, encryptionKey) }) return nil } @@ -146,11 +149,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } // Compress and save to s3 startReader := bytes.NewBuffer(encryptedData) - startKey := key - if newVers { - startKey += "s" - } - if err := s.s3.Upload(s.gzipFile(startReader), startKey, "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(startReader), fileName+"s", "application/octet-stream", true); err != nil { log.Fatalf("Storage: start upload failed. %v\n", err) } // TODO: fix possible error (if we read less then FileSplitSize) @@ -161,7 +160,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) @@ -183,7 +182,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } // Compress and save to s3 endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), key+"e", "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(endReader), fileName+"e", "application/octet-stream", true); err != nil { log.Fatalf("Storage: end upload failed. %v\n", err) } } @@ -195,7 +194,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo if err != nil { log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s", err, - key, + fileName, sessID%16, time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))), ) @@ -216,7 +215,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo encryptedData = fileData } endReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(endReader), key+"s", "application/octet-stream", true); err != nil { + if err := s.s3.Upload(s.gzipFile(endReader), fileName, "application/octet-stream", true); err != nil { log.Fatalf("Storage: end upload failed. %v\n", err) } s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))