diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go
index 0999e3936..74e0b1db1 100644
--- a/backend/cmd/sink/main.go
+++ b/backend/cmd/sink/main.go
@@ -1,7 +1,9 @@
 package main
 
 import (
+	"bytes"
 	"context"
+	"encoding/binary"
 	"log"
 	"openreplay/backend/pkg/pprof"
 	"os"
@@ -54,7 +56,32 @@ func main() {
 		log.Printf("can't create messages_size metric: %s", err)
 	}
 
+	var (
+		sessionID    uint64
+		messageIndex = make([]byte, 8)
+		domBuffer    = bytes.NewBuffer(make([]byte, 0, 1024))
+		devBuffer    = bytes.NewBuffer(make([]byte, 0, 1024))
+	)
+
 	msgHandler := func(msg messages.Message) {
+		// Check batchEnd signal (nil message)
+		if msg == nil {
+			// Write buffered batches to the session (skip empty batches)
+			if domBuffer.Len() > 0 || devBuffer.Len() > 0 {
+				if err := writer.Write(sessionID, domBuffer.Bytes(), devBuffer.Bytes()); err != nil {
+					log.Printf("writer error: %s", err)
+				}
+			}
+
+			// Always drop the batch state, even if the write failed: otherwise the
+			// leftover bytes would be flushed together with the next batch under
+			// this (possibly different) session ID
+			domBuffer.Reset()
+			devBuffer.Reset()
+			sessionID = 0
+			return
+		}
+
 		// [METRICS] Increase the number of processed messages
 		totalMessages.Add(context.Background(), 1)
 
@@ -101,10 +128,58 @@ func main() {
 			return
 		}
 
-		// Write message to file
-		if err := writer.Write(msg); err != nil {
-			log.Printf("writer error: %s", err)
-			return
+		// Write message to the batch buffer
+		if sessionID == 0 {
+			sessionID = msg.SessionID()
+		}
+
+		// Encode message index and body (the body is encoded once for both buffers)
+		binary.LittleEndian.PutUint64(messageIndex, msg.Meta().Index)
+		encoded := msg.Encode()
+
+		var (
+			n   int
+			err error
+		)
+
+		// Add message to dom buffer
+		if messages.IsDOMType(msg.TypeID()) {
+			// Write message index
+			n, err = domBuffer.Write(messageIndex)
+			if err != nil {
+				log.Printf("domBuffer index write err: %s", err)
+			}
+			if n != len(messageIndex) {
+				log.Printf("domBuffer index not full write: %d/%d", n, len(messageIndex))
+			}
+			// Write message body
+			n, err = domBuffer.Write(encoded)
+			if err != nil {
+				log.Printf("domBuffer message write err: %s", err)
+			}
+			if n != len(encoded) {
+				log.Printf("domBuffer message not full write: %d/%d", n, len(encoded))
+			}
+		}
+
+		// Add message to dev buffer
+		if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
+			// Write message index
+			n, err = devBuffer.Write(messageIndex)
+			if err != nil {
+				log.Printf("devBuffer index write err: %s", err)
+			}
+			if n != len(messageIndex) {
+				log.Printf("devBuffer index not full write: %d/%d", n, len(messageIndex))
+			}
+			// Write message body
+			n, err = devBuffer.Write(encoded)
+			if err != nil {
+				log.Printf("devBuffer message write err: %s", err)
+			}
+			if n != len(encoded) {
+				log.Printf("devBuffer message not full write: %d/%d", n, len(encoded))
+			}
 		}
 
 		// [METRICS] Increase the number of written to the files messages and the message size
@@ -117,7 +192,7 @@ func main() {
 		[]string{
 			cfg.TopicRawWeb,
 		},
-		messages.NewMessageIterator(msgHandler, nil, false),
+		messages.NewSinkMessageIterator(msgHandler, nil, false),
 		false,
 		cfg.MessageSizeLimit,
 	)
diff --git a/backend/internal/sink/sessionwriter/session.go b/backend/internal/sink/sessionwriter/session.go
index 8cf8881de..cd74db27d 100644
--- a/backend/internal/sink/sessionwriter/session.go
+++ b/backend/internal/sink/sessionwriter/session.go
@@ -1,19 +1,15 @@
 package sessionwriter
 
 import (
-	"encoding/binary"
 	"fmt"
 	"strconv"
 	"sync"
-
-	"openreplay/backend/pkg/messages"
 )
 
 type Session struct {
 	lock    *sync.Mutex
 	dom     *File
 	dev     *File
-	index   []byte
 	updated bool
 }
 
@@ -37,40 +33,23 @@ func NewSession(sessID uint64, workDir string, bufSize int) (*Session, error) {
 		lock:    &sync.Mutex{},
 		dom:     dom,
 		dev:     dev,
-		index:   make([]byte, 8),
 		updated: false,
 	}, nil
 }
 
-func (s *Session) Write(msg messages.Message) error {
+func (s *Session) Write(domBuffer, devBuffer []byte) error {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	// Encode message index
-	binary.LittleEndian.PutUint64(s.index, msg.Meta().Index)
-
-	// Write message to dom.mob file
-	if messages.IsDOMType(msg.TypeID()) {
-		// Write message index
-		if err := s.dom.Write(s.index); err != nil {
-			return err
-		}
-		// Write message body
-		if err := s.dom.Write(msg.Encode()); err != nil {
-			return err
-		}
+	// Write dom buffer to the file (file buffer)
+	if err := s.dom.Write(domBuffer); err != nil {
+		return err
 	}
 	s.updated = true
-	// Write message to dev.mob file
-	if !messages.IsDOMType(msg.TypeID()) || msg.TypeID() == messages.MsgTimestamp {
-		// Write message index
-		if err := s.dev.Write(s.index); err != nil {
-			return err
-		}
-		// Write message body
-		if err := s.dev.Write(msg.Encode()); err != nil {
-			return err
-		}
+
+	// Write dev buffer to the file (file buffer)
+	if err := s.dev.Write(devBuffer); err != nil {
+		return err
 	}
 	return nil
 }
diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go
index 7da1ae878..b7d2a2c68 100644
--- a/backend/internal/sink/sessionwriter/writer.go
+++ b/backend/internal/sink/sessionwriter/writer.go
@@ -5,8 +5,6 @@ import (
 	"log"
 	"sync"
 	"time"
-
-	"openreplay/backend/pkg/messages"
 )
 
 type SessionWriter struct {
@@ -35,11 +33,8 @@ func NewWriter(filesLimit uint16, workingDir string, fileBuffer int, syncTimeout
 	return w
 }
 
-func (w *SessionWriter) Write(msg messages.Message) (err error) {
-	var (
-		sess *Session
-		sid  = msg.SessionID()
-	)
+func (w *SessionWriter) Write(sid uint64, domBuffer, devBuffer []byte) (err error) {
+	var sess *Session
 
 	// Load session
 	sessObj, ok := w.sessions.Load(sid)
@@ -65,7 +60,7 @@ func (w *SessionWriter) Write(msg messages.Message) (err error) {
 	}
 
 	// Write data to session
-	return sess.Write(msg)
+	return sess.Write(domBuffer, devBuffer)
 }
 
 func (w *SessionWriter) sync(sid uint64) error {
diff --git a/backend/pkg/messages/iterator-sink.go b/backend/pkg/messages/iterator-sink.go
new file mode 100644
index 000000000..20eb66a6a
--- /dev/null
+++ b/backend/pkg/messages/iterator-sink.go
@@ -0,0 +1,192 @@
+package messages
+
+import (
+	"fmt"
+	"log"
+)
+
+// sinkMessageIteratorImpl walks through the messages of a batch, passes them to
+// the handler and reports the end of the batch with a nil message.
+type sinkMessageIteratorImpl struct {
+	filter      map[int]struct{}
+	preFilter   map[int]struct{}
+	handler     MessageHandler
+	autoDecode  bool
+	version     uint64
+	size        uint64
+	canSkip     bool
+	broken      bool
+	messageInfo *message
+	batchInfo   *BatchInfo
+	urls        *pageLocations
+}
+
+// NewSinkMessageIterator creates an iterator that calls messageHandler for every
+// message of a batch and then once more with a nil message to mark the batch end.
+// A non-empty messageFilter limits the handled messages to the listed types.
+func NewSinkMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
+	iter := &sinkMessageIteratorImpl{
+		handler:    messageHandler,
+		autoDecode: autoDecode,
+		urls:       NewPageLocations(),
+	}
+	if len(messageFilter) != 0 {
+		filter := make(map[int]struct{}, len(messageFilter))
+		for _, msgType := range messageFilter {
+			filter[msgType] = struct{}{}
+		}
+		iter.filter = filter
+	}
+	iter.preFilter = map[int]struct{}{
+		MsgBatchMetadata: {}, MsgBatchMeta: {}, MsgTimestamp: {},
+		MsgSessionStart: {}, MsgSessionEnd: {}, MsgSetPageLocation: {},
+		MsgSessionEndDeprecated: {}}
+	return iter
+}
+
+// prepareVars resets the per-batch state before a new batch is processed.
+func (i *sinkMessageIteratorImpl) prepareVars(batchInfo *BatchInfo) {
+	i.batchInfo = batchInfo
+	i.messageInfo = &message{batch: batchInfo}
+	i.version = 0
+	i.canSkip = false
+	i.broken = false
+	i.size = 0
+}
+
+// sendBatchEnd notifies the handler about the end of the batch (nil message).
+func (i *sinkMessageIteratorImpl) sendBatchEnd() {
+	i.handler(nil)
+}
+
+// Iterate parses batchData and passes its messages to the handler one by one,
+// followed by the batch end signal. Nothing is sent if the batch can't be parsed.
+func (i *sinkMessageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
+	// Create new message reader
+	reader := NewMessageReader(batchData)
+
+	// Pre-decode batch data
+	if err := reader.Parse(); err != nil {
+		log.Printf("pre-decode batch err: %s, info: %s", err, batchInfo.Info())
+		return
+	}
+
+	// Prepare iterator before processing messages in batch
+	i.prepareVars(batchInfo)
+
+	for reader.Next() {
+		// Increase message index (can be overwritten by batch info message)
+		i.messageInfo.Index++
+
+		msg := reader.Message()
+
+		// Preprocess "system" messages
+		if _, ok := i.preFilter[msg.TypeID()]; ok {
+			// Keep the raw message until decoding succeeds: it is needed for the log
+			decoded := msg.Decode()
+			if decoded == nil {
+				log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
+				break
+			}
+			msg = transformDeprecated(decoded)
+			if err := i.preprocessing(msg); err != nil {
+				log.Printf("message preprocessing err: %s", err)
+				break
+			}
+		}
+
+		// Skip messages we don't have in filter
+		if i.filter != nil {
+			if _, ok := i.filter[msg.TypeID()]; !ok {
+				continue
+			}
+		}
+
+		if i.autoDecode {
+			decoded := msg.Decode()
+			if decoded == nil {
+				log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
+				break
+			}
+			msg = decoded
+		}
+
+		// Set meta information for message
+		msg.Meta().SetMeta(i.messageInfo)
+
+		// Process message
+		i.handler(msg)
+	}
+
+	// Inform sink about end of batch
+	i.sendBatchEnd()
+}
+
+// zeroTsLog logs a message of the given type that came with a zero timestamp.
+func (i *sinkMessageIteratorImpl) zeroTsLog(msgType string) {
+	log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info())
+}
+
+// preprocessing updates the meta information shared by the batch messages (index,
+// timestamp, url, version) from a "system" message and validates batch headers.
+func (i *sinkMessageIteratorImpl) preprocessing(msg Message) error {
+	switch m := msg.(type) {
+	case *BatchMetadata:
+		if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
+			return fmt.Errorf("batchMetadata found at the end of the batch, info: %s", i.batchInfo.Info())
+		}
+		if m.Version > 1 {
+			return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", m.Version, i.batchInfo.Info())
+		}
+		i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
+		i.messageInfo.Timestamp = m.Timestamp
+		if m.Timestamp == 0 {
+			i.zeroTsLog("BatchMetadata")
+		}
+		i.messageInfo.Url = m.Location
+		i.version = m.Version
+		i.batchInfo.version = m.Version
+
+	case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
+		if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
+			return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
+		}
+		i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
+		i.messageInfo.Timestamp = m.Timestamp
+		if m.Timestamp == 0 {
+			i.zeroTsLog("BatchMeta")
+		}
+		// Try to get saved session's page url
+		if savedURL := i.urls.Get(i.messageInfo.batch.sessionID); savedURL != "" {
+			i.messageInfo.Url = savedURL
+		}
+
+	case *Timestamp:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("Timestamp")
+		}
+
+	case *SessionStart:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("SessionStart")
+			log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
+				m.ProjectID, m.UserAgent, m.TrackerVersion, i.batchInfo.Info())
+		}
+
+	case *SessionEnd:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("SessionEnd")
+		}
+		// Delete session from urls cache layer
+		i.urls.Delete(i.messageInfo.batch.sessionID)
+
+	case *SetPageLocation:
+		i.messageInfo.Url = m.URL
+		// Save session page url in cache for using in next batches
+		i.urls.Set(i.messageInfo.batch.sessionID, m.URL)
+	}
+	return nil
+}