Finished refactoring for session ender service

This commit is contained in:
Alexander Zavorotynskiy 2022-05-06 12:16:24 +02:00
parent f4212d6eaa
commit 2b3728d8da
4 changed files with 83 additions and 101 deletions

View file

@ -3,7 +3,7 @@ package main
import (
"log"
"openreplay/backend/internal/config/ender"
builder "openreplay/backend/internal/ender"
"openreplay/backend/internal/sessionender"
"time"
"os"
@ -20,10 +20,12 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
cfg := ender.New()
builderMap := builder.NewBuilderMap()
// Init all modules
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
sessions := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT)
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
@ -33,31 +35,41 @@ func main() {
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
sessions.UpdateSession(sessionID, messages.GetTimestamp(msg))
},
false,
)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
log.Printf("Ender service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
log.Printf("Ender service started\n")
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil {
log.Printf("can't send message to queue: %s", err)
return false
}
return true
})
producer.Flush(cfg.ProducerTimeout)
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)

View file

@ -1,56 +0,0 @@
package builder
import (
"log"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)
type builder struct {
readyMsgs []Message
timestamp uint64
sid uint64
}
func NewBuilder() *builder {
return &builder{}
}
func (b *builder) appendReadyMessage(msg Message) { // interface is never nil even if it holds nil value
b.readyMsgs = append(b.readyMsgs, msg)
}
func (b *builder) buildSessionEnd() {
if b.timestamp == 0 {
return
}
sessionEnd := &SessionEnd{
Timestamp: b.timestamp,
}
b.appendReadyMessage(sessionEnd)
}
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if b.timestamp < timestamp {
b.timestamp = timestamp
}
if b.timestamp == 0 {
log.Printf("Empty timestamp, sessionID: %d, messageID: %d", b.sid, messageID)
return
}
}
func (b *builder) checkTimeouts(ts int64) bool {
if b.timestamp == 0 {
return false // There was no timestamp events yet
}
lastTsGap := ts - int64(b.timestamp)
if lastTsGap > intervals.EVENTS_SESSION_END_TIMEOUT {
b.buildSessionEnd()
return true
}
return false
}

View file

@ -1,36 +0,0 @@
package builder
import (
. "openreplay/backend/pkg/messages"
)
type builderMap map[uint64]*builder
func NewBuilderMap() builderMap {
return make(builderMap)
}
func (m builderMap) GetBuilder(sessionID uint64) *builder {
b := m[sessionID]
if b == nil {
b = NewBuilder()
m[sessionID] = b
b.sid = sessionID
}
return b
}
func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID uint64, msg Message)) {
for sessionID, b := range m {
sessionEnded := b.checkTimeouts(operatingTs)
if sessionEnded {
delete(m, sessionID)
}
}
}

View file

@ -0,0 +1,62 @@
package sessionender
import (
"log"
"time"
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64
isEnded bool
}
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
}
func New(timeout int64) *SessionEnder {
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
}
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
currTS := int64(timestamp)
if currTS == 0 {
log.Printf("got empty timestamp for sessionID: %d", sessionID)
return
}
sess, ok := se.sessions[sessionID]
if !ok {
se.sessions[sessionID] = &session{
lastTimestamp: currTS,
isEnded: false,
}
return
}
if currTS > sess.lastTimestamp {
sess.lastTimestamp = currTS
sess.isEnded = false
}
}
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
deadLine := time.Now().UnixMilli() - se.timeout
for sessID, sess := range se.sessions {
if sess.isEnded || sess.lastTimestamp < deadLine {
sess.isEnded = true
if handler(sessID, sess.lastTimestamp) {
delete(se.sessions, sessID)
}
}
}
}