feat(backend): enabled ecnryption and added metrics (#1160)
This commit is contained in:
parent
0610965130
commit
6f195a0ff0
3 changed files with 105 additions and 46 deletions
|
|
@ -84,7 +84,8 @@ ENV TZ=UTC \
|
|||
CH_PASSWORD="" \
|
||||
CH_DATABASE="default" \
|
||||
# Max file size to process, default to 100MB
|
||||
MAX_FILE_SIZE=100000000
|
||||
MAX_FILE_SIZE=100000000 \
|
||||
USE_ENCRYPTION=true
|
||||
|
||||
|
||||
RUN if [ "$SERVICE_NAME" = "http" ]; then \
|
||||
|
|
|
|||
|
|
@ -33,10 +33,28 @@ func (t FileType) String() string {
|
|||
}
|
||||
|
||||
type Task struct {
|
||||
id string
|
||||
doms *bytes.Buffer
|
||||
dome *bytes.Buffer
|
||||
dev *bytes.Buffer
|
||||
id string
|
||||
key string
|
||||
domRaw []byte
|
||||
devRaw []byte
|
||||
doms *bytes.Buffer
|
||||
dome *bytes.Buffer
|
||||
dev *bytes.Buffer
|
||||
}
|
||||
|
||||
func (t *Task) SetMob(mob []byte, tp FileType) {
|
||||
if tp == DOM {
|
||||
t.domRaw = mob
|
||||
} else {
|
||||
t.devRaw = mob
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Task) Mob(tp FileType) []byte {
|
||||
if tp == DOM {
|
||||
return t.domRaw
|
||||
}
|
||||
return t.devRaw
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
|
|
@ -75,7 +93,8 @@ func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
|
|||
filePath := s.cfg.FSDir + "/" + sessionID
|
||||
// Prepare sessions
|
||||
newTask := &Task{
|
||||
id: sessionID,
|
||||
id: sessionID,
|
||||
key: msg.EncryptionKey,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
|
@ -108,6 +127,9 @@ func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
|
|||
}
|
||||
|
||||
func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
|
||||
if tp == DEV {
|
||||
filePath += "devtools"
|
||||
}
|
||||
// Check file size before download into memory
|
||||
info, err := os.Stat(filePath)
|
||||
if err == nil && info.Size() > s.cfg.MaxFileSize {
|
||||
|
|
@ -142,51 +164,87 @@ func (s *Storage) sortSessionMessages(raw []byte) ([]byte, error) {
|
|||
}
|
||||
|
||||
func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
||||
// Open mob file
|
||||
if tp == DEV {
|
||||
path += "devtools"
|
||||
}
|
||||
// Open session file
|
||||
startRead := time.Now()
|
||||
mob, err := s.openSession(path, tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.RecordSessionSize(float64(len(mob)), tp.String())
|
||||
metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String())
|
||||
metrics.RecordSessionSize(float64(len(mob)), tp.String())
|
||||
|
||||
// Encode and compress session
|
||||
if tp == DEV {
|
||||
start := time.Now()
|
||||
task.dev = s.compressSession(mob)
|
||||
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
} else {
|
||||
if len(mob) <= s.cfg.FileSplitSize {
|
||||
start := time.Now()
|
||||
task.doms = s.compressSession(mob)
|
||||
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
return nil
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
var firstPart, secondPart int64
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.doms = s.compressSession(mob[:s.cfg.FileSplitSize])
|
||||
firstPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.dome = s.compressSession(mob[s.cfg.FileSplitSize:])
|
||||
secondPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
metrics.RecordSessionCompressDuration(float64(firstPart+secondPart), tp.String())
|
||||
}
|
||||
// Put opened session file into task struct
|
||||
task.SetMob(mob, tp)
|
||||
|
||||
// Encrypt and compress session
|
||||
s.packSession(task, tp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) packSession(task *Task, tp FileType) {
|
||||
// Prepare mob file
|
||||
mob := task.Mob(tp)
|
||||
|
||||
if tp == DEV || len(mob) <= s.cfg.FileSplitSize {
|
||||
// Encryption
|
||||
start := time.Now()
|
||||
data := s.encryptSession(mob, task.key)
|
||||
metrics.RecordSessionEncryptionDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
// Compression
|
||||
start = time.Now()
|
||||
result := s.compressSession(data)
|
||||
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
|
||||
if tp == DOM {
|
||||
task.doms = result
|
||||
} else {
|
||||
task.dev = result
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare two workers
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
var firstPart, secondPart, firstEncrypt, secondEncrypt int64
|
||||
|
||||
// DomStart part
|
||||
go func() {
|
||||
// Encryption
|
||||
start := time.Now()
|
||||
data := s.encryptSession(mob[:s.cfg.FileSplitSize], task.key)
|
||||
firstEncrypt = time.Since(start).Milliseconds()
|
||||
|
||||
// Compression
|
||||
start = time.Now()
|
||||
task.doms = s.compressSession(data)
|
||||
firstPart = time.Since(start).Milliseconds()
|
||||
|
||||
// Finish task
|
||||
wg.Done()
|
||||
}()
|
||||
// DomEnd part
|
||||
go func() {
|
||||
// Encryption
|
||||
start := time.Now()
|
||||
data := s.encryptSession(mob[s.cfg.FileSplitSize:], task.key)
|
||||
secondEncrypt = time.Since(start).Milliseconds()
|
||||
|
||||
// Compression
|
||||
start = time.Now()
|
||||
task.dome = s.compressSession(data)
|
||||
secondPart = time.Since(start).Milliseconds()
|
||||
|
||||
// Finish task
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
// Record metrics
|
||||
metrics.RecordSessionEncryptionDuration(float64(firstEncrypt+secondEncrypt), tp.String())
|
||||
metrics.RecordSessionCompressDuration(float64(firstPart+secondPart), tp.String())
|
||||
}
|
||||
|
||||
func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte {
|
||||
var encryptedData []byte
|
||||
var err error
|
||||
|
|
|
|||
|
|
@ -85,18 +85,18 @@ func RecordSessionSortDuration(durMillis float64, fileType string) {
|
|||
storageSessionSortDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
|
||||
}
|
||||
|
||||
var storageSessionEncodeDuration = prometheus.NewHistogramVec(
|
||||
var storageSessionEncryptionDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: "storage",
|
||||
Name: "encode_duration_seconds",
|
||||
Name: "encryption_duration_seconds",
|
||||
Help: "A histogram displaying the duration of encoding for each session in seconds.",
|
||||
Buckets: common.DefaultDurationBuckets,
|
||||
},
|
||||
[]string{"file_type"},
|
||||
)
|
||||
|
||||
func RecordSessionEncodeDuration(durMillis float64, fileType string) {
|
||||
storageSessionEncodeDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
|
||||
func RecordSessionEncryptionDuration(durMillis float64, fileType string) {
|
||||
storageSessionEncryptionDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
|
||||
}
|
||||
|
||||
var storageSessionCompressDuration = prometheus.NewHistogramVec(
|
||||
|
|
@ -133,7 +133,7 @@ func List() []prometheus.Collector {
|
|||
storageTotalSessions,
|
||||
storageSessionReadDuration,
|
||||
storageSessionSortDuration,
|
||||
storageSessionEncodeDuration,
|
||||
storageSessionEncryptionDuration,
|
||||
storageSessionCompressDuration,
|
||||
storageSessionUploadDuration,
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue