* feat(backend): new session end detection logic + several fixes * feat(backend): support partitions managment in ender * feat(backend): added new consumer support to redis * feat(backend): added support for new consumer in kafka * feat(backend): added new consumer support to redis (ee) * feat(backend): small refactoring in ender
28 lines
927 B
Go
28 lines
927 B
Go
package sessionender
|
|
|
|
// timeController keeps, for every partition, the timestamps associated with
// the most recently processed message. A session is assigned to a partition
// by taking its ID modulo the partition count.
type timeController struct {
	// parts is the partition count used as the modulus for session IDs.
	parts uint64

	// lastBatchTimestamp maps a partition to the consumer time of its last message.
	lastBatchTimestamp map[uint64]int64

	// lastUpdateTimestamp maps a partition to the system time of its last message.
	lastUpdateTimestamp map[uint64]int64
}

// NewTimeController returns a timeController that spreads sessions over parts
// partitions, with both timestamp maps allocated and empty.
//
// A partition count below 1 is treated as 1. Without that, a zero count would
// make every sessionID % parts computation panic with an integer divide by
// zero, and a negative count would wrap to a huge uint64 so that the maps
// would end up keyed by (almost) every distinct session ID.
func NewTimeController(parts int) *timeController {
	if parts < 1 {
		parts = 1
	}
	return &timeController{
		parts:               uint64(parts),
		lastBatchTimestamp:  make(map[uint64]int64),
		lastUpdateTimestamp: make(map[uint64]int64),
	}
}
|
|
|
|
func (tc *timeController) UpdateTime(sessionID uint64, batchTimestamp, updateTimestamp int64) {
|
|
tc.lastBatchTimestamp[sessionID%tc.parts] = batchTimestamp
|
|
tc.lastUpdateTimestamp[sessionID%tc.parts] = updateTimestamp
|
|
}
|
|
|
|
func (tc *timeController) LastBatchTimestamp(sessionID uint64) int64 {
|
|
return tc.lastBatchTimestamp[sessionID%tc.parts]
|
|
}
|
|
|
|
func (tc *timeController) LastUpdateTimestamp(sessionID uint64) int64 {
|
|
return tc.lastUpdateTimestamp[sessionID%tc.parts]
|
|
}
|