[Storage] added sort operation for session messages (#921)

* feat(backend): added sort operation for session messages
Alexander 2023-01-11 16:23:10 +01:00 committed by GitHub
parent 269ea69206
commit 44f9e4c120
4 changed files with 125 additions and 3 deletions

@@ -21,6 +21,7 @@ type Config struct {
    ProducerCloseTimeout int `env:"PRODUCER_CLOSE_TIMEOUT,default=15000"`
    UseFailover bool `env:"USE_FAILOVER,default=false"`
    MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
    UseSort bool `env:"USE_SESSION_SORT,default=true"`
    UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}
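
Not part of the diff: a minimal standalone sketch of what the env:"USE_SESSION_SORT,default=true" tag is expected to mean, i.e. sorting stays on unless the variable is explicitly set to a false value. The helper below is hypothetical and only illustrates those semantics; the actual env-decoding library used by the service is not shown in this commit.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// boolFromEnv is a hypothetical helper mimicking the tag semantics:
// fall back to the default when the variable is unset or not a valid bool.
func boolFromEnv(key string, def bool) bool {
    v, ok := os.LookupEnv(key)
    if !ok {
        return def
    }
    b, err := strconv.ParseBool(v)
    if err != nil {
        return def
    }
    return b
}

func main() {
    // Sorting remains enabled unless USE_SESSION_SORT is explicitly set to false.
    fmt.Println(boolFromEnv("USE_SESSION_SORT", true))
}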

@@ -42,6 +42,8 @@ type Storage struct {
    sessionDEVSize syncfloat64.Histogram
    readingDOMTime syncfloat64.Histogram
    readingDEVTime syncfloat64.Histogram
    sortingDOMTime syncfloat64.Histogram
    sortingDEVTime syncfloat64.Histogram
    archivingDOMTime syncfloat64.Histogram
    archivingDEVTime syncfloat64.Histogram
    uploadingDOMTime syncfloat64.Histogram
@@ -79,6 +81,14 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
    if err != nil {
        log.Printf("can't create reading_duration metric: %s", err)
    }
    sortingDOMTime, err := metrics.RegisterHistogram("sorting_duration")
    if err != nil {
        log.Printf("can't create sorting_duration metric: %s", err)
    }
    sortingDEVTime, err := metrics.RegisterHistogram("sorting_dt_duration")
    if err != nil {
        log.Printf("can't create sorting_dt_duration metric: %s", err)
    }
    archivingDOMTime, err := metrics.RegisterHistogram("archiving_duration")
    if err != nil {
        log.Printf("can't create archiving_duration metric: %s", err)
@@ -104,6 +114,8 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
        sessionDEVSize: sessionDevtoolsSize,
        readingDOMTime: readingDOMTime,
        readingDEVTime: readingDEVTime,
        sortingDOMTime: sortingDOMTime,
        sortingDEVTime: sortingDEVTime,
        archivingDOMTime: archivingDOMTime,
        archivingDEVTime: archivingDEVTime,
        uploadingDOMTime: uploadingDOMTime,
@@ -156,14 +168,41 @@ func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
    return nil
}

func (s *Storage) openSession(filePath string) ([]byte, error) {
func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
    // Check file size before download into memory
    info, err := os.Stat(filePath)
    if err == nil && info.Size() > s.cfg.MaxFileSize {
        return nil, fmt.Errorf("big file, size: %d", info.Size())
    }
    // Read file into memory
    return os.ReadFile(filePath)
    raw, err := os.ReadFile(filePath)
    if err != nil {
        return nil, err
    }
    if !s.cfg.UseSort {
        return raw, nil
    }
    start := time.Now()
    res, err := s.sortSessionMessages(raw)
    if err != nil {
        return nil, fmt.Errorf("can't sort session, err: %s", err)
    }
    if tp == DOM {
        s.sortingDOMTime.Record(context.Background(), float64(time.Since(start).Milliseconds()))
    } else {
        s.sortingDEVTime.Record(context.Background(), float64(time.Since(start).Milliseconds()))
    }
    return res, nil
}

func (s *Storage) sortSessionMessages(raw []byte) ([]byte, error) {
    // Parse messages, sort them by index and merge the result back into a single byte slice
    unsortedMessages, err := messages.SplitMessages(raw)
    if err != nil {
        log.Printf("can't sort session, err: %s", err)
        return raw, nil
    }
    return messages.MergeMessages(raw, messages.SortMessages(unsortedMessages)), nil
}
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
@@ -172,7 +211,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
        path += "devtools"
    }
    startRead := time.Now()
    mob, err := s.openSession(path)
    mob, err := s.openSession(path, tp)
    if err != nil {
        return err
    }
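
Not part of the diff: the gate-and-time pattern that openSession now follows, reduced to a standalone sketch. The flag, the integer sort, and the recorder callback are all hypothetical stand-ins (the callback plays the role of the sorting-duration histogram recorded above).

package main

import (
    "fmt"
    "sort"
    "time"
)

// sortIfEnabled mirrors the new flow: skip the sort when the flag is off,
// otherwise time it and hand the elapsed milliseconds to a recorder.
func sortIfEnabled(values []int, useSort bool, record func(ms float64)) []int {
    if !useSort {
        return values
    }
    start := time.Now()
    sorted := append([]int(nil), values...)
    sort.Ints(sorted)
    record(float64(time.Since(start).Milliseconds()))
    return sorted
}

func main() {
    out := sortIfEnabled([]int{3, 1, 2}, true, func(ms float64) {
        fmt.Printf("sorting took %.0f ms\n", ms)
    })
    fmt.Println(out) // [1 2 3]
}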

@@ -1,6 +1,7 @@
package messages

import (
    "encoding/binary"
    "errors"
    "fmt"
    "io"

@@ -13,6 +14,7 @@ type BytesReader interface {
    ReadInt() (int64, error)
    ReadBoolean() (bool, error)
    ReadString() (string, error)
    ReadIndex() (uint64, error)
    Data() []byte
    Pointer() int64
    SetPointer(p int64)
@@ -106,6 +108,15 @@ func (m *bytesReaderImpl) ReadString() (string, error) {
    return str, nil
}

func (m *bytesReaderImpl) ReadIndex() (uint64, error) {
    if len(m.data)-int(m.curr) < 8 {
        return 0, fmt.Errorf("out of range")
    }
    index := binary.LittleEndian.Uint64(m.data[m.curr : m.curr+8])
    m.curr += 8
    return index, nil
}

func (m *bytesReaderImpl) Data() []byte {
    return m.data
}
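
Not part of the diff: a standalone sketch of what ReadIndex does, namely decode the next 8 bytes as a little-endian uint64 and advance the cursor, with the same bounds check. The function and variable names below are illustrative only.

package main

import (
    "encoding/binary"
    "fmt"
)

// readIndex decodes a little-endian uint64 at offset curr and returns the
// value plus the advanced offset, failing if fewer than 8 bytes remain.
func readIndex(data []byte, curr int) (uint64, int, error) {
    if len(data)-curr < 8 {
        return 0, curr, fmt.Errorf("out of range")
    }
    index := binary.LittleEndian.Uint64(data[curr : curr+8])
    return index, curr + 8, nil
}

func main() {
    buf := make([]byte, 8)
    binary.LittleEndian.PutUint64(buf, 42)
    index, next, err := readIndex(buf, 0)
    fmt.Println(index, next, err) // 42 8 <nil>
}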

@@ -0,0 +1,71 @@
package messages

import (
    "bytes"
    "fmt"
    "io"
    "log"
    "sort"
)

type msgInfo struct {
    index uint64
    start int64
    end int64
}

func SplitMessages(data []byte) ([]*msgInfo, error) {
    messages := make([]*msgInfo, 0)
    reader := NewBytesReader(data)
    for {
        // Get message start
        msgStart := reader.Pointer()
        if int(msgStart) >= len(data) {
            return messages, nil
        }
        // Read message index
        msgIndex, err := reader.ReadIndex()
        if err != nil {
            if err != io.EOF {
                log.Println(reader.Pointer(), msgStart)
                return nil, fmt.Errorf("read message index err: %s", err)
            }
            return messages, nil
        }
        // Read message type
        msgType, err := reader.ReadUint()
        if err != nil {
            return nil, fmt.Errorf("read message type err: %s", err)
        }
        // Read message body
        _, err = ReadMessage(msgType, reader)
        if err != nil {
            return nil, fmt.Errorf("read message body err: %s", err)
        }
        // Add new message info to messages slice
        messages = append(messages, &msgInfo{
            index: msgIndex,
            start: msgStart,
            end: reader.Pointer(),
        })
    }
}

func SortMessages(messages []*msgInfo) []*msgInfo {
    sort.SliceStable(messages, func(i, j int) bool {
        return messages[i].index < messages[j].index
    })
    return messages
}

func MergeMessages(data []byte, messages []*msgInfo) []byte {
    sortedSession := bytes.NewBuffer(make([]byte, 0, len(data)))
    for _, info := range messages {
        sortedSession.Write(data[info.start:info.end])
    }
    return sortedSession.Bytes()
}
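
Not part of the diff: a self-contained sketch of the same split, stable-sort-by-index, merge-by-byte-range idea, run against a deliberately simplified record layout (8-byte little-endian index, 1-byte payload length, payload). The real session format is parsed by ReadMessage above; every name and the record layout below are hypothetical.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "sort"
)

// recInfo records where a single record sits inside the raw buffer,
// playing the role msgInfo plays above.
type recInfo struct {
    index uint64
    start, end int
}

// splitRecords walks the toy format: 8-byte little-endian index,
// 1-byte payload length, then the payload.
func splitRecords(data []byte) []recInfo {
    var infos []recInfo
    for off := 0; off+9 <= len(data); {
        start := off
        index := binary.LittleEndian.Uint64(data[off : off+8])
        size := int(data[off+8])
        off += 9 + size
        if off > len(data) {
            break // truncated record, stop here
        }
        infos = append(infos, recInfo{index: index, start: start, end: off})
    }
    return infos
}

// mergeRecords rebuilds the buffer by copying the byte ranges in the
// given order, the same way MergeMessages does for real messages.
func mergeRecords(data []byte, infos []recInfo) []byte {
    out := bytes.NewBuffer(make([]byte, 0, len(data)))
    for _, in := range infos {
        out.Write(data[in.start:in.end])
    }
    return out.Bytes()
}

func main() {
    // Two records written out of order: index 2 first, then index 1.
    var raw []byte
    for _, r := range []struct {
        index uint64
        payload string
    }{{2, "second"}, {1, "first"}} {
        head := make([]byte, 9)
        binary.LittleEndian.PutUint64(head, r.index)
        head[8] = byte(len(r.payload))
        raw = append(raw, head...)
        raw = append(raw, r.payload...)
    }

    infos := splitRecords(raw)
    sort.SliceStable(infos, func(i, j int) bool { return infos[i].index < infos[j].index })
    sorted := mergeRecords(raw, infos)

    for _, in := range splitRecords(sorted) {
        fmt.Println(in.index, string(sorted[in.start+9:in.end])) // 1 first, then 2 second
    }
}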