Compression worker (#1233)

* feat(backend): added extra worker for session compression

* feat(backend): debug logs

* feat(backend): added compression ratio metric

* feat(backend): reduced number of duplicate logs

* feat(backend): rewrite workers management
Alexander 2023-05-04 17:46:43 +02:00 committed by GitHub
parent 742c038ecd
commit 528d1af173
3 changed files with 116 additions and 42 deletions


@@ -35,13 +35,17 @@ func (t FileType) String() string {
}
type Task struct {
id string
key string
domRaw []byte
devRaw []byte
doms *bytes.Buffer
dome *bytes.Buffer
dev *bytes.Buffer
id string
key string
domRaw []byte
devRaw []byte
domsRawSize float64
domeRawSize float64
devRawSize float64
doms *bytes.Buffer
dome *bytes.Buffer
dev *bytes.Buffer
isBreakTask bool
}
func (t *Task) SetMob(mob []byte, tp FileType) {
@@ -59,12 +63,19 @@ func (t *Task) Mob(tp FileType) []byte {
return t.devRaw
}
func NewBreakTask() *Task {
return &Task{
isBreakTask: true,
}
}
type Storage struct {
cfg *config.Config
s3 *storage.S3
startBytes []byte
tasks chan *Task
ready chan struct{}
cfg *config.Config
s3 *storage.S3
startBytes []byte
compressionTasks chan *Task // brotli compression or gzip compression with encryption
uploadingTasks chan *Task // upload to s3
workersStopped chan struct{}
}
func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
@@ -75,24 +86,30 @@ func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
return nil, fmt.Errorf("s3 storage is empty")
}
newStorage := &Storage{
cfg: cfg,
s3: s3,
startBytes: make([]byte, cfg.FileSplitSize),
tasks: make(chan *Task, 1),
ready: make(chan struct{}),
cfg: cfg,
s3: s3,
startBytes: make([]byte, cfg.FileSplitSize),
compressionTasks: make(chan *Task, 1),
uploadingTasks: make(chan *Task, 1),
workersStopped: make(chan struct{}),
}
go newStorage.worker()
go newStorage.compressionWorker()
go newStorage.uploadingWorker()
return newStorage, nil
}
func (s *Storage) Wait() {
<-s.ready
// Send stop signal to the first worker
s.compressionTasks <- NewBreakTask()
// Wait for the stopped signal from the last worker
<-s.workersStopped
}
func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
// Generate file path
sessionID := strconv.FormatUint(msg.SessionID(), 10)
filePath := s.cfg.FSDir + "/" + sessionID
// Prepare sessions
newTask := &Task{
id: sessionID,
@@ -121,14 +138,13 @@ func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
}
return err
}
// Send new task to worker
s.tasks <- newTask
// Unload worker
<-s.ready
// Send new task to compression worker
s.compressionTasks <- newTask
return nil
}
func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
func (s *Storage) openSession(sessID, filePath string, tp FileType) ([]byte, error) {
if tp == DEV {
filePath += "devtools"
}
@@ -147,7 +163,7 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
return raw, nil
}
start := time.Now()
res, err := s.sortSessionMessages(raw)
res, err := s.sortSessionMessages(sessID, raw)
if err != nil {
return nil, fmt.Errorf("can't sort session, err: %s", err)
}
@@ -155,9 +171,9 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
return res, nil
}
func (s *Storage) sortSessionMessages(raw []byte) ([]byte, error) {
func (s *Storage) sortSessionMessages(sessID string, raw []byte) ([]byte, error) {
// Parse messages, sort by index and save result into slice of bytes
unsortedMessages, err := messages.SplitMessages(raw)
unsortedMessages, err := messages.SplitMessages(sessID, raw)
if err != nil {
log.Printf("can't sort session, err: %s", err)
return raw, nil
@@ -168,7 +184,7 @@
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
// Open session file
startRead := time.Now()
mob, err := s.openSession(path, tp)
mob, err := s.openSession(task.id, path, tp)
if err != nil {
return err
}
@@ -177,9 +193,6 @@
// Put opened session file into task struct
task.SetMob(mob, tp)
// Encrypt and compress session
s.packSession(task, tp)
return nil
}
@@ -267,8 +280,12 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
if tp == DOM {
task.doms = result
// Record full dom (start) raw size
task.domsRawSize = float64(len(mob))
} else {
task.dev = result
// Record dev raw size
task.devRawSize = float64(len(mob))
}
return
}
@@ -284,7 +301,8 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
start := time.Now()
task.doms = s.compressSessionBetter(mob[:s.cfg.FileSplitSize])
firstPart = time.Since(start).Milliseconds()
// Record dom start raw size
task.domsRawSize = float64(s.cfg.FileSplitSize)
// Finish task
wg.Done()
}()
@@ -294,7 +312,8 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
start := time.Now()
task.dome = s.compressSessionBetter(mob[s.cfg.FileSplitSize:])
secondPart = time.Since(start).Milliseconds()
// Record dom end raw size
task.domeRawSize = float64(len(mob) - s.cfg.FileSplitSize)
// Finish task
wg.Done()
}()
@@ -365,6 +384,9 @@ func (s *Storage) uploadSession(task *Task) {
}
go func() {
if task.doms != nil {
// Record compression ratio
metrics.RecordSessionCompressionRatio(float64(task.doms.Len())/task.domsRawSize, DOM.String())
// Upload session to s3
start := time.Now()
if err := s.s3.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", compression); err != nil {
log.Fatalf("Storage: start upload failed. %s", err)
@@ -375,6 +397,9 @@
}()
go func() {
if task.dome != nil {
// Record compression ratio
metrics.RecordSessionCompressionRatio(float64(task.dome.Len())/task.domeRawSize, DOM.String())
// Upload session to s3
start := time.Now()
if err := s.s3.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", compression); err != nil {
log.Fatalf("Storage: start upload failed. %s", err)
@@ -385,6 +410,9 @@
}()
go func() {
if task.dev != nil {
// Record compression ratio
metrics.RecordSessionCompressionRatio(float64(task.dev.Len())/task.devRawSize, DEV.String())
// Upload session to s3
start := time.Now()
if err := s.s3.Upload(task.dev, task.id+string(DEV), "application/octet-stream", compression); err != nil {
log.Fatalf("Storage: start upload failed. %s", err)
@@ -399,14 +427,43 @@ func (s *Storage) uploadSession(task *Task) {
metrics.IncreaseStorageTotalSessions()
}
func (s *Storage) worker() {
func (s *Storage) doCompression(task *Task) {
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
s.packSession(task, DOM)
wg.Done()
}()
go func() {
s.packSession(task, DEV)
wg.Done()
}()
wg.Wait()
s.uploadingTasks <- task
}
func (s *Storage) compressionWorker() {
for {
select {
case task := <-s.tasks:
s.uploadSession(task)
default:
// Signal that worker finished all tasks
s.ready <- struct{}{}
case task := <-s.compressionTasks:
if task.isBreakTask {
s.uploadingTasks <- task
continue
}
s.doCompression(task)
}
}
}
func (s *Storage) uploadingWorker() {
for {
select {
case task := <-s.uploadingTasks:
if task.isBreakTask {
s.workersStopped <- struct{}{}
continue
}
s.uploadSession(task)
}
}
}
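
The shutdown handshake introduced above is easier to see in isolation. Below is a minimal, standalone sketch of the pattern — not part of the diff; Pipeline, the sleep, and the printed output are illustrative stand-ins. Two staged workers are connected by buffered channels, and a sentinel "break task" is flushed through the whole pipeline. Because channel delivery is FIFO, the sentinel cannot overtake real tasks, so Wait() returns only after everything queued before it has been compressed and uploaded.

package main

import (
	"fmt"
	"time"
)

// Task carries a session through the pipeline; isBreakTask marks the
// shutdown sentinel, mirroring NewBreakTask() in the diff.
type Task struct {
	id          string
	isBreakTask bool
}

type Pipeline struct {
	compressionTasks chan *Task
	uploadingTasks   chan *Task
	workersStopped   chan struct{}
}

func NewPipeline() *Pipeline {
	p := &Pipeline{
		compressionTasks: make(chan *Task, 1),
		uploadingTasks:   make(chan *Task, 1),
		workersStopped:   make(chan struct{}),
	}
	go p.compressionWorker()
	go p.uploadingWorker()
	return p
}

func (p *Pipeline) compressionWorker() {
	for task := range p.compressionTasks {
		if task.isBreakTask {
			p.uploadingTasks <- task // forward the sentinel downstream
			return
		}
		time.Sleep(10 * time.Millisecond) // stand-in for brotli/gzip work
		p.uploadingTasks <- task
	}
}

func (p *Pipeline) uploadingWorker() {
	for task := range p.uploadingTasks {
		if task.isBreakTask {
			p.workersStopped <- struct{}{} // confirm the pipeline has drained
			return
		}
		fmt.Println("uploaded session", task.id) // stand-in for the S3 upload
	}
}

// Wait injects the sentinel at the head of the pipeline and blocks until
// the last stage confirms it has processed everything queued before it.
func (p *Pipeline) Wait() {
	p.compressionTasks <- &Task{isBreakTask: true}
	<-p.workersStopped
}

func main() {
	p := NewPipeline()
	p.compressionTasks <- &Task{id: "42"}
	p.Wait()
}

The real compressionWorker additionally fans DOM and DEV packing out to two goroutines behind a sync.WaitGroup (doCompression above) before handing the task to the uploader.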


@@ -21,9 +21,10 @@ func (m *msgInfo) Print() string {
return fmt.Sprintf("index: %d, start: %d, end: %d, type: %d, body: %s", m.index, m.start, m.end, m.msgType, m.body)
}
func SplitMessages(data []byte) ([]*msgInfo, error) {
func SplitMessages(sessID string, data []byte) ([]*msgInfo, error) {
messages := make([]*msgInfo, 0)
indexes := make(map[uint64]bool)
hadDuplicates := false
var lastTimestamp uint64
reader := NewBytesReader(data)
for {
@@ -55,8 +56,9 @@
return messages, fmt.Errorf("read message body err: %s", err)
}
if _, ok := indexes[msgIndex]; ok {
log.Printf("duplicate message index: %d", msgIndex)
if _, ok := indexes[msgIndex]; ok && !hadDuplicates {
hadDuplicates = true
log.Printf("Session %s has duplicate messages", sessID)
continue
}
indexes[msgIndex] = true
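
The intent of this change, shown as a standalone sketch (dedupeByIndex and msg are illustrative names, not the project's): every duplicate index is still skipped, but the warning now fires at most once per session instead of once per duplicate message.

package main

import (
	"fmt"
	"log"
)

type msg struct {
	index uint64
	body  string
}

// dedupeByIndex drops messages whose index was already seen; the
// warning is logged at most once per session rather than per message.
func dedupeByIndex(sessID string, in []msg) []msg {
	seen := make(map[uint64]bool, len(in))
	hadDuplicates := false
	out := make([]msg, 0, len(in))
	for _, m := range in {
		if seen[m.index] {
			if !hadDuplicates {
				hadDuplicates = true
				log.Printf("Session %s has duplicate messages", sessID)
			}
			continue // every duplicate is skipped, but logged only once
		}
		seen[m.index] = true
		out = append(out, m)
	}
	return out
}

func main() {
	res := dedupeByIndex("42", []msg{{1, "a"}, {1, "a"}, {2, "b"}})
	fmt.Println(len(res)) // 2
}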


@@ -127,6 +127,20 @@ func RecordSessionUploadDuration(durMillis float64, fileType string) {
storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionCompressionRatio = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "compression_ratio",
Help: "A histogram displaying the compression ratio of mob files for each session.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionCompressionRatio(ratio float64, fileType string) {
storageSessionCompressionRatio.WithLabelValues(fileType).Observe(ratio)
}
func List() []prometheus.Collector {
return []prometheus.Collector{
storageSessionSize,
@@ -136,5 +150,6 @@ func List() []prometheus.Collector {
storageSessionEncryptionDuration,
storageSessionCompressDuration,
storageSessionUploadDuration,
storageSessionCompressionRatio,
}
}
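
The ratio this metric observes is compressed size divided by raw size, so a value of 0.15 means the file shrank to 15% of its original size, and anything above 1.0 means compression made it larger. A minimal register-and-observe sketch follows; the bucket choice in it is an assumption on my part — the diff itself reuses common.DefaultDurationBuckets, while a ratio that mostly lives in [0, 1] is arguably better served by linear buckets:

package main

import "github.com/prometheus/client_golang/prometheus"

var compressionRatio = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "storage",
		Name:      "compression_ratio",
		Help:      "Compression ratio of mob files (compressed size / raw size).",
		// Assumption: linear buckets covering 0.05..1.0; the actual
		// commit uses common.DefaultDurationBuckets instead.
		Buckets: prometheus.LinearBuckets(0.05, 0.05, 20),
	},
	[]string{"file_type"},
)

func main() {
	prometheus.MustRegister(compressionRatio)
	rawSize, compressedSize := 1_000_000.0, 150_000.0
	// Observes 0.15: the compressed file is 15% of the raw size.
	compressionRatio.WithLabelValues("dom").Observe(compressedSize / rawSize)
}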