feat(db): custom event's ts (#3395)
This commit is contained in:
parent
f12bdebf82
commit
de33a42151
1 changed files with 19 additions and 5 deletions
|
|
@ -2,11 +2,12 @@ package datasaver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"openreplay/backend/pkg/db/types"
|
||||||
|
|
||||||
"openreplay/backend/internal/config/db"
|
"openreplay/backend/internal/config/db"
|
||||||
"openreplay/backend/pkg/db/clickhouse"
|
"openreplay/backend/pkg/db/clickhouse"
|
||||||
"openreplay/backend/pkg/db/postgres"
|
"openreplay/backend/pkg/db/postgres"
|
||||||
"openreplay/backend/pkg/db/types"
|
|
||||||
"openreplay/backend/pkg/logger"
|
"openreplay/backend/pkg/logger"
|
||||||
. "openreplay/backend/pkg/messages"
|
. "openreplay/backend/pkg/messages"
|
||||||
queue "openreplay/backend/pkg/queue/types"
|
queue "openreplay/backend/pkg/queue/types"
|
||||||
|
|
@ -50,10 +51,6 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Con
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *saverImpl) Handle(msg Message) {
|
func (s *saverImpl) Handle(msg Message) {
|
||||||
if msg.TypeID() == MsgCustomEvent {
|
|
||||||
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID())
|
sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID())
|
||||||
session *sessions.Session
|
session *sessions.Session
|
||||||
|
|
@ -69,6 +66,23 @@ func (s *saverImpl) Handle(msg Message) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if msg.TypeID() == MsgCustomEvent {
|
||||||
|
m := msg.(*CustomEvent)
|
||||||
|
// Try to parse custom event payload to JSON and extract or_payload field
|
||||||
|
type CustomEventPayload struct {
|
||||||
|
CustomTimestamp uint64 `json:"or_timestamp"`
|
||||||
|
}
|
||||||
|
customPayload := &CustomEventPayload{}
|
||||||
|
if err := json.Unmarshal([]byte(m.Payload), customPayload); err == nil {
|
||||||
|
if customPayload.CustomTimestamp >= session.Timestamp {
|
||||||
|
s.log.Info(sessCtx, "custom event timestamp received: %v", m.Timestamp)
|
||||||
|
msg.Meta().Timestamp = customPayload.CustomTimestamp
|
||||||
|
s.log.Info(sessCtx, "custom event timestamp updated: %v", m.Timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer s.Handle(types.WrapCustomEvent(m))
|
||||||
|
}
|
||||||
|
|
||||||
if IsMobileType(msg.TypeID()) {
|
if IsMobileType(msg.TypeID()) {
|
||||||
if err := s.handleMobileMessage(sessCtx, session, msg); err != nil {
|
if err := s.handleMobileMessage(sessCtx, session, msg); err != nil {
|
||||||
if !postgres.IsPkeyViolation(err) {
|
if !postgres.IsPkeyViolation(err) {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue