feat(backend): logic improvements in sessionEnder

This commit is contained in:
Alexander Zavorotynskiy 2023-08-03 18:05:36 +04:00
parent 2e99095dcd
commit 81f70ddd4d
3 changed files with 154 additions and 31 deletions

View file

@ -77,28 +77,69 @@ func main() {
case <-tick:
failedSessionEnds := make(map[uint64]uint64)
duplicatedSessionEnds := make(map[uint64]uint64)
negativeDuration := make(map[uint64]uint64)
shorterDuration := make(map[uint64]int64)
diffDuration := make(map[uint64]int64)
noSessionInDB := make(map[uint64]uint64)
updatedDurations := 0
newSessionEnds := 0
type SessionEndType int
const (
FailedSessionEnd SessionEndType = iota + 1
DuplicatedSessionEnd
NegativeDuration
ShorterDuration
DiffDuration
NewSessionEnd
NoSessionInDB
)
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) (bool, int) {
msg := &messages.SessionEnd{Timestamp: timestamp}
currDuration, err := pg.GetSessionDuration(sessionID)
if err != nil {
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
}
sess, err := pg.Cache.GetSession(sessionID)
if err != nil {
log.Printf("can't get session from database to compare durations, sessID: %d, err: %s", sessionID, err)
} else {
newDur := timestamp - sess.Timestamp
// Skip if session was ended before with same duration
if currDuration == newDur {
duplicatedSessionEnds[sessionID] = currDuration
return true, int(DuplicatedSessionEnd)
}
// Skip if session was ended before with longer duration
if currDuration > newDur {
shorterDuration[sessionID] = int64(currDuration) - int64(newDur)
return true, int(ShorterDuration)
}
}
newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp)
if err != nil {
if strings.Contains(err.Error(), "integer out of range") {
// Skip session with broken duration
failedSessionEnds[sessionID] = timestamp
return true
return true, int(FailedSessionEnd)
}
if strings.Contains(err.Error(), "is less than zero for uint64") {
negativeDuration[sessionID] = timestamp
return true, int(NegativeDuration)
}
if strings.Contains(err.Error(), "no rows in result set") {
noSessionInDB[sessionID] = timestamp
return true, int(NoSessionInDB)
}
log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err)
return false
return false, 0
}
// Check one more time just in case
if currDuration == newDuration {
// Skip session end duplicate
duplicatedSessionEnds[sessionID] = currDuration
return true
return true, int(DuplicatedSessionEnd)
}
if cfg.UseEncryption {
if key := storage.GenerateEncryptionKey(); key != nil {
@ -111,22 +152,34 @@ func main() {
}
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
return false
return false, 0
}
return true
if currDuration != 0 {
diffDuration[sessionID] = int64(newDuration) - int64(currDuration)
updatedDurations++
} else {
newSessionEnds++
}
return true, int(NewSessionEnd)
})
if len(failedSessionEnds) > 0 {
log.Println("sessions with wrong duration:", failedSessionEnds)
if n := len(failedSessionEnds); n > 0 {
log.Println("sessions with wrong duration:", n, failedSessionEnds)
}
if len(duplicatedSessionEnds) > 0 {
log.Println("session end duplicates:", duplicatedSessionEnds)
if n := len(negativeDuration); n > 0 {
log.Println("sessions with negative duration:", n, negativeDuration)
}
if n := len(noSessionInDB); n > 0 {
log.Printf("sessions without info in DB: %d, %v", n, noSessionInDB)
}
log.Printf("[INFO] failed: %d, negative: %d, shorter: %d, same: %d, updated: %d, new: %d, not found: %d",
len(failedSessionEnds), len(negativeDuration), len(shorterDuration), len(duplicatedSessionEnds),
updatedDurations, newSessionEnds, len(noSessionInDB))
producer.Flush(cfg.ProducerTimeout)
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
case msg := <-consumer.Rebalanced():
log.Println(msg)
log.Printf("Rebalanced: %+v", msg)
default:
if !memoryManager.HasFreeMemory() {
continue

View file

@ -9,12 +9,12 @@ import (
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool
type EndedSessionHandler func(sessionID uint64, timestamp uint64) (bool, int)
// session holds information about user's session live status
type session struct {
lastTimestamp int64
lastUpdate int64
lastTimestamp int64 // timestamp from message broker
lastUpdate int64 // local timestamp
lastUserTime uint64
isEnded bool
}
@ -24,6 +24,8 @@ type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeCtrl *timeController
parts uint64
enabled bool
}
func New(timeout int64, parts int) (*SessionEnder, error) {
@ -31,9 +33,38 @@ func New(timeout int64, parts int) (*SessionEnder, error) {
timeout: timeout,
sessions: make(map[uint64]*session),
timeCtrl: NewTimeController(parts),
parts: uint64(parts), // ender uses all partitions by default
enabled: true,
}, nil
}
func (se *SessionEnder) Enable() {
se.enabled = true
}
func (se *SessionEnder) Disable() {
se.enabled = false
}
func (se *SessionEnder) ActivePartitions(parts []uint64) {
activeParts := make(map[uint64]bool, 0)
for _, p := range parts {
activeParts[p] = true
}
removedSessions := 0
activeSessions := 0
for sessID, _ := range se.sessions {
if !activeParts[sessID%se.parts] {
delete(se.sessions, sessID)
removedSessions++
} else {
activeSessions++
}
}
log.Printf("SessionEnder: %d sessions left in active partitions: %+v, removed %d sessions",
activeSessions, parts, removedSessions)
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(msg messages.Message) {
var (
@ -46,14 +77,14 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
log.Printf("got empty timestamp for sessionID: %d", sessionID)
return
}
se.timeCtrl.UpdateTime(sessionID, batchTimestamp)
se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp)
sess, ok := se.sessions[sessionID]
if !ok {
// Register new session
se.sessions[sessionID] = &session{
lastTimestamp: batchTimestamp, // timestamp from message broker
lastUpdate: localTimestamp, // local timestamp
lastUserTime: msgTimestamp, // last timestamp from user's machine
lastTimestamp: batchTimestamp,
lastUpdate: localTimestamp,
lastUserTime: msgTimestamp, // last timestamp from user's machine
isEnded: false,
}
ender.IncreaseActiveSessions()
@ -74,21 +105,53 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
if !se.enabled {
log.Printf("SessionEnder is disabled")
return
}
currTime := time.Now().UnixMilli()
allSessions, removedSessions := len(se.sessions), 0
brokerTime := make(map[int]int, 0)
serverTime := make(map[int]int, 0)
isSessionEnded := func(sessID uint64, sess *session) (bool, int) {
// Has been finished already
if sess.isEnded {
return true, 1
}
batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp
// Has been finished according to batch timestamp and hasn't been updated for a long time
if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) {
return true, 2
}
// Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time
if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) {
return true, 3
}
return false, 0
}
for sessID, sess := range se.sessions {
if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) ||
(currTime-sess.lastUpdate > se.timeout) {
if ended, endCase := isSessionEnded(sessID, sess); ended {
sess.isEnded = true
if handler(sessID, sess.lastUserTime) {
if res, _ := handler(sessID, sess.lastUserTime); res {
delete(se.sessions, sessID)
ender.DecreaseActiveSessions()
ender.IncreaseClosedSessions()
removedSessions++
if endCase == 2 {
brokerTime[1]++
}
if endCase == 3 {
serverTime[1]++
}
} else {
log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime)
}
}
}
log.Printf("Removed %d of %d sessions", removedSessions, allSessions)
log.Printf("Removed %d of %d sessions; brokerTime: %d, serverTime: %d",
removedSessions, allSessions, brokerTime, serverTime)
}

View file

@ -1,21 +1,28 @@
package sessionender
type timeController struct {
parts uint64
lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage
parts uint64
lastBatchTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage
lastUpdateTimestamp map[uint64]int64 // map[partition]systemTimeOfLastMessage
}
func NewTimeController(parts int) *timeController {
return &timeController{
parts: uint64(parts),
lastTimestamp: make(map[uint64]int64),
parts: uint64(parts),
lastBatchTimestamp: make(map[uint64]int64),
lastUpdateTimestamp: make(map[uint64]int64),
}
}
func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) {
tc.lastTimestamp[sessionID%tc.parts] = timestamp
func (tc *timeController) UpdateTime(sessionID uint64, batchTimestamp, updateTimestamp int64) {
tc.lastBatchTimestamp[sessionID%tc.parts] = batchTimestamp
tc.lastUpdateTimestamp[sessionID%tc.parts] = updateTimestamp
}
func (tc *timeController) LastTimestamp(sessionID uint64) int64 {
return tc.lastTimestamp[sessionID%tc.parts]
func (tc *timeController) LastBatchTimestamp(sessionID uint64) int64 {
return tc.lastBatchTimestamp[sessionID%tc.parts]
}
func (tc *timeController) LastUpdateTimestamp(sessionID uint64) int64 {
return tc.lastUpdateTimestamp[sessionID%tc.parts]
}