From d74ed000fa8ce68f6dc49e1edbbb0b3165f5ea35 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 11 Apr 2023 16:44:26 +0200 Subject: [PATCH] Enable session encryption (#1121) * feat(backend): enable session encryption * feat(backend): fixed updated method name in failover algo * feat(backend): disable encryption by default --- backend/Dockerfile | 3 ++- backend/cmd/storage/main.go | 2 +- backend/internal/storage/storage.go | 19 +++++++++++++------ ee/backend/pkg/failover/failover.go | 2 +- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/backend/Dockerfile b/backend/Dockerfile index 749900ba5..24a6f2503 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -84,7 +84,8 @@ ENV TZ=UTC \ CH_PASSWORD="" \ CH_DATABASE="default" \ # Max file size to process, default to 100MB - MAX_FILE_SIZE=100000000 + MAX_FILE_SIZE=100000000 \ + USE_ENCRYPTION=false RUN if [ "$SERVICE_NAME" = "http" ]; then \ diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index 2a1f6a402..13a76a181 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -46,7 +46,7 @@ func main() { messages.NewMessageIterator( func(msg messages.Message) { sesEnd := msg.(*messages.SessionEnd) - if err := srv.Upload(sesEnd); err != nil { + if err := srv.Process(sesEnd); err != nil { log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID()) sessionFinder.Find(msg.SessionID(), sesEnd.Timestamp) } diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go index b1e6b21fb..24ec9ece2 100644 --- a/backend/internal/storage/storage.go +++ b/backend/internal/storage/storage.go @@ -37,6 +37,7 @@ type Task struct { doms *bytes.Buffer dome *bytes.Buffer dev *bytes.Buffer + key string } type Storage struct { @@ -69,13 +70,14 @@ func (s *Storage) Wait() { <-s.ready } -func (s *Storage) Upload(msg *messages.SessionEnd) (err error) { +func (s *Storage) Process(msg *messages.SessionEnd) (err error) { // Generate file path sessionID := strconv.FormatUint(msg.SessionID(), 10) filePath := s.cfg.FSDir + "/" + sessionID // Prepare sessions newTask := &Task{ - id: sessionID, + id: sessionID, + key: msg.EncryptionKey, } wg := &sync.WaitGroup{} wg.Add(2) @@ -157,12 +159,12 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { // Encode and compress session if tp == DEV { start := time.Now() - task.dev = s.compressSession(mob) + task.dev = s.packSession(mob, task.key) metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) } else { if len(mob) <= s.cfg.FileSplitSize { start := time.Now() - task.doms = s.compressSession(mob) + task.doms = s.packSession(mob, task.key) metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String()) return nil } @@ -171,13 +173,13 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error { var firstPart, secondPart int64 go func() { start := time.Now() - task.doms = s.compressSession(mob[:s.cfg.FileSplitSize]) + task.doms = s.packSession(mob[:s.cfg.FileSplitSize], task.key) firstPart = time.Now().Sub(start).Milliseconds() wg.Done() }() go func() { start := time.Now() - task.dome = s.compressSession(mob[s.cfg.FileSplitSize:]) + task.dome = s.packSession(mob[s.cfg.FileSplitSize:], task.key) secondPart = time.Now().Sub(start).Milliseconds() wg.Done() }() @@ -202,6 +204,11 @@ func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte { return encryptedData } +func (s *Storage) packSession(raw []byte, key string) *bytes.Buffer { + data := s.encryptSession(raw, key) + return s.compressSession(data) +} + func (s *Storage) compressSession(data []byte) *bytes.Buffer { zippedMob := new(bytes.Buffer) z, _ := gzip.NewWriterLevel(zippedMob, gzip.BestSpeed) diff --git a/ee/backend/pkg/failover/failover.go b/ee/backend/pkg/failover/failover.go index 11ff7e4be..857b5381b 100644 --- a/ee/backend/pkg/failover/failover.go +++ b/ee/backend/pkg/failover/failover.go @@ -91,7 +91,7 @@ func (s *sessionFinderImpl) worker() { func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) { sessEnd := &messages.SessionEnd{Timestamp: timestamp} sessEnd.SetSessionID(sessionID) - err := s.storage.Upload(sessEnd) + err := s.storage.Process(sessEnd) if err == nil { log.Printf("found session: %d in partition: %d, original: %d", sessionID, partition, sessionID%numberOfPartitions)