Mob file backward compatibility (#804)

* feat(backend): added mob file backward compatibility
This commit is contained in:
Alexander 2022-11-09 10:52:47 +01:00 committed by GitHub
parent d65041fc30
commit 9afc95d894
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 8 deletions

View file

@ -6,6 +6,7 @@ import (
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
@ -100,15 +101,36 @@ func main() {
log.Printf("can't encode with index, err: %s", err)
return
}
wasWritten := false // To avoid timestamp duplicates in original mob file
if messages.IsDOMType(msg.TypeID()) {
if err := writer.WriteDOM(msg.SessionID(), data); err != nil {
log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
if strings.Contains(err.Error(), "not a directory") {
// Trying to write data to mob file by original path
oldErr := writer.WriteMOB(msg.SessionID(), data)
if oldErr != nil {
log.Printf("MOB Writeer error: %s, prev DOM error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info())
} else {
wasWritten = true
}
} else {
log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
}
}
if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
// TODO: write only necessary timestamps
if err := writer.WriteDEV(msg.SessionID(), data); err != nil {
log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
if strings.Contains(err.Error(), "not a directory") {
if !wasWritten {
// Trying to write data to mob file by original path
oldErr := writer.WriteMOB(msg.SessionID(), data)
if oldErr != nil {
log.Printf("MOB Writeer error: %s, prev DEV error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info())
}
}
} else {
log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
}
}
@ -136,8 +158,8 @@ func main() {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
if err := writer.SyncAll(); err != nil {
log.Printf("sync error: %v\n", err)
if err := writer.CloseAll(); err != nil {
log.Printf("closeAll error: %v\n", err)
}
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)

View file

@ -1,6 +1,8 @@
package oswriter
import (
"errors"
"log"
"math"
"os"
"path/filepath"
@ -45,12 +47,22 @@ func (w *Writer) open(fname string) (*os.File, error) {
// mkdir if not exist
pathTo := w.dir + filepath.Dir(fname)
if _, err := os.Stat(pathTo); os.IsNotExist(err) {
os.MkdirAll(pathTo, 0755)
if info, err := os.Stat(pathTo); os.IsNotExist(err) {
if err := os.MkdirAll(pathTo, 0755); err != nil {
log.Printf("os.MkdirAll error: %s", err)
}
} else {
if err != nil {
return nil, err
}
if !info.IsDir() {
return nil, errors.New("not a directory")
}
}
file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Printf("os.OpenFile error: %s", err)
return nil, err
}
w.files[fname] = file
@ -82,6 +94,17 @@ func (w *Writer) WriteDEV(sid uint64, data []byte) error {
return w.write(strconv.FormatUint(sid, 10)+"/devtools.mob", data)
}
func (w *Writer) WriteMOB(sid uint64, data []byte) error {
// Use session id as a file name without directory
fname := strconv.FormatUint(sid, 10)
file, err := w.openWithoutDir(fname)
if err != nil {
return err
}
_, err = file.Write(data)
return err
}
func (w *Writer) write(fname string, data []byte) error {
file, err := w.open(fname)
if err != nil {
@ -91,6 +114,34 @@ func (w *Writer) write(fname string, data []byte) error {
return err
}
func (w *Writer) openWithoutDir(fname string) (*os.File, error) {
file, ok := w.files[fname]
if ok {
return file, nil
}
if len(w.atimes) == w.ulimit {
var m_k string
var m_t int64 = math.MaxInt64
for k, t := range w.atimes {
if t < m_t {
m_k = k
m_t = t
}
}
if err := w.close(m_k); err != nil {
return nil, err
}
}
file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
w.files[fname] = file
w.atimes[fname] = time.Now().Unix()
return file, nil
}
func (w *Writer) SyncAll() error {
for _, file := range w.files {
if err := file.Sync(); err != nil {

View file

@ -13,6 +13,7 @@ import (
"openreplay/backend/pkg/storage"
"os"
"strconv"
"strings"
"time"
)
@ -72,7 +73,12 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error {
sessionDir := strconv.FormatUint(msg.SessionID(), 10)
if err := s.uploadKey(msg.SessionID(), sessionDir+"/dom.mob", true, 5, msg.EncryptionKey); err != nil {
return err
oldErr := s.uploadKey(msg.SessionID(), sessionDir, true, 5, msg.EncryptionKey)
if oldErr != nil {
return fmt.Errorf("upload file error: %s. failed checking mob file using old path: %s", err, oldErr)
}
// Exit method anyway because we don't have dev tools separation in prev version
return nil
}
if err := s.uploadKey(msg.SessionID(), sessionDir+"/devtools.mob", false, 4, msg.EncryptionKey); err != nil {
return err
@ -96,6 +102,12 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
}
defer file.Close()
// Ignore "s" at the end of mob file name for "old" sessions
newVers := false
if strings.Contains(key, "/") {
newVers = true
}
var fileSize int64 = 0
fileInfo, err := file.Stat()
if err != nil {
@ -103,6 +115,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
} else {
fileSize = fileInfo.Size()
}
var encryptedData []byte
if shouldSplit {
nRead, err := file.Read(s.startBytes)
@ -133,7 +146,11 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
}
// Compress and save to s3
startReader := bytes.NewBuffer(encryptedData)
if err := s.s3.Upload(s.gzipFile(startReader), key+"s", "application/octet-stream", true); err != nil {
startKey := key
if newVers {
startKey += "s"
}
if err := s.s3.Upload(s.gzipFile(startReader), startKey, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
// TODO: fix possible error (if we read less then FileSplitSize)