diff --git a/backend/internal/config/storage/config.go b/backend/internal/config/storage/config.go index 559b8dd77..c37e37b24 100644 --- a/backend/internal/config/storage/config.go +++ b/backend/internal/config/storage/config.go @@ -13,6 +13,7 @@ type Config struct { objectstorage.ObjectsConfig FSDir string `env:"FS_DIR,required"` FileSplitSize int `env:"FILE_SPLIT_SIZE,required"` + FileSplitTime time.Duration `env:"FILE_SPLIT_TIME,default=15s"` RetryTimeout time.Duration `env:"RETRY_TIMEOUT,default=2m"` GroupStorage string `env:"GROUP_STORAGE,required"` TopicTrigger string `env:"TOPIC_TRIGGER,required"` @@ -24,7 +25,7 @@ type Config struct { MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"` UseSort bool `env:"USE_SESSION_SORT,default=true"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` - CompressionAlgo string `env:"COMPRESSION_ALGO,default=gzip"` // none, gzip, brotli, zstd + CompressionAlgo string `env:"COMPRESSION_ALGO,default=zstd"` // none, gzip, brotli, zstd } func New(log logger.Logger) *Config { diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index 0829d4136..0c008ddde 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -20,6 +20,7 @@ import ( "openreplay/backend/pkg/messages" metrics "openreplay/backend/pkg/metrics/storage" "openreplay/backend/pkg/objectstorage" + "openreplay/backend/pkg/pool" ) type FileType string @@ -42,17 +43,18 @@ type Task struct { key string domRaw []byte devRaw []byte + index int domsRawSize float64 domeRawSize float64 devRawSize float64 doms *bytes.Buffer dome *bytes.Buffer dev *bytes.Buffer - isBreakTask bool compression objectstorage.CompressionType } -func (t *Task) SetMob(mob []byte, tp FileType) { +func (t *Task) SetMob(mob []byte, index int, tp FileType) { + t.index = index if tp == DOM { t.domRaw = mob } else { @@ -60,27 +62,21 @@ func (t *Task) SetMob(mob []byte, tp FileType) { } } -func (t *Task) Mob(tp FileType) []byte { +func (t *Task) Mob(tp FileType) ([]byte, int) { if tp == DOM { - return t.domRaw - } - return t.devRaw -} - -func NewBreakTask() *Task { - return &Task{ - isBreakTask: true, + return t.domRaw, t.index } + return t.devRaw, -1 } type Storage struct { - cfg *config.Config - log logger.Logger - objStorage objectstorage.ObjectStorage - startBytes []byte - compressionTasks chan *Task // brotli compression or gzip compression with encryption - uploadingTasks chan *Task // upload to s3 - workersStopped chan struct{} + cfg *config.Config + log logger.Logger + objStorage objectstorage.ObjectStorage + startBytes []byte + splitTime uint64 + processorPool pool.WorkerPool + uploaderPool pool.WorkerPool } func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage) (*Storage, error) { @@ -90,25 +86,29 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS case objStorage == nil: return nil, fmt.Errorf("object storage is empty") } - newStorage := &Storage{ - cfg: cfg, - log: log, - objStorage: objStorage, - startBytes: make([]byte, cfg.FileSplitSize), - compressionTasks: make(chan *Task, 1), - uploadingTasks: make(chan *Task, 1), - workersStopped: make(chan struct{}), + s := &Storage{ + cfg: cfg, + log: log, + objStorage: objStorage, + startBytes: make([]byte, cfg.FileSplitSize), + splitTime: parseSplitTime(cfg.FileSplitTime), } - go newStorage.compressionWorker() - go newStorage.uploadingWorker() - return newStorage, nil + s.processorPool = pool.NewPool(1, 1, s.doCompression) + s.uploaderPool = pool.NewPool(1, 1, s.uploadSession) + return s, nil +} + +func parseSplitTime(splitTime time.Duration) uint64 { + dur := splitTime.Milliseconds() + if dur < 0 { + return 0 + } + return uint64(dur) } func (s *Storage) Wait() { - // Send stop signal to the first worker - s.compressionTasks <- NewBreakTask() - // Wait stopped signal from the last workers - <-s.workersStopped + s.processorPool.Pause() + s.uploaderPool.Pause() } func (s *Storage) Process(ctx context.Context, msg *messages.SessionEnd) (err error) { @@ -147,12 +147,27 @@ func (s *Storage) Process(ctx context.Context, msg *messages.SessionEnd) (err er return err } - // Send new task to compression worker - s.compressionTasks <- newTask + s.processorPool.Submit(newTask) return nil } -func (s *Storage) openSession(ctx context.Context, sessID, filePath string, tp FileType) ([]byte, error) { +func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { + // Open session file + startRead := time.Now() + mob, index, err := s.openSession(task.ctx, path, tp) + if err != nil { + return err + } + + metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String()) + metrics.RecordSessionSize(float64(len(mob)), tp.String()) + + // Put opened session file into task struct + task.SetMob(mob, index, tp) + return nil +} + +func (s *Storage) openSession(ctx context.Context, filePath string, tp FileType) ([]byte, int, error) { if tp == DEV { filePath += "devtools" } @@ -160,26 +175,26 @@ func (s *Storage) openSession(ctx context.Context, sessID, filePath string, tp F info, err := os.Stat(filePath) if err == nil && info.Size() > s.cfg.MaxFileSize { metrics.RecordSkippedSessionSize(float64(info.Size()), tp.String()) - return nil, fmt.Errorf("big file, size: %d", info.Size()) + return nil, -1, fmt.Errorf("big file, size: %d", info.Size()) } // Read file into memory raw, err := os.ReadFile(filePath) if err != nil { - return nil, err + return nil, -1, err } if !s.cfg.UseSort { - return raw, nil + return raw, -1, nil } start := time.Now() - res, err := s.sortSessionMessages(ctx, sessID, raw) + mob, index, err := s.sortSessionMessages(ctx, tp, raw) if err != nil { - return nil, fmt.Errorf("can't sort session, err: %s", err) + return nil, -1, fmt.Errorf("can't sort session, err: %s", err) } metrics.RecordSessionSortDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) - return res, nil + return mob, index, nil } -func (s *Storage) sortSessionMessages(ctx context.Context, sessID string, raw []byte) ([]byte, error) { +func (s *Storage) sortSessionMessages(ctx context.Context, tp FileType, raw []byte) ([]byte, int, error) { // Parse messages, sort by index and save result into slice of bytes unsortedMessages, err := messages.SplitMessages(raw) if err != nil { @@ -187,25 +202,11 @@ func (s *Storage) sortSessionMessages(ctx context.Context, sessID string, raw [] s.log.Warn(ctx, err.Error()) } else { s.log.Error(ctx, "can't split session messages: %s", err) - return raw, nil + return raw, -1, nil } } - return messages.MergeMessages(raw, messages.SortMessages(unsortedMessages)), nil -} - -func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { - // Open session file - startRead := time.Now() - mob, err := s.openSession(task.ctx, task.id, path, tp) - if err != nil { - return err - } - metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String()) - metrics.RecordSessionSize(float64(len(mob)), tp.String()) - - // Put opened session file into task struct - task.SetMob(mob, tp) - return nil + mob, index := messages.MergeMessages(raw, messages.SortMessages(unsortedMessages), tp == DOM, s.splitTime) + return mob, index, nil } func (s *Storage) setTaskCompression(ctx context.Context) objectstorage.CompressionType { @@ -226,10 +227,10 @@ func (s *Storage) setTaskCompression(ctx context.Context) objectstorage.Compress func (s *Storage) packSession(task *Task, tp FileType) { // Prepare mob file - mob := task.Mob(tp) + mob, index := task.Mob(tp) - // For devtools of small dom file - if tp == DEV || len(mob) <= s.cfg.FileSplitSize { + // For devtools of short sessions + if tp == DEV || index == -1 { // Compression start := time.Now() data := s.compress(task.ctx, mob, task.compression) @@ -259,7 +260,7 @@ func (s *Storage) packSession(task *Task, tp FileType) { go func() { // Compression start := time.Now() - data := s.compress(task.ctx, mob[:s.cfg.FileSplitSize], task.compression) + data := s.compress(task.ctx, mob[:index], task.compression) firstPart = time.Since(start).Milliseconds() // Encryption @@ -268,7 +269,7 @@ func (s *Storage) packSession(task *Task, tp FileType) { firstEncrypt = time.Since(start).Milliseconds() // Record dom start raw size - task.domsRawSize = float64(s.cfg.FileSplitSize) + task.domsRawSize = float64(index) // Finish task wg.Done() @@ -278,7 +279,7 @@ func (s *Storage) packSession(task *Task, tp FileType) { go func() { // Compression start := time.Now() - data := s.compress(task.ctx, mob[s.cfg.FileSplitSize:], task.compression) + data := s.compress(task.ctx, mob[index:], task.compression) secondPart = time.Since(start).Milliseconds() // Encryption @@ -287,7 +288,7 @@ func (s *Storage) packSession(task *Task, tp FileType) { secondEncrypt = time.Since(start).Milliseconds() // Record dom end raw size - task.domeRawSize = float64(len(mob) - s.cfg.FileSplitSize) + task.domeRawSize = float64(len(mob) - index) // Finish task wg.Done() @@ -369,7 +370,8 @@ func (s *Storage) compressZstd(ctx context.Context, data []byte) *bytes.Buffer { return &out } -func (s *Storage) uploadSession(task *Task) { +func (s *Storage) uploadSession(payload interface{}) { + task := payload.(*Task) wg := &sync.WaitGroup{} wg.Add(3) var ( @@ -422,7 +424,8 @@ func (s *Storage) uploadSession(task *Task) { metrics.IncreaseStorageTotalSessions() } -func (s *Storage) doCompression(task *Task) { +func (s *Storage) doCompression(payload interface{}) { + task := payload.(*Task) wg := &sync.WaitGroup{} wg.Add(2) go func() { @@ -434,31 +437,5 @@ func (s *Storage) doCompression(task *Task) { wg.Done() }() wg.Wait() - s.uploadingTasks <- task -} - -func (s *Storage) compressionWorker() { - for { - select { - case task := <-s.compressionTasks: - if task.isBreakTask { - s.uploadingTasks <- task - continue - } - s.doCompression(task) - } - } -} - -func (s *Storage) uploadingWorker() { - for { - select { - case task := <-s.uploadingTasks: - if task.isBreakTask { - s.workersStopped <- struct{}{} - continue - } - s.uploadSession(task) - } - } + s.uploaderPool.Submit(task) } diff --git a/backend/pkg/messages/session-iterator.go b/backend/pkg/messages/session-iterator.go index fa7a08142..4d0bfd1b3 100644 --- a/backend/pkg/messages/session-iterator.go +++ b/backend/pkg/messages/session-iterator.go @@ -95,21 +95,37 @@ func SortMessages(messages []*msgInfo) []*msgInfo { return messages } -func MergeMessages(data []byte, messages []*msgInfo) []byte { +func MergeMessages(data []byte, messages []*msgInfo, doSplit bool, splitDuration uint64) ([]byte, int) { sortedSession := bytes.NewBuffer(make([]byte, 0, len(data))) // Add maximum possible index value to the start of the session to inform player about new version of mob file sortedSession.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}) - var lastTsIndex int = -1 // not set + var ( + firstTimestamp uint64 = 0 + lastTsIndex = -1 + splitIndex = -1 + ) + + if splitDuration == 0 { + doSplit = false + } + for i, info := range messages { if info.msgType == MsgTimestamp { + if firstTimestamp == 0 { + firstTimestamp = info.timestamp + } // Save index of last timestamp message and continue to read next message lastTsIndex = i continue } - // Write last timestamp message if it exists if lastTsIndex != -1 { + // Try to split mob file just before timestamp message + if splitIndex < 0 && info.timestamp-firstTimestamp > splitDuration { + splitIndex = sortedSession.Len() + } + // Write last timestamp message to mob file tsInfo := messages[lastTsIndex] sortedSession.Write(data[tsInfo.start:tsInfo.end]) lastTsIndex = -1 @@ -118,5 +134,9 @@ func MergeMessages(data []byte, messages []*msgInfo) []byte { // Write current message sortedSession.Write(data[info.start:info.end]) } - return sortedSession.Bytes() + + if !doSplit { + splitIndex = -1 + } + return sortedSession.Bytes(), splitIndex }