* feat(backend): try to split mob files by ts

* feat(backend): removed debug code and used worker pool
Alexander 2024-04-03 17:10:44 +02:00 committed by GitHub
parent a8e3f088bd
commit f7744a0c2c
3 changed files with 99 additions and 101 deletions


@@ -13,6 +13,7 @@ type Config struct {
objectstorage.ObjectsConfig
FSDir string `env:"FS_DIR,required"`
FileSplitSize int `env:"FILE_SPLIT_SIZE,required"`
FileSplitTime time.Duration `env:"FILE_SPLIT_TIME,default=15s"`
RetryTimeout time.Duration `env:"RETRY_TIMEOUT,default=2m"`
GroupStorage string `env:"GROUP_STORAGE,required"`
TopicTrigger string `env:"TOPIC_TRIGGER,required"`
@@ -24,7 +25,7 @@ type Config struct {
MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
UseSort bool `env:"USE_SESSION_SORT,default=true"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
CompressionAlgo string `env:"COMPRESSION_ALGO,default=gzip"` // none, gzip, brotli, zstd
CompressionAlgo string `env:"COMPRESSION_ALGO,default=zstd"` // none, gzip, brotli, zstd
}
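The default compression algorithm moves from gzip to zstd. The zstd code path itself (`compressZstd`, touched further down in this commit) is not shown here, so the sketch below is only an illustration of what a zstd compressor of that shape can look like in Go; it assumes the widely used github.com/klauspost/compress/zstd package, which may or may not be the binding the backend actually uses.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// compressZstdSketch compresses data into an in-memory buffer.
func compressZstdSketch(data []byte) (*bytes.Buffer, error) {
	var out bytes.Buffer
	enc, err := zstd.NewWriter(&out)
	if err != nil {
		return nil, err
	}
	if _, err := enc.Write(data); err != nil {
		enc.Close()
		return nil, err
	}
	// Close flushes and finishes the zstd frame.
	if err := enc.Close(); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	in := bytes.Repeat([]byte("openreplay "), 1000)
	out, err := compressZstdSketch(in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed %d bytes down to %d\n", len(in), out.Len())
}
```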
func New(log logger.Logger) *Config {

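For reference, `FILE_SPLIT_TIME` is declared as a `time.Duration`, so values such as `15s` or `2m` are accepted. Below is a minimal sketch of how such a value ends up as the millisecond threshold consumed by `parseSplitTime` further down in this commit; the env-tag parser the real config package relies on is not visible in this diff, so `os.Getenv` and `time.ParseDuration` stand in for it here.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// Stand-in for the env-tag parser (not shown in this diff).
	raw := os.Getenv("FILE_SPLIT_TIME")
	if raw == "" {
		raw = "15s" // default from the struct tag above
	}
	splitTime, err := time.ParseDuration(raw)
	if err != nil {
		panic(err)
	}
	// Mirrors parseSplitTime below: negative durations collapse to 0, which disables splitting.
	ms := splitTime.Milliseconds()
	if ms < 0 {
		ms = 0
	}
	fmt.Printf("split threshold: %d ms\n", ms) // 15000 with the default
}
```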

@@ -20,6 +20,7 @@ import (
"openreplay/backend/pkg/messages"
metrics "openreplay/backend/pkg/metrics/storage"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/pool"
)
type FileType string
@@ -42,17 +43,18 @@ type Task struct {
key string
domRaw []byte
devRaw []byte
index int
domsRawSize float64
domeRawSize float64
devRawSize float64
doms *bytes.Buffer
dome *bytes.Buffer
dev *bytes.Buffer
isBreakTask bool
compression objectstorage.CompressionType
}
func (t *Task) SetMob(mob []byte, tp FileType) {
func (t *Task) SetMob(mob []byte, index int, tp FileType) {
t.index = index
if tp == DOM {
t.domRaw = mob
} else {
@@ -60,27 +62,21 @@ func (t *Task) SetMob(mob []byte, tp FileType) {
}
}
func (t *Task) Mob(tp FileType) []byte {
func (t *Task) Mob(tp FileType) ([]byte, int) {
if tp == DOM {
return t.domRaw
}
return t.devRaw
}
func NewBreakTask() *Task {
return &Task{
isBreakTask: true,
return t.domRaw, t.index
}
return t.devRaw, -1
}
type Storage struct {
cfg *config.Config
log logger.Logger
objStorage objectstorage.ObjectStorage
startBytes []byte
compressionTasks chan *Task // brotli compression or gzip compression with encryption
uploadingTasks chan *Task // upload to s3
workersStopped chan struct{}
cfg *config.Config
log logger.Logger
objStorage objectstorage.ObjectStorage
startBytes []byte
splitTime uint64
processorPool pool.WorkerPool
uploaderPool pool.WorkerPool
}
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage) (*Storage, error) {
@@ -90,25 +86,29 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS
case objStorage == nil:
return nil, fmt.Errorf("object storage is empty")
}
newStorage := &Storage{
cfg: cfg,
log: log,
objStorage: objStorage,
startBytes: make([]byte, cfg.FileSplitSize),
compressionTasks: make(chan *Task, 1),
uploadingTasks: make(chan *Task, 1),
workersStopped: make(chan struct{}),
s := &Storage{
cfg: cfg,
log: log,
objStorage: objStorage,
startBytes: make([]byte, cfg.FileSplitSize),
splitTime: parseSplitTime(cfg.FileSplitTime),
}
go newStorage.compressionWorker()
go newStorage.uploadingWorker()
return newStorage, nil
s.processorPool = pool.NewPool(1, 1, s.doCompression)
s.uploaderPool = pool.NewPool(1, 1, s.uploadSession)
return s, nil
}
func parseSplitTime(splitTime time.Duration) uint64 {
dur := splitTime.Milliseconds()
if dur < 0 {
return 0
}
return uint64(dur)
}
func (s *Storage) Wait() {
// Send stop signal to the first worker
s.compressionTasks <- NewBreakTask()
// Wait stopped signal from the last workers
<-s.workersStopped
s.processorPool.Pause()
s.uploaderPool.Pause()
}
func (s *Storage) Process(ctx context.Context, msg *messages.SessionEnd) (err error) {
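The hand-rolled channels, break tasks and worker goroutines are replaced by two `pool.WorkerPool` instances, each created with `pool.NewPool(1, 1, handler)` so a single worker with a one-slot queue stands in for the single goroutine it replaces. The `pool` package itself is not part of this diff, so the following is only a sketch of the contract implied by the call sites (`NewPool`, `Submit`, `Pause`), not the actual implementation.

```go
// Hypothetical sketch of openreplay/backend/pkg/pool; the real package may differ.
package pool

import "sync"

// WorkerPool is the surface the storage service uses above.
type WorkerPool interface {
	Submit(payload interface{}) // enqueue one task for the handler
	Pause()                     // block until every submitted task has been processed
}

type workerPool struct {
	tasks chan interface{}
	wg    sync.WaitGroup
}

// NewPool starts `workers` goroutines that drain a queue of size `queueSize`
// and hand each payload to `handler`.
func NewPool(workers, queueSize int, handler func(payload interface{})) WorkerPool {
	p := &workerPool{tasks: make(chan interface{}, queueSize)}
	for i := 0; i < workers; i++ {
		go func() {
			for task := range p.tasks {
				handler(task)
				p.wg.Done()
			}
		}()
	}
	return p
}

func (p *workerPool) Submit(payload interface{}) {
	p.wg.Add(1)
	p.tasks <- payload
}

func (p *workerPool) Pause() {
	p.wg.Wait()
}
```

With a shape like this, `Wait()` pausing the processor pool first and the uploader pool second drains compression before uploads: every `doCompression` run ends with `uploaderPool.Submit`, so by the time the first `Pause` returns, everything compressed is already queued for upload.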
@@ -147,12 +147,27 @@ func (s *Storage) Process(ctx context.Context, msg *messages.SessionEnd) (err er
return err
}
// Send new task to compression worker
s.compressionTasks <- newTask
s.processorPool.Submit(newTask)
return nil
}
func (s *Storage) openSession(ctx context.Context, sessID, filePath string, tp FileType) ([]byte, error) {
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
// Open session file
startRead := time.Now()
mob, index, err := s.openSession(task.ctx, path, tp)
if err != nil {
return err
}
metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String())
metrics.RecordSessionSize(float64(len(mob)), tp.String())
// Put opened session file into task struct
task.SetMob(mob, index, tp)
return nil
}
func (s *Storage) openSession(ctx context.Context, filePath string, tp FileType) ([]byte, int, error) {
if tp == DEV {
filePath += "devtools"
}
@@ -160,26 +175,26 @@ func (s *Storage) openSession(ctx context.Context, sessID, filePath string, tp F
info, err := os.Stat(filePath)
if err == nil && info.Size() > s.cfg.MaxFileSize {
metrics.RecordSkippedSessionSize(float64(info.Size()), tp.String())
return nil, fmt.Errorf("big file, size: %d", info.Size())
return nil, -1, fmt.Errorf("big file, size: %d", info.Size())
}
// Read file into memory
raw, err := os.ReadFile(filePath)
if err != nil {
return nil, err
return nil, -1, err
}
if !s.cfg.UseSort {
return raw, nil
return raw, -1, nil
}
start := time.Now()
res, err := s.sortSessionMessages(ctx, sessID, raw)
mob, index, err := s.sortSessionMessages(ctx, tp, raw)
if err != nil {
return nil, fmt.Errorf("can't sort session, err: %s", err)
return nil, -1, fmt.Errorf("can't sort session, err: %s", err)
}
metrics.RecordSessionSortDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
return res, nil
return mob, index, nil
}
func (s *Storage) sortSessionMessages(ctx context.Context, sessID string, raw []byte) ([]byte, error) {
func (s *Storage) sortSessionMessages(ctx context.Context, tp FileType, raw []byte) ([]byte, int, error) {
// Parse messages, sort by index and save result into slice of bytes
unsortedMessages, err := messages.SplitMessages(raw)
if err != nil {
@@ -187,25 +202,11 @@ func (s *Storage) sortSessionMessages(ctx context.Context, sessID string, raw []
s.log.Warn(ctx, err.Error())
} else {
s.log.Error(ctx, "can't split session messages: %s", err)
return raw, nil
return raw, -1, nil
}
}
return messages.MergeMessages(raw, messages.SortMessages(unsortedMessages)), nil
}
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
// Open session file
startRead := time.Now()
mob, err := s.openSession(task.ctx, task.id, path, tp)
if err != nil {
return err
}
metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String())
metrics.RecordSessionSize(float64(len(mob)), tp.String())
// Put opened session file into task struct
task.SetMob(mob, tp)
return nil
mob, index := messages.MergeMessages(raw, messages.SortMessages(unsortedMessages), tp == DOM, s.splitTime)
return mob, index, nil
}
func (s *Storage) setTaskCompression(ctx context.Context) objectstorage.CompressionType {
@@ -226,10 +227,10 @@ func (s *Storage) setTaskCompression(ctx context.Context) objectstorage.Compress
func (s *Storage) packSession(task *Task, tp FileType) {
// Prepare mob file
mob := task.Mob(tp)
mob, index := task.Mob(tp)
// For devtools or small dom file
if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
// For devtools or short sessions
if tp == DEV || index == -1 {
// Compression
start := time.Now()
data := s.compress(task.ctx, mob, task.compression)
@@ -259,7 +260,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
go func() {
// Compression
start := time.Now()
data := s.compress(task.ctx, mob[:s.cfg.FileSplitSize], task.compression)
data := s.compress(task.ctx, mob[:index], task.compression)
firstPart = time.Since(start).Milliseconds()
// Encryption
@@ -268,7 +269,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
firstEncrypt = time.Since(start).Milliseconds()
// Record dom start raw size
task.domsRawSize = float64(s.cfg.FileSplitSize)
task.domsRawSize = float64(index)
// Finish task
wg.Done()
@@ -278,7 +279,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
go func() {
// Compression
start := time.Now()
data := s.compress(task.ctx, mob[s.cfg.FileSplitSize:], task.compression)
data := s.compress(task.ctx, mob[index:], task.compression)
secondPart = time.Since(start).Milliseconds()
// Encryption
@@ -287,7 +288,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
secondEncrypt = time.Since(start).Milliseconds()
// Record dom end raw size
task.domeRawSize = float64(len(mob) - s.cfg.FileSplitSize)
task.domeRawSize = float64(len(mob) - index)
// Finish task
wg.Done()
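For DOM files long enough to have a split point, the offset used above is the byte index returned by the message merge (a message boundary roughly `FileSplitTime` into the recording) instead of the old fixed `FileSplitSize`. Stripped of encryption and metrics, the two concurrent goroutines above reduce to the pattern below; `compress` is a placeholder for whichever algorithm the task selected.

```go
package main

import (
	"fmt"
	"sync"
)

// packSplit compresses the "doms" (start) and "dome" (end) halves of a DOM mob
// file concurrently, as packSession does above; encryption and metrics omitted.
func packSplit(mob []byte, index int, compress func([]byte) []byte) (doms, dome []byte) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		doms = compress(mob[:index]) // roughly the first FileSplitTime of the session
	}()
	go func() {
		defer wg.Done()
		dome = compress(mob[index:]) // the remainder
	}()
	wg.Wait()
	return doms, dome
}

func main() {
	identity := func(b []byte) []byte { return b } // no-op "compression" for the demo
	doms, dome := packSplit([]byte("0123456789"), 4, identity)
	fmt.Printf("%s | %s\n", doms, dome) // 0123 | 456789
}
```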
@@ -369,7 +370,8 @@ func (s *Storage) compressZstd(ctx context.Context, data []byte) *bytes.Buffer {
return &out
}
func (s *Storage) uploadSession(task *Task) {
func (s *Storage) uploadSession(payload interface{}) {
task := payload.(*Task)
wg := &sync.WaitGroup{}
wg.Add(3)
var (
@@ -422,7 +424,8 @@ func (s *Storage) uploadSession(task *Task) {
metrics.IncreaseStorageTotalSessions()
}
func (s *Storage) doCompression(task *Task) {
func (s *Storage) doCompression(payload interface{}) {
task := payload.(*Task)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
@@ -434,31 +437,5 @@ func (s *Storage) doCompression(task *Task) {
wg.Done()
}()
wg.Wait()
s.uploadingTasks <- task
}
func (s *Storage) compressionWorker() {
for {
select {
case task := <-s.compressionTasks:
if task.isBreakTask {
s.uploadingTasks <- task
continue
}
s.doCompression(task)
}
}
}
func (s *Storage) uploadingWorker() {
for {
select {
case task := <-s.uploadingTasks:
if task.isBreakTask {
s.workersStopped <- struct{}{}
continue
}
s.uploadSession(task)
}
}
s.uploaderPool.Submit(task)
}
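Both handlers now take `interface{}` because that is what the worker pool delivers, and they recover the `*Task` with a bare type assertion. A defensive variant (an assumption layered on top of this diff, not what the code above does) would use the comma-ok form so an unexpected payload cannot panic the worker goroutine:

```go
func (s *Storage) doCompression(payload interface{}) {
	task, ok := payload.(*Task)
	if !ok {
		// Unexpected payload type: drop it rather than panic inside the pool worker.
		return
	}
	// ... same compression steps as above ...
	s.uploaderPool.Submit(task)
}
```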


@@ -95,21 +95,37 @@ func SortMessages(messages []*msgInfo) []*msgInfo {
return messages
}
func MergeMessages(data []byte, messages []*msgInfo) []byte {
func MergeMessages(data []byte, messages []*msgInfo, doSplit bool, splitDuration uint64) ([]byte, int) {
sortedSession := bytes.NewBuffer(make([]byte, 0, len(data)))
// Add maximum possible index value to the start of the session to inform player about new version of mob file
sortedSession.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
var lastTsIndex int = -1 // not set
var (
firstTimestamp uint64 = 0
lastTsIndex = -1
splitIndex = -1
)
if splitDuration == 0 {
doSplit = false
}
for i, info := range messages {
if info.msgType == MsgTimestamp {
if firstTimestamp == 0 {
firstTimestamp = info.timestamp
}
// Save index of last timestamp message and continue to read next message
lastTsIndex = i
continue
}
// Write last timestamp message if it exists
if lastTsIndex != -1 {
// Try to split mob file just before timestamp message
if splitIndex < 0 && info.timestamp-firstTimestamp > splitDuration {
splitIndex = sortedSession.Len()
}
// Write last timestamp message to mob file
tsInfo := messages[lastTsIndex]
sortedSession.Write(data[tsInfo.start:tsInfo.end])
lastTsIndex = -1
@@ -118,5 +134,9 @@ func MergeMessages(data []byte, messages []*msgInfo) []byte {
// Write current message
sortedSession.Write(data[info.start:info.end])
}
return sortedSession.Bytes()
if !doSplit {
splitIndex = -1
}
return sortedSession.Bytes(), splitIndex
}
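In short, `MergeMessages` remembers the value of the first timestamp message and, while writing the sorted stream, records the buffer length just before the pending timestamp message once it reaches a message more than `splitDuration` milliseconds past that start; that offset is returned as the DOM split point for `packSession`. If no such point exists, or splitting is disabled (`doSplit` false or `splitDuration` 0), the index stays -1 and the session is kept in one file. Below is a self-contained toy model of the selection rule; the real `msgInfo` type and the timestamp-message bookkeeping are simplified away.

```go
package main

import "fmt"

// toyMsg is a simplified stand-in for msgInfo: a timestamp plus an encoded size.
type toyMsg struct {
	timestamp uint64 // milliseconds
	size      int    // encoded length in bytes
}

// splitOffset returns the byte offset written before the first message that is
// more than splitDuration ms past the first timestamp, or -1 if there is none.
func splitOffset(msgs []toyMsg, splitDuration uint64) int {
	if splitDuration == 0 {
		return -1 // splitting disabled
	}
	var first uint64
	written := 0
	for _, m := range msgs {
		if first == 0 {
			first = m.timestamp
		}
		if m.timestamp-first > splitDuration {
			return written // split just before this message
		}
		written += m.size
	}
	return -1 // session shorter than splitDuration: keep it whole
}

func main() {
	msgs := []toyMsg{
		{timestamp: 1_000, size: 120},
		{timestamp: 9_000, size: 300},
		{timestamp: 17_000, size: 80}, // more than 15s after the first message
		{timestamp: 20_000, size: 60},
	}
	fmt.Println(splitOffset(msgs, 15_000)) // 420: the first two messages form the "doms" part
}
```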