Speed up Ender (#762)
* feat(backend): process only Timestamp messages in ender's messageHandler * feat(backend): moved stats logger to session ender
This commit is contained in:
parent
041a7f7bbf
commit
617c78f224
4 changed files with 51 additions and 36 deletions
|
|
@ -19,42 +19,27 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
metrics := monitoring.New("ender")
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
// Load service configuration
|
||||
metrics := monitoring.New("ender")
|
||||
cfg := ender.New()
|
||||
|
||||
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, 0, 0, metrics), cfg.ProjectExpirationTimeoutMs)
|
||||
defer pg.Close()
|
||||
|
||||
// Init all modules
|
||||
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
|
||||
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
|
||||
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber, logger.NewQueueStats(cfg.LoggerTimeout))
|
||||
if err != nil {
|
||||
log.Printf("can't init ender service: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
|
||||
|
||||
msgHandler := func(msg messages.Message) {
|
||||
if msg.TypeID() == messages.MsgSessionStart || msg.TypeID() == messages.MsgSessionEnd {
|
||||
return
|
||||
}
|
||||
if msg.Meta().Timestamp == 0 {
|
||||
log.Printf("ZERO TS, sessID: %d, msgType: %d", msg.Meta().SessionID(), msg.TypeID())
|
||||
}
|
||||
statsLogger.Collect(msg)
|
||||
sessions.UpdateSession(msg)
|
||||
}
|
||||
|
||||
consumer := queue.NewConsumer(
|
||||
cfg.GroupEnder,
|
||||
[]string{
|
||||
cfg.TopicRawWeb,
|
||||
},
|
||||
messages.NewMessageIterator(msgHandler, nil, false),
|
||||
[]string{cfg.TopicRawWeb},
|
||||
messages.NewMessageIterator(
|
||||
func(msg messages.Message) { sessions.UpdateSession(msg) },
|
||||
[]int{messages.MsgTimestamp},
|
||||
false),
|
||||
false,
|
||||
cfg.MessageSizeLimit,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"log"
|
||||
log2 "openreplay/backend/pkg/log"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"time"
|
||||
|
|
@ -28,9 +29,10 @@ type SessionEnder struct {
|
|||
timeCtrl *timeController
|
||||
activeSessions syncfloat64.UpDownCounter
|
||||
totalSessions syncfloat64.Counter
|
||||
stats log2.QueueStats
|
||||
}
|
||||
|
||||
func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) {
|
||||
func New(metrics *monitoring.Metrics, timeout int64, parts int, stats log2.QueueStats) (*SessionEnder, error) {
|
||||
if metrics == nil {
|
||||
return nil, fmt.Errorf("metrics module is empty")
|
||||
}
|
||||
|
|
@ -49,26 +51,31 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder,
|
|||
timeCtrl: NewTimeController(parts),
|
||||
activeSessions: activeSessions,
|
||||
totalSessions: totalSessions,
|
||||
stats: stats,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UpdateSession save timestamp for new sessions and update for existing sessions
|
||||
func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
||||
sessionID := msg.Meta().SessionID()
|
||||
currTS := msg.Meta().Batch().Timestamp()
|
||||
msgTimestamp := msg.Meta().Timestamp
|
||||
localTS := time.Now().UnixMilli()
|
||||
if currTS == 0 {
|
||||
se.stats.Collect(msg)
|
||||
var (
|
||||
sessionID = msg.Meta().SessionID()
|
||||
batchTimestamp = msg.Meta().Batch().Timestamp()
|
||||
msgTimestamp = msg.Meta().Timestamp
|
||||
localTimestamp = time.Now().UnixMilli()
|
||||
)
|
||||
if batchTimestamp == 0 {
|
||||
log.Printf("got empty timestamp for sessionID: %d", sessionID)
|
||||
return
|
||||
}
|
||||
se.timeCtrl.UpdateTime(sessionID, currTS)
|
||||
se.timeCtrl.UpdateTime(sessionID, batchTimestamp)
|
||||
sess, ok := se.sessions[sessionID]
|
||||
if !ok {
|
||||
// Register new session
|
||||
se.sessions[sessionID] = &session{
|
||||
lastTimestamp: currTS, // timestamp from message broker
|
||||
lastUpdate: localTS, // local timestamp
|
||||
lastUserTime: msgTimestamp, // last timestamp from user's machine
|
||||
lastTimestamp: batchTimestamp, // timestamp from message broker
|
||||
lastUpdate: localTimestamp, // local timestamp
|
||||
lastUserTime: msgTimestamp, // last timestamp from user's machine
|
||||
isEnded: false,
|
||||
}
|
||||
se.activeSessions.Add(context.Background(), 1)
|
||||
|
|
@ -80,9 +87,9 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
|||
sess.lastUserTime = msgTimestamp
|
||||
}
|
||||
// Keep information about the latest message for generating sessionEnd trigger
|
||||
if currTS > sess.lastTimestamp {
|
||||
sess.lastTimestamp = currTS
|
||||
sess.lastUpdate = localTS
|
||||
if batchTimestamp > sess.lastTimestamp {
|
||||
sess.lastTimestamp = batchTimestamp
|
||||
sess.lastUpdate = localTimestamp
|
||||
sess.isEnded = false
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,10 @@ type queueStats struct {
|
|||
tick <-chan time.Time
|
||||
}
|
||||
|
||||
type QueueStats interface {
|
||||
Collect(msg messages.Message)
|
||||
}
|
||||
|
||||
func NewQueueStats(sec int) *queueStats {
|
||||
return &queueStats{
|
||||
prts: make(map[int32]*partitionStats),
|
||||
|
|
|
|||
|
|
@ -143,6 +143,10 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
func (i *messageIteratorImpl) zeroTsLog(msgType string) {
|
||||
log.Printf("zero timestamp in %s, sess: %d", msgType, i.batchInfo.SessionID())
|
||||
}
|
||||
|
||||
func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
||||
switch m := msg.(type) {
|
||||
case *BatchMetadata:
|
||||
|
|
@ -154,6 +158,9 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
|||
}
|
||||
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
|
||||
i.messageInfo.Timestamp = m.Timestamp
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMetadata")
|
||||
}
|
||||
i.messageInfo.Url = m.Url
|
||||
i.version = m.Version
|
||||
|
||||
|
|
@ -163,15 +170,27 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
|||
}
|
||||
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
|
||||
i.messageInfo.Timestamp = m.Timestamp
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMeta")
|
||||
}
|
||||
|
||||
case *Timestamp:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("Timestamp")
|
||||
}
|
||||
|
||||
case *SessionStart:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("SessionStart")
|
||||
}
|
||||
|
||||
case *SessionEnd:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("SessionEnd")
|
||||
}
|
||||
|
||||
case *SetPageLocation:
|
||||
i.messageInfo.Url = m.URL
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue