From 44f9e4c1203055524a8b56e72a2acf181dd4d7e6 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 11 Jan 2023 16:23:10 +0100 Subject: [PATCH] [Storage] added sort operation for session messages (#921) * feat(backend): added sort operation for session messages --- backend/internal/config/storage/config.go | 1 + backend/internal/storage/storage.go | 45 +++++++++++++- backend/pkg/messages/bytes.go | 11 ++++ backend/pkg/messages/session-iterator.go | 71 +++++++++++++++++++++++ 4 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 backend/pkg/messages/session-iterator.go diff --git a/backend/internal/config/storage/config.go b/backend/internal/config/storage/config.go index 63c595f62..ca4ff8028 100644 --- a/backend/internal/config/storage/config.go +++ b/backend/internal/config/storage/config.go @@ -21,6 +21,7 @@ type Config struct { ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"` UseFailover bool `env:"USE_FAILOVER,default=false"` MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"` + UseSort bool `env:"USE_SESSION_SORT,default=true"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index 3b315561d..fbe9e2228 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -42,6 +42,8 @@ type Storage struct { sessionDEVSize syncfloat64.Histogram readingDOMTime syncfloat64.Histogram readingDEVTime syncfloat64.Histogram + sortingDOMTime syncfloat64.Histogram + sortingDEVTime syncfloat64.Histogram archivingDOMTime syncfloat64.Histogram archivingDEVTime syncfloat64.Histogram uploadingDOMTime syncfloat64.Histogram @@ -79,6 +81,14 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor if err != nil { log.Printf("can't create reading_duration metric: %s", err) } + sortingDOMTime, err := metrics.RegisterHistogram("sorting_duration") + if err != nil { + log.Printf("can't create reading_duration metric: %s", err) + } + sortingDEVTime, err := metrics.RegisterHistogram("sorting_dt_duration") + if err != nil { + log.Printf("can't create reading_duration metric: %s", err) + } archivingDOMTime, err := metrics.RegisterHistogram("archiving_duration") if err != nil { log.Printf("can't create archiving_duration metric: %s", err) @@ -104,6 +114,8 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor sessionDEVSize: sessionDevtoolsSize, readingDOMTime: readingDOMTime, readingDEVTime: readingDEVTime, + sortingDOMTime: sortingDOMTime, + sortingDEVTime: sortingDEVTime, archivingDOMTime: archivingDOMTime, archivingDEVTime: archivingDEVTime, uploadingDOMTime: uploadingDOMTime, @@ -156,14 +168,41 @@ func (s *Storage) Upload(msg *messages.SessionEnd) (err error) { return nil } -func (s *Storage) openSession(filePath string) ([]byte, error) { +func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) { // Check file size before download into memory info, err := os.Stat(filePath) if err == nil && info.Size() > s.cfg.MaxFileSize { return nil, fmt.Errorf("big file, size: %d", info.Size()) } // Read file into memory - return os.ReadFile(filePath) + raw, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + if !s.cfg.UseSort { + return raw, nil + } + start := time.Now() + res, err := s.sortSessionMessages(raw) + if err != nil { + return nil, fmt.Errorf("can't sort session, err: %s", err) + } + if tp == DOM { + s.sortingDOMTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) + } else { + s.sortingDEVTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds())) + } + return res, nil +} + +func (s *Storage) sortSessionMessages(raw []byte) ([]byte, error) { + // Parse messages, sort by index and save result into slice of bytes + unsortedMessages, err := messages.SplitMessages(raw) + if err != nil { + log.Printf("can't sort session, err: %s", err) + return raw, nil + } + return messages.MergeMessages(raw, messages.SortMessages(unsortedMessages)), nil } func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { @@ -172,7 +211,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { path += "devtools" } startRead := time.Now() - mob, err := s.openSession(path) + mob, err := s.openSession(path, tp) if err != nil { return err } diff --git a/backend/pkg/messages/bytes.go b/backend/pkg/messages/bytes.go index 0576201ea..00d161d97 100644 --- a/backend/pkg/messages/bytes.go +++ b/backend/pkg/messages/bytes.go @@ -1,6 +1,7 @@ package messages import ( + "encoding/binary" "errors" "fmt" "io" @@ -13,6 +14,7 @@ type BytesReader interface { ReadInt() (int64, error) ReadBoolean() (bool, error) ReadString() (string, error) + ReadIndex() (uint64, error) Data() []byte Pointer() int64 SetPointer(p int64) @@ -106,6 +108,15 @@ func (m *bytesReaderImpl) ReadString() (string, error) { return str, nil } +func (m *bytesReaderImpl) ReadIndex() (uint64, error) { + if len(m.data)-int(m.curr) < 8 { + return 0, fmt.Errorf("out of range") + } + size := binary.LittleEndian.Uint64(m.data[m.curr : m.curr+8]) + m.curr += 8 + return size, nil +} + func (m *bytesReaderImpl) Data() []byte { return m.data } diff --git a/backend/pkg/messages/session-iterator.go b/backend/pkg/messages/session-iterator.go new file mode 100644 index 000000000..45daae4b8 --- /dev/null +++ b/backend/pkg/messages/session-iterator.go @@ -0,0 +1,71 @@ +package messages + +import ( + "bytes" + "fmt" + "io" + "log" + "sort" +) + +type msgInfo struct { + index uint64 + start int64 + end int64 +} + +func SplitMessages(data []byte) ([]*msgInfo, error) { + messages := make([]*msgInfo, 0) + reader := NewBytesReader(data) + for { + // Get message start + msgStart := reader.Pointer() + if int(msgStart) >= len(data) { + return messages, nil + } + + // Read message index + msgIndex, err := reader.ReadIndex() + if err != nil { + if err != io.EOF { + log.Println(reader.Pointer(), msgStart) + return nil, fmt.Errorf("read message index err: %s", err) + } + return messages, nil + } + + // Read message type + msgType, err := reader.ReadUint() + if err != nil { + return nil, fmt.Errorf("read message type err: %s", err) + } + + // Read message body + _, err = ReadMessage(msgType, reader) + if err != nil { + return nil, fmt.Errorf("read message body err: %s", err) + } + + // Add new message info to messages slice + messages = append(messages, &msgInfo{ + index: msgIndex, + start: msgStart, + end: reader.Pointer(), + }) + } +} + +func SortMessages(messages []*msgInfo) []*msgInfo { + sort.SliceStable(messages, func(i, j int) bool { + return messages[i].index < messages[j].index + }) + return messages +} + +func MergeMessages(data []byte, messages []*msgInfo) []byte { + sortedSession := bytes.NewBuffer(make([]byte, 0, len(data))) + for _, info := range messages { + sortedSession.Write(data[info.start:info.end]) + } + return sortedSession.Bytes() +}