Enable session encryption (#1121)
* feat(backend): enable session encryption * feat(backend): fixed updated method name in failover algo * feat(backend): disable encryption by default
This commit is contained in:
parent
e22845049d
commit
d74ed000fa
4 changed files with 17 additions and 9 deletions
|
|
@ -84,7 +84,8 @@ ENV TZ=UTC \
|
|||
CH_PASSWORD="" \
|
||||
CH_DATABASE="default" \
|
||||
# Max file size to process, default to 100MB
|
||||
MAX_FILE_SIZE=100000000
|
||||
MAX_FILE_SIZE=100000000 \
|
||||
USE_ENCRYPTION=false
|
||||
|
||||
|
||||
RUN if [ "$SERVICE_NAME" = "http" ]; then \
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func main() {
|
|||
messages.NewMessageIterator(
|
||||
func(msg messages.Message) {
|
||||
sesEnd := msg.(*messages.SessionEnd)
|
||||
if err := srv.Upload(sesEnd); err != nil {
|
||||
if err := srv.Process(sesEnd); err != nil {
|
||||
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
|
||||
sessionFinder.Find(msg.SessionID(), sesEnd.Timestamp)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ type Task struct {
|
|||
doms *bytes.Buffer
|
||||
dome *bytes.Buffer
|
||||
dev *bytes.Buffer
|
||||
key string
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
|
|
@ -69,13 +70,14 @@ func (s *Storage) Wait() {
|
|||
<-s.ready
|
||||
}
|
||||
|
||||
func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
|
||||
func (s *Storage) Process(msg *messages.SessionEnd) (err error) {
|
||||
// Generate file path
|
||||
sessionID := strconv.FormatUint(msg.SessionID(), 10)
|
||||
filePath := s.cfg.FSDir + "/" + sessionID
|
||||
// Prepare sessions
|
||||
newTask := &Task{
|
||||
id: sessionID,
|
||||
id: sessionID,
|
||||
key: msg.EncryptionKey,
|
||||
}
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
|
@ -157,12 +159,12 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
|||
// Encode and compress session
|
||||
if tp == DEV {
|
||||
start := time.Now()
|
||||
task.dev = s.compressSession(mob)
|
||||
task.dev = s.packSession(mob, task.key)
|
||||
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
} else {
|
||||
if len(mob) <= s.cfg.FileSplitSize {
|
||||
start := time.Now()
|
||||
task.doms = s.compressSession(mob)
|
||||
task.doms = s.packSession(mob, task.key)
|
||||
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
|
||||
return nil
|
||||
}
|
||||
|
|
@ -171,13 +173,13 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
|||
var firstPart, secondPart int64
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.doms = s.compressSession(mob[:s.cfg.FileSplitSize])
|
||||
task.doms = s.packSession(mob[:s.cfg.FileSplitSize], task.key)
|
||||
firstPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
start := time.Now()
|
||||
task.dome = s.compressSession(mob[s.cfg.FileSplitSize:])
|
||||
task.dome = s.packSession(mob[s.cfg.FileSplitSize:], task.key)
|
||||
secondPart = time.Now().Sub(start).Milliseconds()
|
||||
wg.Done()
|
||||
}()
|
||||
|
|
@ -202,6 +204,11 @@ func (s *Storage) encryptSession(data []byte, encryptionKey string) []byte {
|
|||
return encryptedData
|
||||
}
|
||||
|
||||
func (s *Storage) packSession(raw []byte, key string) *bytes.Buffer {
|
||||
data := s.encryptSession(raw, key)
|
||||
return s.compressSession(data)
|
||||
}
|
||||
|
||||
func (s *Storage) compressSession(data []byte) *bytes.Buffer {
|
||||
zippedMob := new(bytes.Buffer)
|
||||
z, _ := gzip.NewWriterLevel(zippedMob, gzip.BestSpeed)
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ func (s *sessionFinderImpl) worker() {
|
|||
func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) {
|
||||
sessEnd := &messages.SessionEnd{Timestamp: timestamp}
|
||||
sessEnd.SetSessionID(sessionID)
|
||||
err := s.storage.Upload(sessEnd)
|
||||
err := s.storage.Process(sessEnd)
|
||||
if err == nil {
|
||||
log.Printf("found session: %d in partition: %d, original: %d",
|
||||
sessionID, partition, sessionID%numberOfPartitions)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue