From 861302bba7971cf92ddc746325deb4eb608e4b38 Mon Sep 17 00:00:00 2001
From: Alexander
Date: Wed, 18 Jan 2023 14:43:24 +0100
Subject: [PATCH] [Ender] new message iterator (#929)

* feat(backend): added new message iterator especially for ender
---
 backend/cmd/ender/main.go              |   2 +-
 backend/pkg/messages/iterator-ender.go | 205 +++++++++++++++++++++++++
 2 files changed, 206 insertions(+), 1 deletion(-)
 create mode 100644 backend/pkg/messages/iterator-ender.go

diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go
index be582c2fd..10644456c 100644
--- a/backend/cmd/ender/main.go
+++ b/backend/cmd/ender/main.go
@@ -42,7 +42,7 @@ func main() {
 	consumer := queue.NewConsumer(
 		cfg.GroupEnder,
 		[]string{cfg.TopicRawWeb},
-		messages.NewMessageIterator(
+		messages.NewEnderMessageIterator(
 			func(msg messages.Message) { sessions.UpdateSession(msg) },
 			[]int{messages.MsgTimestamp},
 			false),
diff --git a/backend/pkg/messages/iterator-ender.go b/backend/pkg/messages/iterator-ender.go
new file mode 100644
index 000000000..15183ef51
--- /dev/null
+++ b/backend/pkg/messages/iterator-ender.go
@@ -0,0 +1,205 @@
+package messages
+
+import (
+	"fmt"
+	"log"
+)
+
+// enderMessageIteratorImpl is the MessageIterator used by the ender service.
+// It walks a whole batch, tracks the batch meta information (index, timestamp,
+// page url) and hands only the last message that passed the filter to the
+// handler.
+type enderMessageIteratorImpl struct {
+	filter      map[int]struct{}
+	preFilter   map[int]struct{}
+	handler     MessageHandler
+	autoDecode  bool
+	version     uint64
+	size        uint64
+	canSkip     bool
+	broken      bool
+	messageInfo *message
+	batchInfo   *BatchInfo
+	urls        *pageLocations
+}
+
+// NewEnderMessageIterator returns a MessageIterator that calls messageHandler
+// at most once per batch, with the last message of the batch that passed
+// messageFilter. An empty messageFilter lets every message type through. With
+// autoDecode set, a message that passed the filter is decoded before use.
+func NewEnderMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
+	iter := &enderMessageIteratorImpl{
+		handler:    messageHandler,
+		autoDecode: autoDecode,
+		urls:       NewPageLocations(),
+	}
+	if len(messageFilter) != 0 {
+		filter := make(map[int]struct{}, len(messageFilter))
+		for _, msgType := range messageFilter {
+			filter[msgType] = struct{}{}
+		}
+		iter.filter = filter
+	}
+	// "System" messages are always decoded and preprocessed, whatever the filter is
+	iter.preFilter = map[int]struct{}{
+		MsgBatchMetadata: {}, MsgBatchMeta: {}, MsgTimestamp: {},
+		MsgSessionStart: {}, MsgSessionEnd: {}, MsgSetPageLocation: {},
+		MsgSessionEndDeprecated: {}}
+	return iter
+}
+
+// prepareVars resets the per-batch state before a new batch is processed.
+func (i *enderMessageIteratorImpl) prepareVars(batchInfo *BatchInfo) {
+	i.batchInfo = batchInfo
+	i.messageInfo = &message{batch: batchInfo}
+	i.version = 0
+	i.canSkip = false
+	i.broken = false
+	i.size = 0
+}
+
+// decode decodes msg and returns the result, or nil if decoding failed. The
+// type ID is read before decoding: a failed decode yields a nil message, and
+// calling TypeID on it would panic.
+func (i *enderMessageIteratorImpl) decode(msg Message) Message {
+	msgType := msg.TypeID()
+	decoded := msg.Decode()
+	if decoded == nil {
+		log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
+	}
+	return decoded
+}
+
+// Iterate walks through all messages of the batch, preprocesses the "system"
+// ones and passes the last message that matched the filter to the handler.
+// A decoding or preprocessing error is logged and aborts the whole batch
+// without calling the handler.
+func (i *enderMessageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
+	// Create new message reader
+	reader := NewMessageReader(batchData)
+
+	// Pre-decode batch data
+	if err := reader.Parse(); err != nil {
+		log.Printf("pre-decode batch err: %s, info: %s", err, batchInfo.Info())
+		return
+	}
+
+	// Prepare iterator before processing messages in batch
+	i.prepareVars(batchInfo)
+
+	// Store the last message that passed the filter here
+	var lastMessage Message
+
+	for reader.Next() {
+		// Increase message index (can be overwritten by batch info message)
+		i.messageInfo.Index++
+
+		msg := reader.Message()
+
+		// Preprocess "system" messages
+		if _, ok := i.preFilter[msg.TypeID()]; ok {
+			if msg = i.decode(msg); msg == nil {
+				return
+			}
+			msg = transformDeprecated(msg)
+			if err := i.preprocessing(msg); err != nil {
+				log.Printf("message preprocessing err: %s", err)
+				return
+			}
+		}
+
+		// Skip messages we don't have in filter
+		if i.filter != nil {
+			if _, ok := i.filter[msg.TypeID()]; !ok {
+				continue
+			}
+		}
+
+		if i.autoDecode {
+			if msg = i.decode(msg); msg == nil {
+				return
+			}
+		}
+
+		// Set meta information for message
+		msg.Meta().SetMeta(i.messageInfo)
+
+		// Update the last message that passed the filter
+		lastMessage = msg
+	}
+
+	if lastMessage != nil {
+		i.handler(lastMessage)
+	}
+}
+
+// zeroTsLog logs that a message of the given type carries a zero timestamp.
+func (i *enderMessageIteratorImpl) zeroTsLog(msgType string) {
+	log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info())
+}
+
+// preprocessing updates the iterator state (message index, timestamp, page
+// url) from a decoded "system" message. A non-nil error means the current
+// batch must not be processed any further.
+func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
+	switch m := msg.(type) {
+	case *BatchMetadata:
+		if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
+			return fmt.Errorf("batchMetadata found at the end of the batch, info: %s", i.batchInfo.Info())
+		}
+		if m.Version > 1 {
+			// Report the rejected version itself: i.version is not updated yet at this point
+			return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", m.Version, i.batchInfo.Info())
+		}
+		i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
+		i.messageInfo.Timestamp = m.Timestamp
+		if m.Timestamp == 0 {
+			i.zeroTsLog("BatchMetadata")
+		}
+		i.messageInfo.Url = m.Location
+		i.version = m.Version
+		i.batchInfo.version = m.Version
+
+	case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
+		if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
+			return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
+		}
+		i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
+		i.messageInfo.Timestamp = m.Timestamp
+		if m.Timestamp == 0 {
+			i.zeroTsLog("BatchMeta")
+		}
+		// Try to get saved session's page url
+		if savedURL := i.urls.Get(i.messageInfo.batch.sessionID); savedURL != "" {
+			i.messageInfo.Url = savedURL
+		}
+
+	case *Timestamp:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("Timestamp")
+		}
+
+	case *SessionStart:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("SessionStart")
+			log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
+				m.ProjectID, m.UserAgent, m.TrackerVersion, i.batchInfo.Info())
+		}
+
+	case *SessionEnd:
+		i.messageInfo.Timestamp = int64(m.Timestamp)
+		if m.Timestamp == 0 {
+			i.zeroTsLog("SessionEnd")
+		}
+		// Delete session from urls cache layer
+		i.urls.Delete(i.messageInfo.batch.sessionID)
+
+	case *SetPageLocation:
+		i.messageInfo.Url = m.URL
+		// Save session page url in cache for using in next batches
+		i.urls.Set(i.messageInfo.batch.sessionID, m.URL)
+	}
+	return nil
+}