diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index 8309059ff..ad1929f08 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -2,11 +2,12 @@ package datasaver import ( "context" + "encoding/json" + "openreplay/backend/pkg/db/types" "openreplay/backend/internal/config/db" "openreplay/backend/pkg/db/clickhouse" "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/logger" . "openreplay/backend/pkg/messages" queue "openreplay/backend/pkg/queue/types" @@ -50,10 +51,6 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Con } func (s *saverImpl) Handle(msg Message) { - if msg.TypeID() == MsgCustomEvent { - defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent))) - } - var ( sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID()) session *sessions.Session @@ -69,6 +66,23 @@ func (s *saverImpl) Handle(msg Message) { return } + if msg.TypeID() == MsgCustomEvent { + m := msg.(*CustomEvent) + // Try to parse custom event payload to JSON and extract or_payload field + type CustomEventPayload struct { + CustomTimestamp uint64 `json:"or_timestamp"` + } + customPayload := &CustomEventPayload{} + if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil { + if customPayload.CustomTimestamp >= session.Timestamp { + s.log.Info(sessCtx, "custom event timestamp received: %v", m.Timestamp) + msg.Meta().Timestamp = customPayload.CustomTimestamp + s.log.Info(sessCtx, "custom event timestamp updated: %v", m.Timestamp) + } + } + defer s.Handle(types.WrapCustomEvent(m)) + } + if IsMobileType(msg.TypeID()) { if err := s.handleMobileMessage(sessCtx, session, msg); err != nil { if !postgres.IsPkeyViolation(err) {