[Backend] Zstd support for mob files (#1732)

* feat(backend): added zstd compression support to storage service

* feat(backend): try to ignore content-encoding for zstd compression
Alexander 2023-12-04 16:00:59 +01:00 committed by GitHub
parent d04fb30048
commit a68351d509
4 changed files with 72 additions and 83 deletions
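
In short, the storage service now picks its compression algorithm from the COMPRESSION_ALGO environment variable (none, gzip, brotli or zstd, defaulting to gzip) instead of the old USE_BROTLI flag, and gains a zstd code path. The standalone sketch below condenses that new path using github.com/klauspost/compress/zstd, mirroring the helper added in the diff; the main function and sample payload are illustrative only and not part of the commit:

package main

import (
    "bytes"
    "fmt"
    "log"

    "github.com/klauspost/compress/zstd"
)

// compressZstd mirrors the helper added to the storage service: it runs the
// mob file bytes through a zstd encoder and returns the compressed buffer.
func compressZstd(data []byte) *bytes.Buffer {
    var out bytes.Buffer
    w, _ := zstd.NewWriter(&out)
    if _, err := w.Write(data); err != nil {
        log.Printf("can't write session data to compressor: %s", err)
    }
    if err := w.Close(); err != nil {
        log.Printf("can't close compressor: %s", err)
    }
    return &out
}

func main() {
    mob := bytes.Repeat([]byte("example session payload "), 100) // illustrative data
    compressed := compressZstd(mob)
    fmt.Printf("raw=%d bytes, zstd=%d bytes\n", len(mob), compressed.Len())
}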

View file

@@ -23,7 +23,7 @@ type Config struct {
 	MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
 	UseSort     bool  `env:"USE_SESSION_SORT,default=true"`
 	UseProfiler bool  `env:"PROFILER_ENABLED,default=false"`
-	UseBrotli   bool  `env:"USE_BROTLI,default=true"`
+	CompressionAlgo string `env:"COMPRESSION_ALGO,default=gzip"` // none, gzip, brotli, zstd
 }

 func New() *Config {

View file

@@ -13,6 +13,7 @@ import (
 	"sync"
 	"time"

+	"github.com/klauspost/compress/zstd"
 	gzip "github.com/klauspost/pgzip"
 	config "openreplay/backend/internal/config/storage"
 	"openreplay/backend/pkg/messages"
@@ -114,7 +115,7 @@ func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
 	newTask := &Task{
 		id:          sessionID,
 		key:         msg.EncryptionKey,
-		compression: objectstorage.NoCompression,
+		compression: s.setTaskCompression(),
 	}
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
@@ -197,21 +198,31 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
 	return nil
 }

-func (s *Storage) packSession(task *Task, tp FileType) {
-	// If encryption key is empty, pack session using better algorithm
-	if task.key == "" && s.cfg.UseBrotli {
-		s.packSessionBetter(task, tp)
-		return
+func (s *Storage) setTaskCompression() objectstorage.CompressionType {
+	switch s.cfg.CompressionAlgo {
+	case "none":
+		return objectstorage.NoCompression
+	case "gzip":
+		return objectstorage.Gzip
+	case "brotli":
+		return objectstorage.Brotli
+	case "zstd":
+		return objectstorage.Zstd
+	default:
+		log.Printf("unknown compression algorithm: %s", s.cfg.CompressionAlgo)
+		return objectstorage.NoCompression
 	}
+}

+func (s *Storage) packSession(task *Task, tp FileType) {
 	// Prepare mob file
 	mob := task.Mob(tp)
-	task.compression = objectstorage.Gzip
-
+	// For devtools of small dom file
 	if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
 		// Compression
 		start := time.Now()
-		data := s.compressSession(mob)
+		data := s.compress(mob, task.compression)
 		metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
 		// Encryption
@@ -221,13 +232,15 @@ func (s *Storage) packSession(task *Task, tp FileType) {
 		if tp == DOM {
 			task.doms = bytes.NewBuffer(result)
+			task.domsRawSize = float64(len(mob))
 		} else {
 			task.dev = bytes.NewBuffer(result)
+			task.devRawSize = float64(len(mob))
 		}
 		return
 	}

-	// Prepare two workers
+	// Prepare two workers for two parts (start and end) of dom file
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
 	var firstPart, secondPart, firstEncrypt, secondEncrypt int64
@@ -236,7 +249,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
 	go func() {
 		// Compression
 		start := time.Now()
-		data := s.compressSession(mob[:s.cfg.FileSplitSize])
+		data := s.compress(mob[:s.cfg.FileSplitSize], task.compression)
 		firstPart = time.Since(start).Milliseconds()

 		// Encryption
@@ -244,14 +257,18 @@ func (s *Storage) packSession(task *Task, tp FileType) {
 		task.doms = bytes.NewBuffer(s.encryptSession(data.Bytes(), task.key))
 		firstEncrypt = time.Since(start).Milliseconds()

+		// Record dom start raw size
+		task.domsRawSize = float64(s.cfg.FileSplitSize)
+
 		// Finish task
 		wg.Done()
 	}()
 	// DomEnd part
 	go func() {
 		// Compression
 		start := time.Now()
-		data := s.compressSession(mob[s.cfg.FileSplitSize:])
+		data := s.compress(mob[s.cfg.FileSplitSize:], task.compression)
 		secondPart = time.Since(start).Milliseconds()

 		// Encryption
@@ -259,64 +276,9 @@ func (s *Storage) packSession(task *Task, tp FileType) {
 		task.dome = bytes.NewBuffer(s.encryptSession(data.Bytes(), task.key))
 		secondEncrypt = time.Since(start).Milliseconds()
-		// Finish task
-		wg.Done()
-	}()
-	wg.Wait()
-
-	// Record metrics
-	metrics.RecordSessionEncryptionDuration(float64(firstEncrypt+secondEncrypt), tp.String())
-	metrics.RecordSessionCompressDuration(float64(firstPart+secondPart), tp.String())
-}
-
-// packSessionBetter is a new version of packSession that uses brotli compression (only if we are not using encryption)
-func (s *Storage) packSessionBetter(task *Task, tp FileType) {
-	// Prepare mob file
-	mob := task.Mob(tp)
-	task.compression = objectstorage.Brotli
-
-	if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
-		// Compression
-		start := time.Now()
-		result := s.compressSessionBetter(mob)
-		metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
-
-		if tp == DOM {
-			task.doms = result
-			// Record full dom (start) raw size
-			task.domsRawSize = float64(len(mob))
-		} else {
-			task.dev = result
-			// Record dev raw size
-			task.devRawSize = float64(len(mob))
-		}
-		return
-	}
-
-	// Prepare two workers
-	wg := &sync.WaitGroup{}
-	wg.Add(2)
-	var firstPart, secondPart, firstEncrypt, secondEncrypt int64
-
-	// DomStart part
-	go func() {
-		// Compression
-		start := time.Now()
-		task.doms = s.compressSessionBetter(mob[:s.cfg.FileSplitSize])
-		firstPart = time.Since(start).Milliseconds()
-		// Record dom start raw size
-		task.domsRawSize = float64(s.cfg.FileSplitSize)
-		// Finish task
-		wg.Done()
-	}()
-	// DomEnd part
-	go func() {
-		// Compression
-		start := time.Now()
-		task.dome = s.compressSessionBetter(mob[s.cfg.FileSplitSize:])
-		secondPart = time.Since(start).Milliseconds()
 		// Record dom end raw size
 		task.domeRawSize = float64(len(mob) - s.cfg.FileSplitSize)
 		// Finish task
 		wg.Done()
 	}()
@@ -328,21 +290,33 @@ func (s *Storage) packSessionBetter(task *Task, tp FileType) {
 }

 func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte {
-	var encryptedData []byte
-	var err error
-	if encryptionKey != "" {
-		encryptedData, err = EncryptData(data, []byte(encryptionKey))
-		if err != nil {
-			log.Printf("can't encrypt data: %s", err)
-			encryptedData = data
-		}
-	} else {
+	if encryptionKey == "" {
+		// no encryption, just return the same data
+		return data
+	}
+	encryptedData, err := EncryptData(data, []byte(encryptionKey))
+	if err != nil {
+		log.Printf("can't encrypt data: %s", err)
 		encryptedData = data
 	}
 	return encryptedData
 }

-func (s *Storage) compressSession(data []byte) *bytes.Buffer {
+func (s *Storage) compress(data []byte, compressionType objectstorage.CompressionType) *bytes.Buffer {
+	switch compressionType {
+	case objectstorage.Gzip:
+		return s.compressGzip(data)
+	case objectstorage.Brotli:
+		return s.compressBrotli(data)
+	case objectstorage.Zstd:
+		return s.compressZstd(data)
+	default:
+		// no compression, just return the same data
+		return bytes.NewBuffer(data)
+	}
+}
+
+func (s *Storage) compressGzip(data []byte) *bytes.Buffer {
 	zippedMob := new(bytes.Buffer)
 	z, _ := gzip.NewWriterLevel(zippedMob, gzip.DefaultCompression)
 	if _, err := z.Write(data); err != nil {
@@ -354,7 +328,7 @@ func (s *Storage) compressSession(data []byte) *bytes.Buffer {
 	return zippedMob
 }

-func (s *Storage) compressSessionBetter(data []byte) *bytes.Buffer {
+func (s *Storage) compressBrotli(data []byte) *bytes.Buffer {
 	out := bytes.Buffer{}
 	writer := brotli.NewWriterOptions(&out, brotli.WriterOptions{Quality: brotli.DefaultCompression})
 	in := bytes.NewReader(data)
@@ -373,6 +347,18 @@ func (s *Storage) compressSessionBetter(data []byte) *bytes.Buffer {
 	return &out
 }

+func (s *Storage) compressZstd(data []byte) *bytes.Buffer {
+	var out bytes.Buffer
+	w, _ := zstd.NewWriter(&out)
+	if _, err := w.Write(data); err != nil {
+		log.Printf("can't write session data to compressor: %s", err)
+	}
+	if err := w.Close(); err != nil {
+		log.Printf("can't close compressor: %s", err)
+	}
+	return &out
+}
+
 func (s *Storage) uploadSession(task *Task) {
 	wg := &sync.WaitGroup{}
 	wg.Add(3)

View file

@@ -11,6 +11,7 @@ const (
 	NoCompression CompressionType = iota
 	Gzip
 	Brotli
+	Zstd
 )

 type ObjectStorage interface {

View file

@@ -73,11 +73,13 @@ func (s *storageImpl) Upload(reader io.Reader, key string, contentType string, c
 	var contentEncoding *string
 	switch compression {
 	case objectstorage.Gzip:
-		gzipStr := "gzip"
-		contentEncoding = &gzipStr
+		encodeStr := "gzip"
+		contentEncoding = &encodeStr
 	case objectstorage.Brotli:
-		gzipStr := "br"
-		contentEncoding = &gzipStr
+		encodeStr := "br"
+		contentEncoding = &encodeStr
+	case objectstorage.Zstd:
+		// Have to ignore contentEncoding for Zstd (otherwise will be an error in browser)
 	}
 	_, err := s.uploader.Upload(&s3manager.UploadInput{
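
Note on the second commit message: for zstd the uploader deliberately leaves Content-Encoding unset, since browsers do not reliably accept Content-Encoding: zstd and would fail to decode the response, so whoever downloads the object has to decompress it explicitly. A minimal sketch of such a consumer in Go, using the same klauspost zstd package; this reader is not part of the commit and the names are illustrative:

package mobreader

import (
    "github.com/klauspost/compress/zstd"
)

// decompressMob decodes a zstd-compressed mob file after it has been
// downloaded from object storage (no Content-Encoding header is set,
// so the payload arrives still compressed).
func decompressMob(compressed []byte) ([]byte, error) {
    dec, err := zstd.NewReader(nil)
    if err != nil {
        return nil, err
    }
    defer dec.Close()
    // DecodeAll inflates the whole payload in one call.
    return dec.DecodeAll(compressed, nil)
}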