From 9afc95d894642a4f91685c8d7a3837231aedcc03 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 9 Nov 2022 10:52:47 +0100 Subject: [PATCH] Mob file backward compatibility (#804) * feat(backend): added mob file backward compatibility --- backend/cmd/sink/main.go | 30 ++++++++++-- backend/internal/sink/oswriter/oswriter.go | 55 +++++++++++++++++++++- backend/internal/storage/storage.go | 21 ++++++++- 3 files changed, 98 insertions(+), 8 deletions(-) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 1f34760e1..d3cc99e40 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -6,6 +6,7 @@ import ( "openreplay/backend/pkg/pprof" "os" "os/signal" + "strings" "syscall" "time" @@ -100,15 +101,36 @@ func main() { log.Printf("can't encode with index, err: %s", err) return } + wasWritten := false // To avoid timestamp duplicates in original mob file if messages.IsDOMType(msg.TypeID()) { if err := writer.WriteDOM(msg.SessionID(), data); err != nil { - log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) + if strings.Contains(err.Error(), "not a directory") { + // Trying to write data to mob file by original path + oldErr := writer.WriteMOB(msg.SessionID(), data) + if oldErr != nil { + log.Printf("MOB Writeer error: %s, prev DOM error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) + } else { + wasWritten = true + } + } else { + log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) + } } } if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp { // TODO: write only necessary timestamps if err := writer.WriteDEV(msg.SessionID(), data); err != nil { - log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) + if strings.Contains(err.Error(), "not a directory") { + if !wasWritten { + // Trying to write data to mob file by original path + oldErr := writer.WriteMOB(msg.SessionID(), data) + if oldErr != nil { + log.Printf("MOB Writeer error: %s, prev DEV error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info()) + } + } + } else { + log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info()) + } } } @@ -136,8 +158,8 @@ func main() { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) - if err := writer.SyncAll(); err != nil { - log.Printf("sync error: %v\n", err) + if err := writer.CloseAll(); err != nil { + log.Printf("closeAll error: %v\n", err) } if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) diff --git a/backend/internal/sink/oswriter/oswriter.go b/backend/internal/sink/oswriter/oswriter.go index 83567c4a9..070540b1d 100644 --- a/backend/internal/sink/oswriter/oswriter.go +++ b/backend/internal/sink/oswriter/oswriter.go @@ -1,6 +1,8 @@ package oswriter import ( + "errors" + "log" "math" "os" "path/filepath" @@ -45,12 +47,22 @@ func (w *Writer) open(fname string) (*os.File, error) { // mkdir if not exist pathTo := w.dir + filepath.Dir(fname) - if _, err := os.Stat(pathTo); os.IsNotExist(err) { - os.MkdirAll(pathTo, 0755) + if info, err := os.Stat(pathTo); os.IsNotExist(err) { + if err := os.MkdirAll(pathTo, 0755); err != nil { + log.Printf("os.MkdirAll error: %s", err) + } + } else { + if err != nil { + return nil, err + } + if !info.IsDir() { + return nil, errors.New("not a directory") + } } file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { + log.Printf("os.OpenFile error: %s", err) return nil, err } w.files[fname] = file @@ -82,6 +94,17 @@ func (w *Writer) WriteDEV(sid uint64, data []byte) error { return w.write(strconv.FormatUint(sid, 10)+"/devtools.mob", data) } +func (w *Writer) WriteMOB(sid uint64, data []byte) error { + // Use session id as a file name without directory + fname := strconv.FormatUint(sid, 10) + file, err := w.openWithoutDir(fname) + if err != nil { + return err + } + _, err = file.Write(data) + return err +} + func (w *Writer) write(fname string, data []byte) error { file, err := w.open(fname) if err != nil { @@ -91,6 +114,34 @@ func (w *Writer) write(fname string, data []byte) error { return err } +func (w *Writer) openWithoutDir(fname string) (*os.File, error) { + file, ok := w.files[fname] + if ok { + return file, nil + } + if len(w.atimes) == w.ulimit { + var m_k string + var m_t int64 = math.MaxInt64 + for k, t := range w.atimes { + if t < m_t { + m_k = k + m_t = t + } + } + if err := w.close(m_k); err != nil { + return nil, err + } + } + + file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + w.files[fname] = file + w.atimes[fname] = time.Now().Unix() + return file, nil +} + func (w *Writer) SyncAll() error { for _, file := range w.files { if err := file.Sync(); err != nil { diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index a4738c10e..7fdc06c4f 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -13,6 +13,7 @@ import ( "openreplay/backend/pkg/storage" "os" "strconv" + "strings" "time" ) @@ -72,7 +73,12 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error { sessionDir := strconv.FormatUint(msg.SessionID(), 10) if err := s.uploadKey(msg.SessionID(), sessionDir+"/dom.mob", true, 5, msg.EncryptionKey); err != nil { - return err + oldErr := s.uploadKey(msg.SessionID(), sessionDir, true, 5, msg.EncryptionKey) + if oldErr != nil { + return fmt.Errorf("upload file error: %s. failed checking mob file using old path: %s", err, oldErr) + } + // Exit method anyway because we don't have dev tools separation in prev version + return nil } if err := s.uploadKey(msg.SessionID(), sessionDir+"/devtools.mob", false, 4, msg.EncryptionKey); err != nil { return err @@ -96,6 +102,12 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } defer file.Close() + // Ignore "s" at the end of mob file name for "old" sessions + newVers := false + if strings.Contains(key, "/") { + newVers = true + } + var fileSize int64 = 0 fileInfo, err := file.Stat() if err != nil { @@ -103,6 +115,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } else { fileSize = fileInfo.Size() } + var encryptedData []byte if shouldSplit { nRead, err := file.Read(s.startBytes) @@ -133,7 +146,11 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo } // Compress and save to s3 startReader := bytes.NewBuffer(encryptedData) - if err := s.s3.Upload(s.gzipFile(startReader), key+"s", "application/octet-stream", true); err != nil { + startKey := key + if newVers { + startKey += "s" + } + if err := s.s3.Upload(s.gzipFile(startReader), startKey, "application/octet-stream", true); err != nil { log.Fatalf("Storage: start upload failed. %v\n", err) } // TODO: fix possible error (if we read less then FileSplitSize)