[Storage] added workers perf improvements (#877)
* feat(backend): added workers for storage service
This commit is contained in:
parent
22b60af72c
commit
bab5a81959
3 changed files with 161 additions and 156 deletions
|
|
@ -44,7 +44,7 @@ func main() {
|
|||
messages.NewMessageIterator(
|
||||
func(msg messages.Message) {
|
||||
sesEnd := msg.(*messages.SessionEnd)
|
||||
if err := srv.UploadSessionFiles(sesEnd); err != nil {
|
||||
if err := srv.Upload(sesEnd); err != nil {
|
||||
log.Printf("can't find session: %d", msg.SessionID())
|
||||
sessionFinder.Find(msg.SessionID(), sesEnd.Timestamp)
|
||||
}
|
||||
|
|
@ -54,7 +54,7 @@ func main() {
|
|||
[]int{messages.MsgSessionEnd},
|
||||
true,
|
||||
),
|
||||
true,
|
||||
false,
|
||||
cfg.MessageSizeLimit,
|
||||
)
|
||||
|
||||
|
|
@ -69,10 +69,15 @@ func main() {
|
|||
case sig := <-sigchan:
|
||||
log.Printf("Caught signal %v: terminating\n", sig)
|
||||
sessionFinder.Stop()
|
||||
srv.Wait()
|
||||
consumer.Close()
|
||||
os.Exit(0)
|
||||
case <-counterTick:
|
||||
go counter.Print()
|
||||
srv.Wait()
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("can't commit messages: %s", err)
|
||||
}
|
||||
case msg := <-consumer.Rebalanced():
|
||||
log.Println(msg)
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -2,20 +2,33 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"log"
|
||||
config "openreplay/backend/internal/config/storage"
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"openreplay/backend/pkg/storage"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type FileType string
|
||||
|
||||
const (
|
||||
DOM FileType = "/dom.mob"
|
||||
DEV FileType = "/devtools.mob"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
id string
|
||||
doms *bytes.Buffer
|
||||
dome *bytes.Buffer
|
||||
dev *bytes.Buffer
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
cfg *config.Config
|
||||
s3 *storage.S3
|
||||
|
|
@ -27,6 +40,9 @@ type Storage struct {
|
|||
readingDOMTime syncfloat64.Histogram
|
||||
readingTime syncfloat64.Histogram
|
||||
archivingTime syncfloat64.Histogram
|
||||
|
||||
tasks chan *Task
|
||||
ready chan struct{}
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) {
|
||||
|
|
@ -57,7 +73,7 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
|
|||
if err != nil {
|
||||
log.Printf("can't create archiving_duration metric: %s", err)
|
||||
}
|
||||
return &Storage{
|
||||
newStorage := &Storage{
|
||||
cfg: cfg,
|
||||
s3: s3,
|
||||
startBytes: make([]byte, cfg.FileSplitSize),
|
||||
|
|
@ -66,169 +82,153 @@ func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Stor
|
|||
sessionDevtoolsSize: sessionDevtoolsSize,
|
||||
readingTime: readingTime,
|
||||
archivingTime: archivingTime,
|
||||
}, nil
|
||||
tasks: make(chan *Task, 1),
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
go newStorage.worker()
|
||||
return newStorage, nil
|
||||
}
|
||||
|
||||
func (s *Storage) UploadSessionFiles(msg *messages.SessionEnd) error {
|
||||
if err := s.uploadKey(msg.SessionID(), "/dom.mob", true, 5, msg.EncryptionKey); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.uploadKey(msg.SessionID(), "/devtools.mob", false, 4, msg.EncryptionKey); err != nil {
|
||||
log.Printf("can't find devtools for session: %d, err: %s", msg.SessionID(), err)
|
||||
}
|
||||
return nil
|
||||
func (s *Storage) Wait() {
|
||||
<-s.ready
|
||||
}
|
||||
|
||||
// TODO: make a bit cleaner.
|
||||
// TODO: Of course, I'll do!
|
||||
func (s *Storage) uploadKey(sessID uint64, suffix string, shouldSplit bool, retryCount int, encryptionKey string) error {
|
||||
if retryCount <= 0 {
|
||||
return nil
|
||||
func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
|
||||
// Generate file path
|
||||
sessionID := strconv.FormatUint(msg.SessionID(), 10)
|
||||
filePath := s.cfg.FSDir + "/" + sessionID
|
||||
// Prepare sessions
|
||||
newTask := &Task{
|
||||
id: sessionID,
|
||||
}
|
||||
start := time.Now()
|
||||
fileName := strconv.FormatUint(sessID, 10)
|
||||
mobFileName := fileName
|
||||
if suffix == "/devtools.mob" {
|
||||
mobFileName += "devtools"
|
||||
}
|
||||
filePath := s.cfg.FSDir + "/" + mobFileName
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
if prepErr := s.prepareSession(filePath, DOM, newTask); prepErr != nil {
|
||||
err = fmt.Errorf("prepare session err: %s", prepErr)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
if prepErr := s.prepareSession(filePath, DOM, newTask); prepErr != nil {
|
||||
err = fmt.Errorf("prepare session err: %s", prepErr)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
// Send new task to worker
|
||||
s.tasks <- newTask
|
||||
// Unload worker
|
||||
<-s.ready
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Storage) openSession(filePath string) ([]byte, error) {
|
||||
// Check file size before download into memory
|
||||
info, err := os.Stat(filePath)
|
||||
if err == nil {
|
||||
if info.Size() > s.cfg.MaxFileSize {
|
||||
log.Printf("big file, size: %d, session: %d", info.Size(), sessID)
|
||||
return nil
|
||||
}
|
||||
if err == nil && info.Size() > s.cfg.MaxFileSize {
|
||||
return nil, fmt.Errorf("big file, size: %d", info.Size())
|
||||
}
|
||||
file, err := os.Open(filePath)
|
||||
// Read file into memory
|
||||
return os.ReadFile(filePath)
|
||||
}
|
||||
|
||||
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
||||
// Open mob file
|
||||
if tp == DEV {
|
||||
path += "devtools"
|
||||
}
|
||||
mob, err := s.openSession(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("File open error: %v; sessID: %s, part: %d, sessStart: %s\n",
|
||||
err, fileName, sessID%16,
|
||||
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
|
||||
)
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
var fileSize int64 = 0
|
||||
fileInfo, err := file.Stat()
|
||||
if err != nil {
|
||||
log.Printf("can't get file info: %s", err)
|
||||
if tp == DEV {
|
||||
task.dev = s.compressSession(mob)
|
||||
} else {
|
||||
fileSize = fileInfo.Size()
|
||||
}
|
||||
|
||||
var encryptedData []byte
|
||||
fileName += suffix
|
||||
if shouldSplit {
|
||||
nRead, err := file.Read(s.startBytes)
|
||||
if err != nil {
|
||||
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
|
||||
err,
|
||||
fileName,
|
||||
sessID%16,
|
||||
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
|
||||
)
|
||||
time.AfterFunc(s.cfg.RetryTimeout, func() {
|
||||
s.uploadKey(sessID, suffix, shouldSplit, retryCount-1, encryptionKey)
|
||||
})
|
||||
if len(mob) <= s.cfg.FileSplitSize {
|
||||
task.doms = s.compressSession(mob)
|
||||
return nil
|
||||
}
|
||||
s.readingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
|
||||
|
||||
start = time.Now()
|
||||
// Encrypt session file if we have encryption key
|
||||
if encryptionKey != "" {
|
||||
encryptedData, err = EncryptData(s.startBytes[:nRead], []byte(encryptionKey))
|
||||
if err != nil {
|
||||
log.Printf("can't encrypt data: %s", err)
|
||||
encryptedData = s.startBytes[:nRead]
|
||||
}
|
||||
} else {
|
||||
encryptedData = s.startBytes[:nRead]
|
||||
}
|
||||
// Compress and save to s3
|
||||
startReader := bytes.NewBuffer(encryptedData)
|
||||
if err := s.s3.Upload(s.gzipFile(startReader), fileName+"s", "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %v\n", err)
|
||||
}
|
||||
// TODO: fix possible error (if we read less then FileSplitSize)
|
||||
if nRead == s.cfg.FileSplitSize {
|
||||
restPartSize := fileSize - int64(nRead)
|
||||
fileData := make([]byte, restPartSize)
|
||||
nRead, err = file.Read(fileData)
|
||||
if err != nil {
|
||||
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
|
||||
err,
|
||||
fileName,
|
||||
sessID%16,
|
||||
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
if int64(nRead) != restPartSize {
|
||||
log.Printf("can't read the rest part of file")
|
||||
}
|
||||
|
||||
// Encrypt session file if we have encryption key
|
||||
if encryptionKey != "" {
|
||||
encryptedData, err = EncryptData(fileData, []byte(encryptionKey))
|
||||
if err != nil {
|
||||
log.Printf("can't encrypt data: %s", err)
|
||||
encryptedData = fileData
|
||||
}
|
||||
} else {
|
||||
encryptedData = fileData
|
||||
}
|
||||
// Compress and save to s3
|
||||
endReader := bytes.NewBuffer(encryptedData)
|
||||
if err := s.s3.Upload(s.gzipFile(endReader), fileName+"e", "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: end upload failed. %v\n", err)
|
||||
}
|
||||
}
|
||||
s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
|
||||
} else {
|
||||
start = time.Now()
|
||||
fileData := make([]byte, fileSize)
|
||||
nRead, err := file.Read(fileData)
|
||||
if err != nil {
|
||||
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
|
||||
err,
|
||||
fileName,
|
||||
sessID%16,
|
||||
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
if int64(nRead) != fileSize {
|
||||
log.Printf("can't read the rest part of file")
|
||||
}
|
||||
|
||||
// Encrypt session file if we have encryption key
|
||||
if encryptionKey != "" {
|
||||
encryptedData, err = EncryptData(fileData, []byte(encryptionKey))
|
||||
if err != nil {
|
||||
log.Printf("can't encrypt data: %s", err)
|
||||
encryptedData = fileData
|
||||
}
|
||||
} else {
|
||||
encryptedData = fileData
|
||||
}
|
||||
endReader := bytes.NewBuffer(encryptedData)
|
||||
if err := s.s3.Upload(s.gzipFile(endReader), fileName, "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: end upload failed. %v\n", err)
|
||||
}
|
||||
s.archivingTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
task.doms = s.compressSession(mob[:s.cfg.FileSplitSize])
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
task.dome = s.compressSession(mob[s.cfg.FileSplitSize:])
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Save metrics
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
|
||||
if shouldSplit {
|
||||
s.totalSessions.Add(ctx, 1)
|
||||
s.sessionDOMSize.Record(ctx, float64(fileSize))
|
||||
} else {
|
||||
s.sessionDevtoolsSize.Record(ctx, float64(fileSize))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte {
|
||||
var encryptedData []byte
|
||||
var err error
|
||||
if encryptionKey != "" {
|
||||
encryptedData, err = EncryptData(data, []byte(encryptionKey))
|
||||
if err != nil {
|
||||
log.Printf("can't encrypt data: %s", err)
|
||||
encryptedData = data
|
||||
}
|
||||
} else {
|
||||
encryptedData = data
|
||||
}
|
||||
return encryptedData
|
||||
}
|
||||
|
||||
func (s *Storage) compressSession(data []byte) *bytes.Buffer {
|
||||
zippedMob := new(bytes.Buffer)
|
||||
z, _ := gzip.NewWriterLevel(zippedMob, gzip.BestSpeed)
|
||||
if _, err := z.Write(data); err != nil {
|
||||
log.Printf("can't write session data to compressor: %s", err)
|
||||
}
|
||||
if err := z.Close(); err != nil {
|
||||
log.Printf("can't close compressor: %s", err)
|
||||
}
|
||||
return zippedMob
|
||||
}
|
||||
|
||||
func (s *Storage) uploadSession(task *Task) {
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(3)
|
||||
go func() {
|
||||
if task.doms != nil {
|
||||
if err := s.s3.Upload(task.doms, task.id+string(DOM)+"s", "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
if task.dome != nil {
|
||||
if err := s.s3.Upload(task.dome, task.id+string(DOM)+"e", "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
if task.dev != nil {
|
||||
if err := s.s3.Upload(task.dev, task.id+string(DEV), "application/octet-stream", true); err != nil {
|
||||
log.Fatalf("Storage: start upload failed. %s", err)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Storage) worker() {
|
||||
for {
|
||||
select {
|
||||
case task := <-s.tasks:
|
||||
s.uploadSession(task)
|
||||
default:
|
||||
// Signal that worker finished all tasks
|
||||
s.ready <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func (s *sessionFinderImpl) worker() {
|
|||
func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) {
|
||||
sessEnd := &messages.SessionEnd{Timestamp: timestamp}
|
||||
sessEnd.SetSessionID(sessionID)
|
||||
err := s.storage.UploadSessionFiles(sessEnd)
|
||||
err := s.storage.Upload(sessEnd)
|
||||
if err == nil {
|
||||
log.Printf("found session: %d in partition: %d, original: %d",
|
||||
sessionID, partition, sessionID%numberOfPartitions)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue