* feat(backend): handlers for mobile messages * feat(backend): new service template * feat(backend): save mobile session start and send batches to kafka * feat(backend): added mobile support to sink, ender and storage * helm(videostorage): added helm chart for a new service videostorage * fix(backend): added pointer to streing for userBrowser (because it's null for mobile sessions) * feat(backend): added MsgIOSBatchMeta handler to message iterator's logic * feat(backend): added ios ts parser to ender * feat(backend): enabled producing batch of messages to queue * feat(backend): removed iosstart from mob files * feat(backend): added new ios message types * feat(backend): added iosStart and iosEnd * fix(backend): fixed log issue * feat(backend): send tar.gz archives to special queue topic * feat(backend): read raw archives from kafka * fix(backend): added missing file * fix(backend): removed the second file reading * fix(backend): fixed wrong queue topic name * fix(backend): fixed mobile trigger topic name * feat(backend): added tar.gz extractor and iOSSessionEnd handler * feat(backend): debug logs on message uploading * fix(backend): added raw-images topic consumption * feat(backend): now sink send iosSessionEnd to video-storage * feat(backend): added dir creator for new sessions * feat(backend): added correct command to execute * feat(backend): added overwrite option * feat(backend): added s3 uploader for video session replays * feat(backend): new consumer group for mobile sessions * feat(backend): debug logs for uploader * feat(backend): removed unused log * feat(backend): fixed s3 key for video replays * feat(backend): removed debug code * feat(backend): fixed video-storage message filter * fix(backend): added mobileSessionEnd to SessionEnd converter * feat(backend): added first version if db support for mobile events * fix(backend): added swipe events to mob file * feat(backend): added swipe event to pg * feat(backend): split logic into 2 services: image-storage and video-storage * feat(backend): added helm chart for image-storage service * fix(backend): fixed table name for mobile taps * feat(backend): added metadata handler for mobile message parser + fix message filters * feat(backend): added iosRawTopic to DB message consumer * fix(backend): removed value from mobile inputs * feat(backend): removed debug log from iterator * feat(backend): added new apple devices to iOS device parser * fix(backend): added real projectID instead of 0 * feat(backend): extended a list of simulators for device detector * feat(backend): updated networkCall mobile message * fix(backend): added new way to define is network call successed or not * feat(backend): added timezone support for mobile start request * feat(backend): added 2 mobile events Input and Click to mob file * feat(backend): refactored image storage * feat(backend): video storage with 2 workers * feat(backend): added project's platform support * feat(backend): added memory size field for mobile start request * feat(backend): changed video preset for ultrafast * feat(backend): added debug log to http /late handler * feat(backend): added debug log to db service for iosCrash messages * feat(backend): added tapRage event handler to heuristics * feat(backend): changed table and field names for ios crashes * feat(backend): added payload for tapRage events * feat(backend): added TapRage events insert to DB * feat(backend): added fps value to /mobile/start response * feat(backend): added image quality parameter to /mobile/start response * feat(backend): added ScreenLeave handler * feat(backend): removed screenEnter and screenLeave events, added new viewComponent event --------- Co-authored-by: rjshrjndrn <rjshrjndrn@gmail.com>
233 lines
6.5 KiB
Go
233 lines
6.5 KiB
Go
package datasaver
|
|
|
|
import (
|
|
"log"
|
|
|
|
"openreplay/backend/internal/config/db"
|
|
"openreplay/backend/pkg/db/clickhouse"
|
|
"openreplay/backend/pkg/db/postgres"
|
|
"openreplay/backend/pkg/db/types"
|
|
. "openreplay/backend/pkg/messages"
|
|
queue "openreplay/backend/pkg/queue/types"
|
|
"openreplay/backend/pkg/sessions"
|
|
)
|
|
|
|
type Saver interface {
|
|
Handle(msg Message)
|
|
Commit() error
|
|
Close() error
|
|
}
|
|
|
|
type saverImpl struct {
|
|
cfg *db.Config
|
|
pg *postgres.Conn
|
|
sessions sessions.Sessions
|
|
ch clickhouse.Connector
|
|
producer queue.Producer
|
|
}
|
|
|
|
func New(cfg *db.Config, pg *postgres.Conn, session sessions.Sessions) Saver {
|
|
s := &saverImpl{
|
|
cfg: cfg,
|
|
pg: pg,
|
|
sessions: session,
|
|
}
|
|
s.init()
|
|
return s
|
|
}
|
|
|
|
func (s *saverImpl) Handle(msg Message) {
|
|
if msg.TypeID() == MsgCustomEvent {
|
|
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
|
|
}
|
|
if IsIOSType(msg.TypeID()) {
|
|
// Handle iOS messages
|
|
if err := s.handleMobileMessage(msg); err != nil {
|
|
if !postgres.IsPkeyViolation(err) {
|
|
log.Printf("iOS Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
if err := s.handleMessage(msg); err != nil {
|
|
if !postgres.IsPkeyViolation(err) {
|
|
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
|
|
}
|
|
return
|
|
}
|
|
if err := s.handleExtraMessage(msg); err != nil {
|
|
log.Printf("Stats Insertion Error %v; Session: %d, Message: %v", err, msg.SessionID(), msg)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *saverImpl) handleMobileMessage(msg Message) error {
|
|
session, err := s.sessions.Get(msg.SessionID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch m := msg.(type) {
|
|
case *IOSSessionStart:
|
|
return s.pg.InsertIOSSessionStart(m.SessionID(), m)
|
|
case *IOSSessionEnd:
|
|
return s.pg.InsertIOSSessionEnd(m.SessionID(), m)
|
|
case *IOSUserID:
|
|
if err = s.sessions.UpdateUserID(session.SessionID, m.Value); err != nil {
|
|
return err
|
|
}
|
|
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID_IOS", m.Value)
|
|
return nil
|
|
case *IOSUserAnonymousID:
|
|
if err = s.sessions.UpdateAnonymousID(session.SessionID, m.Value); err != nil {
|
|
return err
|
|
}
|
|
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID_IOS", m.Value)
|
|
return nil
|
|
case *IOSMetadata:
|
|
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
|
|
case *IOSCustomEvent:
|
|
return s.pg.InsertIOSCustomEvent(session, m)
|
|
case *IOSClickEvent:
|
|
if err := s.pg.InsertIOSClickEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *IOSSwipeEvent:
|
|
if err := s.pg.InsertIOSSwipeEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *IOSInputEvent:
|
|
if err := s.pg.InsertIOSInputEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *IOSNetworkCall:
|
|
return s.pg.InsertIOSNetworkCall(session, m)
|
|
case *IOSScreenEnter:
|
|
if err := s.pg.InsertIOSScreenEnter(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 1)
|
|
case *IOSScreenLeave:
|
|
if err := s.pg.InsertIOSScreenLeave(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *IOSCrash:
|
|
if err := s.pg.InsertIOSCrash(session.SessionID, session.ProjectID, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000)
|
|
case *IOSIssueEvent:
|
|
if err = s.pg.InsertIOSIssueEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *saverImpl) handleMessage(msg Message) error {
|
|
session, err := s.sessions.Get(msg.SessionID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch m := msg.(type) {
|
|
case *SessionStart:
|
|
return s.pg.HandleStartEvent(m)
|
|
case *SessionEnd:
|
|
return s.pg.HandleEndEvent(m.SessionID())
|
|
case *Metadata:
|
|
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
|
|
case *IssueEvent:
|
|
err = s.pg.InsertIssueEvent(session, m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type))
|
|
case *CustomIssue:
|
|
ie := &IssueEvent{
|
|
Type: "custom",
|
|
Timestamp: m.Timestamp,
|
|
MessageID: m.Index,
|
|
ContextString: m.Name,
|
|
Payload: m.Payload,
|
|
}
|
|
ie.SetMeta(m.Meta())
|
|
if err = s.pg.InsertIssueEvent(session, ie); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie.Type))
|
|
case *UserID:
|
|
if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
|
|
return err
|
|
}
|
|
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID)
|
|
return nil
|
|
case *UserAnonymousID:
|
|
if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
|
|
return err
|
|
}
|
|
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
|
|
return nil
|
|
case *CustomEvent:
|
|
return s.pg.InsertWebCustomEvent(session, m)
|
|
case *MouseClick:
|
|
if err = s.pg.InsertWebClickEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *PageEvent:
|
|
if err = s.pg.InsertWebPageEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 1)
|
|
case *NetworkRequest:
|
|
return s.pg.InsertWebNetworkRequest(session, m)
|
|
case *GraphQL:
|
|
return s.pg.InsertWebGraphQL(session, m)
|
|
case *JSException:
|
|
if err = s.pg.InsertWebErrorEvent(session, types.WrapJSException(m)); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000)
|
|
case *IntegrationEvent:
|
|
return s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
|
|
case *InputChange:
|
|
if err = s.pg.InsertInputChangeEvent(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
case *MouseThrashing:
|
|
if err = s.pg.InsertMouseThrashing(session, m); err != nil {
|
|
return err
|
|
}
|
|
return s.sessions.UpdateIssuesStats(session.SessionID, 0, 50)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *saverImpl) Commit() error {
|
|
if s.pg != nil {
|
|
s.pg.Commit()
|
|
}
|
|
if s.ch != nil {
|
|
s.ch.Commit()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *saverImpl) Close() error {
|
|
if s.pg != nil {
|
|
if err := s.pg.Close(); err != nil {
|
|
log.Printf("pg.Close error: %s", err)
|
|
}
|
|
}
|
|
if s.ch != nil {
|
|
if err := s.ch.Stop(); err != nil {
|
|
log.Printf("ch.Close error: %s", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|