[Sink] splitting mob files without folders (#824)

* feat(backend): split mob file into 2 without folders
This commit is contained in:
Alexander 2022-11-17 16:15:25 +01:00 committed by GitHub
parent 84d4de65a3
commit 91709ac909
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 130 additions and 148 deletions

View file

@ -3,10 +3,8 @@ package main
import (
"context"
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
@ -16,6 +14,7 @@ import (
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/url/assets"
)
@ -64,6 +63,9 @@ func main() {
if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID())
}
if err := writer.Close(msg.SessionID()); err != nil {
log.Printf("can't close session file: %s", err)
}
return
}
@ -98,39 +100,18 @@ func main() {
// Write encoded message with index to session file
data := msg.EncodeWithIndex()
if data == nil {
log.Printf("can't encode with index, err: %s", err)
return
}
wasWritten := false // To avoid timestamp duplicates in original mob file
// Write message to file
if messages.IsDOMType(msg.TypeID()) {
if err := writer.WriteDOM(msg.SessionID(), data); err != nil {
if strings.Contains(err.Error(), "not a directory") {
// Trying to write data to mob file by original path
oldErr := writer.WriteMOB(msg.SessionID(), data)
if oldErr != nil {
log.Printf("MOB Writeer error: %s, prev DOM error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info())
} else {
wasWritten = true
}
} else {
log.Printf("DOM Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
log.Printf("Writer error: %v\n", err)
}
}
if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
// TODO: write only necessary timestamps
if err := writer.WriteDEV(msg.SessionID(), data); err != nil {
if strings.Contains(err.Error(), "not a directory") {
if !wasWritten {
// Trying to write data to mob file by original path
oldErr := writer.WriteMOB(msg.SessionID(), data)
if oldErr != nil {
log.Printf("MOB Writeer error: %s, prev DEV error: %s, info: %s", oldErr, err, msg.Meta().Batch().Info())
}
}
} else {
log.Printf("Devtools Writer error: %s, info: %s", err, msg.Meta().Batch().Info())
}
log.Printf("Writer error: %v\n", err)
}
}
@ -167,13 +148,17 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
counter.Print()
s := time.Now()
if err := writer.SyncAll(); err != nil {
log.Fatalf("sync error: %v\n", err)
}
counter.Print()
dur := time.Now().Sub(s).Milliseconds()
s = time.Now()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
log.Printf("sync: %d, commit: %d, writer: %s", dur, time.Now().Sub(s).Milliseconds(), writer.Info())
default:
err := consumer.ConsumeNext()
if err != nil {

View file

@ -11,7 +11,6 @@ type Config struct {
S3Region string `env:"AWS_REGION_WEB,required"`
S3Bucket string `env:"S3_BUCKET_WEB,required"`
FSDir string `env:"FS_DIR,required"`
FSCleanHRS int `env:"FS_CLEAN_HRS,required"`
FileSplitSize int `env:"FILE_SPLIT_SIZE,required"`
RetryTimeout time.Duration `env:"RETRY_TIMEOUT,default=2m"`
GroupStorage string `env:"GROUP_STORAGE,required"`
@ -21,6 +20,7 @@ type Config struct {
DeleteTimeout time.Duration `env:"DELETE_TIMEOUT,default=48h"`
ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"`
UseFailover bool `env:"USE_FAILOVER,default=false"`
MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
}
func New() *Config {

View file

@ -1,38 +1,53 @@
package oswriter
import (
"errors"
"log"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"time"
)
// FileType selects which of the two per-session mob files a write targets
// (see open: DOM uses the session-id file name, DEV appends "devtools").
type FileType int
const (
// DOM targets the main DOM-events file for a session.
DOM FileType = 1
// DEV targets the devtools-events file for a session.
DEV FileType = 2
)
type Writer struct {
ulimit int
dir string
files map[string]*os.File
atimes map[string]int64
ulimit int
dir string
files map[uint64]*os.File
devtools map[uint64]*os.File
atimes map[uint64]int64
}
func NewWriter(ulimit uint16, dir string) *Writer {
return &Writer{
ulimit: int(ulimit),
dir: dir + "/",
files: make(map[string]*os.File),
atimes: make(map[string]int64),
ulimit: int(ulimit),
dir: dir + "/",
files: make(map[uint64]*os.File, 1024),
devtools: make(map[uint64]*os.File, 1024),
atimes: make(map[uint64]int64, 1024),
}
}
func (w *Writer) open(fname string) (*os.File, error) {
file, ok := w.files[fname]
if ok {
return file, nil
func (w *Writer) open(key uint64, mode FileType) (*os.File, error) {
if mode == DOM {
file, ok := w.files[key]
if ok {
return file, nil
}
} else {
file, ok := w.devtools[key]
if ok {
return file, nil
}
}
if len(w.atimes) == w.ulimit {
var m_k string
if len(w.atimes) >= w.ulimit {
var m_k uint64
var m_t int64 = math.MaxInt64
for k, t := range w.atimes {
if t < m_t {
@ -40,38 +55,30 @@ func (w *Writer) open(fname string) (*os.File, error) {
m_t = t
}
}
if err := w.close(m_k); err != nil {
if err := w.Close(m_k); err != nil {
return nil, err
}
}
// mkdir if not exist
pathTo := w.dir + filepath.Dir(fname)
if info, err := os.Stat(pathTo); os.IsNotExist(err) {
if err := os.MkdirAll(pathTo, 0755); err != nil {
log.Printf("os.MkdirAll error: %s", err)
}
} else {
if err != nil {
return nil, err
}
if !info.IsDir() {
return nil, errors.New("not a directory")
}
filePath := w.dir + strconv.FormatUint(key, 10)
if mode == DEV {
filePath += "devtools"
}
file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Printf("os.OpenFile error: %s", err)
return nil, err
}
w.files[fname] = file
w.atimes[fname] = time.Now().Unix()
if mode == DOM {
w.files[key] = file
} else {
w.devtools[key] = file
}
w.atimes[key] = time.Now().Unix()
return file, nil
}
func (w *Writer) close(fname string) error {
file := w.files[fname]
func (w *Writer) Close(key uint64) error {
// Close dom file
file := w.files[key]
if file == nil {
return nil
}
@ -81,23 +88,33 @@ func (w *Writer) close(fname string) error {
if err := file.Close(); err != nil {
return err
}
delete(w.files, fname)
delete(w.atimes, fname)
delete(w.files, key)
delete(w.atimes, key)
// Close dev file
file = w.devtools[key]
if file == nil {
return nil
}
if err := file.Sync(); err != nil {
return err
}
if err := file.Close(); err != nil {
return err
}
delete(w.devtools, key)
return nil
}
func (w *Writer) WriteDOM(sid uint64, data []byte) error {
return w.write(strconv.FormatUint(sid, 10)+"/dom.mob", data)
// WriteDOM appends data to the session's DOM mob file.
// key is the session id (callers pass msg.SessionID()).
func (w *Writer) WriteDOM(key uint64, data []byte) error {
return w.Write(key, DOM, data)
}
func (w *Writer) WriteDEV(sid uint64, data []byte) error {
return w.write(strconv.FormatUint(sid, 10)+"/devtools.mob", data)
// WriteDEV appends data to the session's devtools mob file.
// key is the session id (callers pass msg.SessionID()).
func (w *Writer) WriteDEV(key uint64, data []byte) error {
return w.Write(key, DEV, data)
}
func (w *Writer) WriteMOB(sid uint64, data []byte) error {
// Use session id as a file name without directory
fname := strconv.FormatUint(sid, 10)
file, err := w.openWithoutDir(fname)
func (w *Writer) Write(key uint64, mode FileType, data []byte) error {
file, err := w.open(key, mode)
if err != nil {
return err
}
@ -105,49 +122,17 @@ func (w *Writer) WriteMOB(sid uint64, data []byte) error {
return err
}
func (w *Writer) write(fname string, data []byte) error {
file, err := w.open(fname)
if err != nil {
return err
}
_, err = file.Write(data)
return err
}
func (w *Writer) openWithoutDir(fname string) (*os.File, error) {
file, ok := w.files[fname]
if ok {
return file, nil
}
if len(w.atimes) == w.ulimit {
var m_k string
var m_t int64 = math.MaxInt64
for k, t := range w.atimes {
if t < m_t {
m_k = k
m_t = t
}
}
if err := w.close(m_k); err != nil {
return nil, err
}
}
file, err := os.OpenFile(w.dir+fname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
w.files[fname] = file
w.atimes[fname] = time.Now().Unix()
return file, nil
}
// SyncAll flushes every currently open DOM and devtools file to disk.
// It stops at the first Sync failure and returns that error.
func (w *Writer) SyncAll() error {
	syncGroup := func(group map[uint64]*os.File) error {
		for _, f := range group {
			if err := f.Sync(); err != nil {
				return err
			}
		}
		return nil
	}
	// DOM files first, then devtools files — same order as the original.
	if err := syncGroup(w.files); err != nil {
		return err
	}
	return syncGroup(w.devtools)
}
@ -161,6 +146,19 @@ func (w *Writer) CloseAll() error {
}
}
w.files = nil
for _, file := range w.devtools {
if err := file.Sync(); err != nil {
return err
}
if err := file.Close(); err != nil {
return err
}
}
w.devtools = nil
w.atimes = nil
return nil
}
// Info returns a short human-readable summary of how many DOM and
// devtools files are currently held open by the writer.
func (w *Writer) Info() string {
	domCount := len(w.files)
	devCount := len(w.devtools)
	return fmt.Sprintf("dom: %d, dev: %d", domCount, devCount)
}

View file

@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/storage"
"os"
"strconv"
"strings"
"time"
)
@ -71,43 +70,46 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
}
func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error {
sessionDir := strconv.FormatUint(msg.SessionID(), 10)
if err := s.uploadKey(msg.SessionID(), sessionDir+"/dom.mob", true, 5, msg.EncryptionKey); err != nil {
oldErr := s.uploadKey(msg.SessionID(), sessionDir, true, 5, msg.EncryptionKey)
if oldErr != nil {
return fmt.Errorf("upload file error: %s. failed checking mob file using old path: %s", err, oldErr)
}
// Exit method anyway because we don't have dev tools separation in prev version
return nil
}
if err := s.uploadKey(msg.SessionID(), sessionDir+"/devtools.mob", false, 4, msg.EncryptionKey); err != nil {
if err := s.uploadKey(msg.SessionID(), "/dom.mob", true, 5, msg.EncryptionKey); err != nil {
return err
}
if err := s.uploadKey(msg.SessionID(), "/devtools.mob", false, 4, msg.EncryptionKey); err != nil {
log.Printf("can't find devtools for session: %d, err: %s", msg.SessionID(), err)
}
return nil
}
// TODO: make a bit cleaner
func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCount int, encryptionKey string) error {
// TODO: make a bit cleaner.
// TODO: Of course, I'll do!
func (s *Storage) uploadKey(sessID uint64, suffix string, shouldSplit bool, retryCount int, encryptionKey string) error {
if retryCount <= 0 {
return nil
}
start := time.Now()
file, err := os.Open(s.cfg.FSDir + "/" + key)
fileName := strconv.FormatUint(sessID, 10)
mobFileName := fileName
if suffix == "/devtools.mob" {
mobFileName += "devtools"
}
filePath := s.cfg.FSDir + "/" + mobFileName
// Check file size before download into memory
info, err := os.Stat(filePath)
if err != nil {
if info.Size() > s.cfg.MaxFileSize {
log.Printf("big file, size: %d, session: %d", info.Size(), sessID)
return nil
}
}
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("File open error: %v; sessID: %s, part: %d, sessStart: %s\n",
err, key, sessID%16,
err, fileName, sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
}
defer file.Close()
// Ignore "s" at the end of mob file name for "old" sessions
newVers := false
if strings.Contains(key, "/") {
newVers = true
}
var fileSize int64 = 0
fileInfo, err := file.Stat()
if err != nil {
@ -117,17 +119,18 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
}
var encryptedData []byte
fileName += suffix
if shouldSplit {
nRead, err := file.Read(s.startBytes)
if err != nil {
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
err,
key,
fileName,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.uploadKey(sessID, key, shouldSplit, retryCount-1, encryptionKey)
s.uploadKey(sessID, suffix, shouldSplit, retryCount-1, encryptionKey)
})
return nil
}
@ -146,11 +149,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
}
// Compress and save to s3
startReader := bytes.NewBuffer(encryptedData)
startKey := key
if newVers {
startKey += "s"
}
if err := s.s3.Upload(s.gzipFile(startReader), startKey, "application/octet-stream", true); err != nil {
if err := s.s3.Upload(s.gzipFile(startReader), fileName+"s", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
// TODO: fix possible error (if we read less then FileSplitSize)
@ -161,7 +160,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
if err != nil {
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
err,
key,
fileName,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
@ -183,7 +182,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
}
// Compress and save to s3
endReader := bytes.NewBuffer(encryptedData)
if err := s.s3.Upload(s.gzipFile(endReader), key+"e", "application/octet-stream", true); err != nil {
if err := s.s3.Upload(s.gzipFile(endReader), fileName+"e", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
}
@ -195,7 +194,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
if err != nil {
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
err,
key,
fileName,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
@ -216,7 +215,7 @@ func (s *Storage) uploadKey(sessID uint64, key string, shouldSplit bool, retryCo
encryptedData = fileData
}
endReader := bytes.NewBuffer(encryptedData)
if err := s.s3.Upload(s.gzipFile(endReader), key+"s", "application/octet-stream", true); err != nil {
if err := s.s3.Upload(s.gzipFile(endReader), fileName, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))