diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index 8309059ff..abbbf50e6 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -2,7 +2,7 @@ package datasaver import ( "context" - + "encoding/json" "openreplay/backend/internal/config/db" "openreplay/backend/pkg/db/clickhouse" "openreplay/backend/pkg/db/postgres" @@ -50,10 +50,6 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Con } func (s *saverImpl) Handle(msg Message) { - if msg.TypeID() == MsgCustomEvent { - defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent))) - } - var ( sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID()) session *sessions.Session @@ -69,6 +65,23 @@ func (s *saverImpl) Handle(msg Message) { return } + if msg.TypeID() == MsgCustomEvent { + m := msg.(*CustomEvent) + // Try to parse custom event payload to JSON and extract or_payload field + type CustomEventPayload struct { + CustomTimestamp uint64 `json:"or_timestamp"` + } + customPayload := &CustomEventPayload{} + if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil { + if customPayload.CustomTimestamp >= session.Timestamp { + s.log.Info(sessCtx, "custom event timestamp received: %v", m.Timestamp) + msg.Meta().Timestamp = customPayload.CustomTimestamp + s.log.Info(sessCtx, "custom event timestamp updated: %v", m.Timestamp) + } + } + defer s.Handle(types.WrapCustomEvent(m)) + } + if IsMobileType(msg.TypeID()) { if err := s.handleMobileMessage(sessCtx, session, msg); err != nil { if !postgres.IsPkeyViolation(err) { diff --git a/backend/internal/db/datasaver/web.go b/backend/internal/db/datasaver/web.go index 108e6a01b..701fe0e8d 100644 --- a/backend/internal/db/datasaver/web.go +++ b/backend/internal/db/datasaver/web.go @@ -2,8 +2,6 @@ package datasaver import ( "context" - "encoding/json" - "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/messages" @@ -64,14 +62,6 @@ func (s *saverImpl) handleWebMessage(sessCtx context.Context, session *sessions. s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID) return nil case *messages.CustomEvent: - // Try to parse custom event payload to JSON and extract or_payload field - type CustomEventPayload struct { - CustomTimestamp uint64 `json:"or_timestamp"` - } - customPayload := &CustomEventPayload{} - if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil { - m.Timestamp = customPayload.CustomTimestamp - } if err := s.pg.InsertWebCustomEvent(session, m); err != nil { return err }