diff --git a/backend/Dockerfile b/backend/Dockerfile index cae1bf81f..08b7d871c 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -49,7 +49,8 @@ ENV TZ=UTC \ ASSETS_SIZE_LIMIT=6291456 \ FS_CLEAN_HRS=72 \ FILE_SPLIT_SIZE=300000 \ - LOG_QUEUE_STATS_INTERVAL_SEC=60 + LOG_QUEUE_STATS_INTERVAL_SEC=60 \ + PARTITIONS_NUMBER=1 ARG SERVICE_NAME diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 55b01219b..825046e3d 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -32,7 +32,7 @@ func main() { // Init all modules statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) - sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT) + sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber) if err != nil { log.Printf("can't init ender service: %s", err) return @@ -46,7 +46,7 @@ func main() { }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.Collect(sessionID, meta) - sessions.UpdateSession(sessionID, messages.GetTimestamp(msg)) + sessions.UpdateSession(sessionID, meta.Timestamp) }, false, ) diff --git a/backend/internal/config/ender/config.go b/backend/internal/config/ender/config.go index 81457ad0e..4aec62c19 100644 --- a/backend/internal/config/ender/config.go +++ b/backend/internal/config/ender/config.go @@ -5,21 +5,23 @@ import ( ) type Config struct { - GroupEnder string - TopicTrigger string - LoggerTimeout int - TopicRawWeb string - TopicRawIOS string - ProducerTimeout int + GroupEnder string + TopicTrigger string + LoggerTimeout int + TopicRawWeb string + TopicRawIOS string + ProducerTimeout int + PartitionsNumber int } func New() *Config { return &Config{ - GroupEnder: env.String("GROUP_ENDER"), - TopicTrigger: env.String("TOPIC_TRIGGER"), - LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), - TopicRawWeb: env.String("TOPIC_RAW_WEB"), - TopicRawIOS: env.String("TOPIC_RAW_IOS"), - ProducerTimeout: 2000, + GroupEnder: env.String("GROUP_ENDER"), + TopicTrigger: env.String("TOPIC_TRIGGER"), + LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + ProducerTimeout: 2000, + PartitionsNumber: env.Int("PARTITIONS_NUMBER"), } } diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index 4a3f09d55..107618f59 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -15,6 +15,7 @@ type EndedSessionHandler func(sessionID uint64, timestamp int64) bool // session holds information about user's session live status type session struct { lastTimestamp int64 + lastUpdate int64 isEnded bool } @@ -22,11 +23,12 @@ type session struct { type SessionEnder struct { timeout int64 sessions map[uint64]*session // map[sessionID]session + timeCtrl *timeController activeSessions syncfloat64.UpDownCounter totalSessions syncfloat64.Counter } -func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) { +func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) { if metrics == nil { return nil, fmt.Errorf("metrics module is empty") } @@ -42,22 +44,26 @@ func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) { return &SessionEnder{ timeout: timeout, sessions: make(map[uint64]*session), + timeCtrl: NewTimeController(parts), activeSessions: activeSessions, totalSessions: totalSessions, }, nil } // UpdateSession save timestamp for new sessions and update for existing sessions -func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) { - currTS := int64(timestamp) +func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) { + localTS := time.Now().UnixMilli() + currTS := timestamp if currTS == 0 { log.Printf("got empty timestamp for sessionID: %d", sessionID) return } + se.timeCtrl.UpdateTime(sessionID, currTS) sess, ok := se.sessions[sessionID] if !ok { se.sessions[sessionID] = &session{ - lastTimestamp: currTS, + lastTimestamp: currTS, // timestamp from message broker + lastUpdate: localTS, // local timestamp isEnded: false, } log.Printf("added new session: %d", sessionID) @@ -67,16 +73,18 @@ func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) { } if currTS > sess.lastTimestamp { sess.lastTimestamp = currTS + sess.lastUpdate = localTS sess.isEnded = false } } // HandleEndedSessions runs handler for each ended session and delete information about session in successful case func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { - deadLine := time.Now().UnixMilli() - se.timeout + currTime := time.Now().UnixMilli() allSessions, removedSessions := len(se.sessions), 0 for sessID, sess := range se.sessions { - if sess.isEnded || sess.lastTimestamp < deadLine { + if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) || + (currTime-sess.lastUpdate > se.timeout) { sess.isEnded = true if handler(sessID, sess.lastTimestamp) { delete(se.sessions, sessID) diff --git a/backend/internal/sessionender/timecontroller.go b/backend/internal/sessionender/timecontroller.go new file mode 100644 index 000000000..0126692c7 --- /dev/null +++ b/backend/internal/sessionender/timecontroller.go @@ -0,0 +1,21 @@ +package sessionender + +type timeController struct { + parts uint64 + lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage +} + +func NewTimeController(parts int) *timeController { + return &timeController{ + parts: uint64(parts), + lastTimestamp: make(map[uint64]int64), + } +} + +func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) { + tc.lastTimestamp[sessionID%tc.parts] = timestamp +} + +func (tc *timeController) LastTimestamp(sessionID uint64) int64 { + return tc.lastTimestamp[sessionID%tc.parts] +}