diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 5952aa7cc..478298110 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -19,42 +19,27 @@ import ( ) func main() { - metrics := monitoring.New("ender") - log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - - // Load service configuration + metrics := monitoring.New("ender") cfg := ender.New() pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0, metrics), cfg.ProjectExpirationTimeoutMs) defer pg.Close() - // Init all modules - statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) - sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) + sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber, logger.NewQueueStats(cfg.LoggerTimeout)) if err != nil { log.Printf("can't init ender service: %s", err) return } + producer := queue.NewProducer(cfg.MessageSizeLimit, true) - - msgHandler := func(msg messages.Message) { - if msg.TypeID() == messages.MsgSessionStart || msg.TypeID() == messages.MsgSessionEnd { - return - } - if msg.Meta().Timestamp == 0 { - log.Printf("ZERO TS, sessID: %d, msgType: %d", msg.Meta().SessionID(), msg.TypeID()) - } - statsLogger.Collect(msg) - sessions.UpdateSession(msg) - } - consumer := queue.NewConsumer( cfg.GroupEnder, - []string{ - cfg.TopicRawWeb, - }, - messages.NewMessageIterator(msgHandler, nil, false), + []string{cfg.TopicRawWeb}, + messages.NewMessageIterator( + func(msg messages.Message) { sessions.UpdateSession(msg) }, + []int{messages.MsgTimestamp}, + false), false, cfg.MessageSizeLimit, ) diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index 07ccf19af..dbd3eb901 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -5,6 +5,7 @@ import ( "fmt" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" + log2 "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" "time" @@ -28,9 +29,10 @@ type SessionEnder struct { timeCtrl *timeController activeSessions syncfloat64.UpDownCounter totalSessions syncfloat64.Counter + stats log2.QueueStats } -func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) { +func New(metrics *monitoring.Metrics, timeout int64, parts int, stats log2.QueueStats) (*SessionEnder, error) { if metrics == nil { return nil, fmt.Errorf("metrics module is empty") } @@ -49,26 +51,31 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, timeCtrl: NewTimeController(parts), activeSessions: activeSessions, totalSessions: totalSessions, + stats: stats, }, nil } // UpdateSession save timestamp for new sessions and update for existing sessions func (se *SessionEnder) UpdateSession(msg messages.Message) { - sessionID := msg.Meta().SessionID() - currTS := msg.Meta().Batch().Timestamp() - msgTimestamp := msg.Meta().Timestamp - localTS := time.Now().UnixMilli() - if currTS == 0 { + se.stats.Collect(msg) + var ( + sessionID = msg.Meta().SessionID() + batchTimestamp = msg.Meta().Batch().Timestamp() + msgTimestamp = msg.Meta().Timestamp + localTimestamp = time.Now().UnixMilli() + ) + if batchTimestamp == 0 { log.Printf("got empty timestamp for sessionID: %d", sessionID) return } - se.timeCtrl.UpdateTime(sessionID, currTS) + se.timeCtrl.UpdateTime(sessionID, batchTimestamp) sess, ok := se.sessions[sessionID] if !ok { + // Register new session se.sessions[sessionID] = &session{ - lastTimestamp: currTS, // timestamp from message broker - lastUpdate: localTS, // local timestamp - lastUserTime: msgTimestamp, // last timestamp from user's machine + lastTimestamp: batchTimestamp, // timestamp from message broker + lastUpdate: localTimestamp, // local timestamp + lastUserTime: msgTimestamp, // last timestamp from user's machine isEnded: false, } se.activeSessions.Add(context.Background(), 1) @@ -80,9 +87,9 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) { sess.lastUserTime = msgTimestamp } // Keep information about the latest message for generating sessionEnd trigger - if currTS > sess.lastTimestamp { - sess.lastTimestamp = currTS - sess.lastUpdate = localTS + if batchTimestamp > sess.lastTimestamp { + sess.lastTimestamp = batchTimestamp + sess.lastUpdate = localTimestamp sess.isEnded = false } } diff --git a/backend/pkg/log/queue.go b/backend/pkg/log/queue.go index ce1f5f764..e9ac5dc1f 100644 --- a/backend/pkg/log/queue.go +++ b/backend/pkg/log/queue.go @@ -34,6 +34,10 @@ type queueStats struct { tick <-chan time.Time } +type QueueStats interface { + Collect(msg messages.Message) +} + func NewQueueStats(sec int) *queueStats { return &queueStats{ prts: make(map[int32]*partitionStats), diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go index 290cb7dc1..7694b28e0 100644 --- a/backend/pkg/messages/iterator.go +++ b/backend/pkg/messages/iterator.go @@ -143,6 +143,10 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) { } } +func (i *messageIteratorImpl) zeroTsLog(msgType string) { + log.Printf("zero timestamp in %s, sess: %d", msgType, i.batchInfo.SessionID()) +} + func (i *messageIteratorImpl) preprocessing(msg Message) error { switch m := msg.(type) { case *BatchMetadata: @@ -154,6 +158,9 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { } i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.messageInfo.Timestamp = m.Timestamp + if m.Timestamp == 0 { + i.zeroTsLog("BatchMetadata") + } i.messageInfo.Url = m.Url i.version = m.Version @@ -163,15 +170,27 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { } i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) i.messageInfo.Timestamp = m.Timestamp + if m.Timestamp == 0 { + i.zeroTsLog("BatchMeta") + } case *Timestamp: i.messageInfo.Timestamp = int64(m.Timestamp) + if m.Timestamp == 0 { + i.zeroTsLog("Timestamp") + } case *SessionStart: i.messageInfo.Timestamp = int64(m.Timestamp) + if m.Timestamp == 0 { + i.zeroTsLog("SessionStart") + } case *SessionEnd: i.messageInfo.Timestamp = int64(m.Timestamp) + if m.Timestamp == 0 { + i.zeroTsLog("SessionEnd") + } case *SetPageLocation: i.messageInfo.Url = m.URL