feat(backend): added more transparent way to use different compression algorithms in storage
This commit is contained in:
parent
1cd8d311e4
commit
402cf8f631
2 changed files with 10 additions and 10 deletions
|
|
@ -23,7 +23,7 @@ type Config struct {
|
|||
MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
|
||||
UseSort bool `env:"USE_SESSION_SORT,default=true"`
|
||||
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
||||
UseBrotli bool `env:"USE_BROTLI,default=false"`
|
||||
UseBrotli bool `env:"USE_BROTLI,default=true"`
|
||||
}
|
||||
|
||||
func New() *Config {
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ type Task struct {
|
|||
dome *bytes.Buffer
|
||||
dev *bytes.Buffer
|
||||
isBreakTask bool
|
||||
compression objectstorage.CompressionType
|
||||
}
|
||||
|
||||
func (t *Task) SetMob(mob []byte, tp FileType) {
|
||||
|
|
@ -113,6 +114,7 @@ func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
|
|||
newTask := &Task{
|
||||
id: sessionID,
|
||||
key: msg.EncryptionKey,
|
||||
compression: objectstorage.NoCompression,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
|
@ -204,6 +206,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
|
|||
|
||||
// Prepare mob file
|
||||
mob := task.Mob(tp)
|
||||
task.compression = objectstorage.Gzip
|
||||
|
||||
if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
|
||||
// Compression
|
||||
|
|
@ -270,6 +273,7 @@ func (s *Storage) packSession(task *Task, tp FileType) {
|
|||
func (s *Storage) packSessionBetter(task *Task, tp FileType) {
|
||||
// Prepare mob file
|
||||
mob := task.Mob(tp)
|
||||
task.compression = objectstorage.Brotli
|
||||
|
||||
if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
|
||||
// Compression
|
||||
|
|
@ -377,17 +381,13 @@ func (s *Storage) uploadSession(task *Task) {
|
|||
uploadDome int64 = 0
|
||||
uploadDev int64 = 0
|
||||
)
|
||||
compression := objectstorage.NoCompression
|
||||
if task.key == "" {
|
||||
compression = objectstorage.Brotli
|
||||
}
|
||||
go func() {
|
||||
if task.doms != nil {
|
||||
// Record compression ratio
|
||||
metrics.RecordSessionCompressionRatio(task.domsRawSize/float64(task.doms.Len()), DOM.String())
|
||||
// Upload session to s3
|
||||
start := time.Now()
|
||||
if err := s.objStorage.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", compression); err != nil {
|
||||
if err := s.objStorage.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", task.compression); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
uploadDoms = time.Now().Sub(start).Milliseconds()
|
||||
|
|
@ -400,7 +400,7 @@ func (s *Storage) uploadSession(task *Task) {
|
|||
metrics.RecordSessionCompressionRatio(task.domeRawSize/float64(task.dome.Len()), DOM.String())
|
||||
// Upload session to s3
|
||||
start := time.Now()
|
||||
if err := s.objStorage.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", compression); err != nil {
|
||||
if err := s.objStorage.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", task.compression); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
uploadDome = time.Now().Sub(start).Milliseconds()
|
||||
|
|
@ -413,7 +413,7 @@ func (s *Storage) uploadSession(task *Task) {
|
|||
metrics.RecordSessionCompressionRatio(task.devRawSize/float64(task.dev.Len()), DEV.String())
|
||||
// Upload session to s3
|
||||
start := time.Now()
|
||||
if err := s.objStorage.Upload(task.dev, task.id+string(DEV), "application/octet-stream", compression); err != nil {
|
||||
if err := s.objStorage.Upload(task.dev, task.id+string(DEV), "application/octet-stream", task.compression); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
uploadDev = time.Now().Sub(start).Milliseconds()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue