[Ender] new message iterator (#929)
* feat(backend): added new message iterator especially for ender
This commit is contained in:
parent
da7d2038ef
commit
861302bba7
2 changed files with 180 additions and 1 deletions
|
|
@ -42,7 +42,7 @@ func main() {
|
|||
consumer := queue.NewConsumer(
|
||||
cfg.GroupEnder,
|
||||
[]string{cfg.TopicRawWeb},
|
||||
messages.NewMessageIterator(
|
||||
messages.NewEnderMessageIterator(
|
||||
func(msg messages.Message) { sessions.UpdateSession(msg) },
|
||||
[]int{messages.MsgTimestamp},
|
||||
false),
|
||||
|
|
|
|||
179
backend/pkg/messages/iterator-ender.go
Normal file
179
backend/pkg/messages/iterator-ender.go
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
package messages
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
type enderMessageIteratorImpl struct {
|
||||
filter map[int]struct{}
|
||||
preFilter map[int]struct{}
|
||||
handler MessageHandler
|
||||
autoDecode bool
|
||||
version uint64
|
||||
size uint64
|
||||
canSkip bool
|
||||
broken bool
|
||||
messageInfo *message
|
||||
batchInfo *BatchInfo
|
||||
urls *pageLocations
|
||||
}
|
||||
|
||||
func NewEnderMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
|
||||
iter := &enderMessageIteratorImpl{
|
||||
handler: messageHandler,
|
||||
autoDecode: autoDecode,
|
||||
urls: NewPageLocations(),
|
||||
}
|
||||
if len(messageFilter) != 0 {
|
||||
filter := make(map[int]struct{}, len(messageFilter))
|
||||
for _, msgType := range messageFilter {
|
||||
filter[msgType] = struct{}{}
|
||||
}
|
||||
iter.filter = filter
|
||||
}
|
||||
iter.preFilter = map[int]struct{}{
|
||||
MsgBatchMetadata: {}, MsgBatchMeta: {}, MsgTimestamp: {},
|
||||
MsgSessionStart: {}, MsgSessionEnd: {}, MsgSetPageLocation: {},
|
||||
MsgSessionEndDeprecated: {}}
|
||||
return iter
|
||||
}
|
||||
|
||||
func (i *enderMessageIteratorImpl) prepareVars(batchInfo *BatchInfo) {
|
||||
i.batchInfo = batchInfo
|
||||
i.messageInfo = &message{batch: batchInfo}
|
||||
i.version = 0
|
||||
i.canSkip = false
|
||||
i.broken = false
|
||||
i.size = 0
|
||||
}
|
||||
|
||||
func (i *enderMessageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
|
||||
// Create new message reader
|
||||
reader := NewMessageReader(batchData)
|
||||
|
||||
// Pre-decode batch data
|
||||
if err := reader.Parse(); err != nil {
|
||||
log.Printf("pre-decode batch err: %s, info: %s", err, batchInfo.Info())
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare iterator before processing messages in batch
|
||||
i.prepareVars(batchInfo)
|
||||
|
||||
// Store last timestamp message here
|
||||
var lastMessage Message
|
||||
|
||||
for reader.Next() {
|
||||
// Increase message index (can be overwritten by batch info message)
|
||||
i.messageInfo.Index++
|
||||
|
||||
msg := reader.Message()
|
||||
|
||||
// Preprocess "system" messages
|
||||
if _, ok := i.preFilter[msg.TypeID()]; ok {
|
||||
msg = msg.Decode()
|
||||
if msg == nil {
|
||||
log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
|
||||
return
|
||||
}
|
||||
msg = transformDeprecated(msg)
|
||||
if err := i.preprocessing(msg); err != nil {
|
||||
log.Printf("message preprocessing err: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Skip messages we don't have in filter
|
||||
if i.filter != nil {
|
||||
if _, ok := i.filter[msg.TypeID()]; !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if i.autoDecode {
|
||||
msg = msg.Decode()
|
||||
if msg == nil {
|
||||
log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Set meta information for message
|
||||
msg.Meta().SetMeta(i.messageInfo)
|
||||
|
||||
// Update last timestamp message
|
||||
lastMessage = msg
|
||||
}
|
||||
|
||||
if lastMessage != nil {
|
||||
i.handler(lastMessage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (i *enderMessageIteratorImpl) zeroTsLog(msgType string) {
|
||||
log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info())
|
||||
}
|
||||
|
||||
func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
|
||||
switch m := msg.(type) {
|
||||
case *BatchMetadata:
|
||||
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
|
||||
return fmt.Errorf("batchMetadata found at the end of the batch, info: %s", i.batchInfo.Info())
|
||||
}
|
||||
if m.Version > 1 {
|
||||
return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info())
|
||||
}
|
||||
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
|
||||
i.messageInfo.Timestamp = m.Timestamp
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMetadata")
|
||||
}
|
||||
i.messageInfo.Url = m.Location
|
||||
i.version = m.Version
|
||||
i.batchInfo.version = m.Version
|
||||
|
||||
case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
|
||||
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
|
||||
return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
|
||||
}
|
||||
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
|
||||
i.messageInfo.Timestamp = m.Timestamp
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMeta")
|
||||
}
|
||||
// Try to get saved session's page url
|
||||
if savedURL := i.urls.Get(i.messageInfo.batch.sessionID); savedURL != "" {
|
||||
i.messageInfo.Url = savedURL
|
||||
}
|
||||
|
||||
case *Timestamp:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("Timestamp")
|
||||
}
|
||||
|
||||
case *SessionStart:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("SessionStart")
|
||||
log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
|
||||
m.ProjectID, m.UserAgent, m.TrackerVersion, i.batchInfo.Info())
|
||||
}
|
||||
|
||||
case *SessionEnd:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("SessionEnd")
|
||||
}
|
||||
// Delete session from urls cache layer
|
||||
i.urls.Delete(i.messageInfo.batch.sessionID)
|
||||
|
||||
case *SetPageLocation:
|
||||
i.messageInfo.Url = m.URL
|
||||
// Save session page url in cache for using in next batches
|
||||
i.urls.Set(i.messageInfo.batch.sessionID, m.URL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue