feat(db): custom ts for custom events (#3390)
This commit is contained in:
parent
5304dbf8c1
commit
3edea4acb4
2 changed files with 18 additions and 15 deletions
|
|
@ -2,7 +2,7 @@ package datasaver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"openreplay/backend/internal/config/db"
|
"openreplay/backend/internal/config/db"
|
||||||
"openreplay/backend/pkg/db/clickhouse"
|
"openreplay/backend/pkg/db/clickhouse"
|
||||||
"openreplay/backend/pkg/db/postgres"
|
"openreplay/backend/pkg/db/postgres"
|
||||||
|
|
@ -50,10 +50,6 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Con
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *saverImpl) Handle(msg Message) {
|
func (s *saverImpl) Handle(msg Message) {
|
||||||
if msg.TypeID() == MsgCustomEvent {
|
|
||||||
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID())
|
sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID())
|
||||||
session *sessions.Session
|
session *sessions.Session
|
||||||
|
|
@ -69,6 +65,23 @@ func (s *saverImpl) Handle(msg Message) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if msg.TypeID() == MsgCustomEvent {
|
||||||
|
m := msg.(*CustomEvent)
|
||||||
|
// Try to parse custom event payload to JSON and extract or_payload field
|
||||||
|
type CustomEventPayload struct {
|
||||||
|
CustomTimestamp uint64 `json:"or_timestamp"`
|
||||||
|
}
|
||||||
|
customPayload := &CustomEventPayload{}
|
||||||
|
if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil {
|
||||||
|
if customPayload.CustomTimestamp >= session.Timestamp {
|
||||||
|
s.log.Info(sessCtx, "custom event timestamp received: %v", m.Timestamp)
|
||||||
|
msg.Meta().Timestamp = customPayload.CustomTimestamp
|
||||||
|
s.log.Info(sessCtx, "custom event timestamp updated: %v", m.Timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer s.Handle(types.WrapCustomEvent(m))
|
||||||
|
}
|
||||||
|
|
||||||
if IsMobileType(msg.TypeID()) {
|
if IsMobileType(msg.TypeID()) {
|
||||||
if err := s.handleMobileMessage(sessCtx, session, msg); err != nil {
|
if err := s.handleMobileMessage(sessCtx, session, msg); err != nil {
|
||||||
if !postgres.IsPkeyViolation(err) {
|
if !postgres.IsPkeyViolation(err) {
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ package datasaver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"openreplay/backend/pkg/db/postgres"
|
"openreplay/backend/pkg/db/postgres"
|
||||||
"openreplay/backend/pkg/db/types"
|
"openreplay/backend/pkg/db/types"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
|
|
@ -64,14 +62,6 @@ func (s *saverImpl) handleWebMessage(sessCtx context.Context, session *sessions.
|
||||||
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
|
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
|
||||||
return nil
|
return nil
|
||||||
case *messages.CustomEvent:
|
case *messages.CustomEvent:
|
||||||
// Try to parse custom event payload to JSON and extract or_payload field
|
|
||||||
type CustomEventPayload struct {
|
|
||||||
CustomTimestamp uint64 `json:"or_timestamp"`
|
|
||||||
}
|
|
||||||
customPayload := &CustomEventPayload{}
|
|
||||||
if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil {
|
|
||||||
m.Timestamp = customPayload.CustomTimestamp
|
|
||||||
}
|
|
||||||
if err := s.pg.InsertWebCustomEvent(session, m); err != nil {
|
if err := s.pg.InsertWebCustomEvent(session, m); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue