Improved ender (#537)

* feat(backend/ender): using producer timestamp for session end detection

* feat(backend/ender): added timeControl module

Co-authored-by: Alexander Zavorotynskiy <alexander@openreplay.com>
This commit is contained in:
Alexander 2022-06-15 10:49:32 +02:00 committed by GitHub
parent 911736f772
commit 883a6f6909
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 21 deletions

View file

@ -49,7 +49,8 @@ ENV TZ=UTC \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=72 \
FILE_SPLIT_SIZE=300000 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
LOG_QUEUE_STATS_INTERVAL_SEC=60 \
PARTITIONS_NUMBER=1
ARG SERVICE_NAME

View file

@ -32,7 +32,7 @@ func main() {
// Init all modules
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT)
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
if err != nil {
log.Printf("can't init ender service: %s", err)
return
@ -46,7 +46,7 @@ func main() {
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, messages.GetTimestamp(msg))
sessions.UpdateSession(sessionID, meta.Timestamp)
},
false,
)

View file

@ -5,21 +5,23 @@ import (
)
type Config struct {
GroupEnder string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
ProducerTimeout int
GroupEnder string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
ProducerTimeout int
PartitionsNumber int
}
func New() *Config {
return &Config{
GroupEnder: env.String("GROUP_ENDER"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
ProducerTimeout: 2000,
GroupEnder: env.String("GROUP_ENDER"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
ProducerTimeout: 2000,
PartitionsNumber: env.Int("PARTITIONS_NUMBER"),
}
}

View file

@ -15,6 +15,7 @@ type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64
lastUpdate int64
isEnded bool
}
@ -22,11 +23,12 @@ type session struct {
type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeCtrl *timeController
activeSessions syncfloat64.UpDownCounter
totalSessions syncfloat64.Counter
}
func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) {
func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) {
if metrics == nil {
return nil, fmt.Errorf("metrics module is empty")
}
@ -42,22 +44,26 @@ func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) {
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
timeCtrl: NewTimeController(parts),
activeSessions: activeSessions,
totalSessions: totalSessions,
}, nil
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
currTS := int64(timestamp)
func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) {
localTS := time.Now().UnixMilli()
currTS := timestamp
if currTS == 0 {
log.Printf("got empty timestamp for sessionID: %d", sessionID)
return
}
se.timeCtrl.UpdateTime(sessionID, currTS)
sess, ok := se.sessions[sessionID]
if !ok {
se.sessions[sessionID] = &session{
lastTimestamp: currTS,
lastTimestamp: currTS, // timestamp from message broker
lastUpdate: localTS, // local timestamp
isEnded: false,
}
log.Printf("added new session: %d", sessionID)
@ -67,16 +73,18 @@ func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
}
if currTS > sess.lastTimestamp {
sess.lastTimestamp = currTS
sess.lastUpdate = localTS
sess.isEnded = false
}
}
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
deadLine := time.Now().UnixMilli() - se.timeout
currTime := time.Now().UnixMilli()
allSessions, removedSessions := len(se.sessions), 0
for sessID, sess := range se.sessions {
if sess.isEnded || sess.lastTimestamp < deadLine {
if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) ||
(currTime-sess.lastUpdate > se.timeout) {
sess.isEnded = true
if handler(sessID, sess.lastTimestamp) {
delete(se.sessions, sessID)

View file

@ -0,0 +1,21 @@
package sessionender
type timeController struct {
parts uint64
lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage
}
func NewTimeController(parts int) *timeController {
return &timeController{
parts: uint64(parts),
lastTimestamp: make(map[uint64]int64),
}
}
func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) {
tc.lastTimestamp[sessionID%tc.parts] = timestamp
}
func (tc *timeController) LastTimestamp(sessionID uint64) int64 {
return tc.lastTimestamp[sessionID%tc.parts]
}