fix(backend/internals): builder codefix
This commit is contained in:
parent
85b87e17df
commit
6d2bfc0e77
2 changed files with 13 additions and 11 deletions
|
|
@ -1,17 +1,19 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"openreplay/backend/internal/handlers"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
type builder struct {
|
||||
readyMsgs []Message
|
||||
timestamp uint64
|
||||
lastMessageID uint64
|
||||
lastSystemTimestamp int64
|
||||
processors []handlers.MessageProcessor
|
||||
ended bool
|
||||
readyMsgs []Message
|
||||
timestamp uint64
|
||||
lastMessageID uint64
|
||||
lastSystemTime time.Time
|
||||
processors []handlers.MessageProcessor
|
||||
ended bool
|
||||
}
|
||||
|
||||
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
|
||||
|
|
@ -55,7 +57,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
|
|||
}
|
||||
|
||||
b.timestamp = timestamp
|
||||
b.lastSystemTimestamp = time.Now().UnixMilli()
|
||||
b.lastSystemTime = time.Now()
|
||||
for _, p := range b.processors {
|
||||
if rm := p.Handle(message, messageID, b.timestamp); rm != nil {
|
||||
b.readyMsgs = append(b.readyMsgs, rm)
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMa
|
|||
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
|
||||
b := m.sessions[sessionID]
|
||||
if b == nil {
|
||||
b = NewBuilder(m.handlersFabric()) // Should create new instances
|
||||
b = NewBuilder(m.handlersFabric()...) // Should create new instances
|
||||
m.sessions[sessionID] = b
|
||||
}
|
||||
return b
|
||||
|
|
@ -36,14 +36,14 @@ func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint
|
|||
}
|
||||
|
||||
func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) {
|
||||
if b.ended || b.lastSystemTimestamp+FORCE_DELETE_TIMEOUT < time.Now().UnixMilli() {
|
||||
if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) {
|
||||
for _, p := range b.processors {
|
||||
if rm := p.Build(); rm != nil {
|
||||
b.readyMsgs = append(b.readyMsgs, rm)
|
||||
}
|
||||
}
|
||||
}
|
||||
b.iterateReadyMessage(iter)
|
||||
b.iterateReadyMessages(iter)
|
||||
if b.ended {
|
||||
delete(m.sessions, sessionID)
|
||||
}
|
||||
|
|
@ -69,6 +69,6 @@ func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg
|
|||
m.iterateSessionReadyMessages(
|
||||
sessionID,
|
||||
session,
|
||||
inter,
|
||||
iter,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue