diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 5d75b02d7..a3eac941c 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -6,6 +6,7 @@ import ( config "openreplay/backend/internal/config/db" "openreplay/backend/internal/db" "openreplay/backend/internal/db/datasaver" + "openreplay/backend/pkg/db/clickhouse" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/db/postgres/pool" "openreplay/backend/pkg/db/redis" @@ -33,9 +34,15 @@ func main() { } defer pgConn.Close() - // Init events module - pg := postgres.NewConn(log, pgConn) - defer pg.Close() + chConn := clickhouse.NewConnector(cfg.Clickhouse) + if err := chConn.Prepare(); err != nil { + log.Fatal(ctx, "can't prepare clickhouse: %s", err) + } + defer chConn.Stop() + + // Init db proxy module (postgres + clickhouse + batches) + dbProxy := postgres.NewConn(log, pgConn, chConn) + defer dbProxy.Close() // Init redis connection redisClient, err := redis.New(&cfg.Redis) @@ -49,7 +56,7 @@ func main() { tagsManager := tags.New(log, pgConn) // Init data saver - saver := datasaver.New(log, cfg, pg, sessManager, tagsManager) + saver := datasaver.New(log, cfg, dbProxy, chConn, sessManager, tagsManager) // Message filter msgFilter := []int{ diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go index dd21d2ae0..a2db40c48 100644 --- a/backend/internal/config/common/config.go +++ b/backend/internal/config/common/config.go @@ -57,10 +57,18 @@ type Redshift struct { // Clickhouse config type Clickhouse struct { - URL string `env:"CLICKHOUSE_STRING"` - Database string `env:"CLICKHOUSE_DATABASE,default=default"` - UserName string `env:"CLICKHOUSE_USERNAME,default=default"` - Password string `env:"CLICKHOUSE_PASSWORD,default="` + URL string `env:"CLICKHOUSE_STRING"` + Database string `env:"CLICKHOUSE_DATABASE,default=default"` + UserName string `env:"CLICKHOUSE_USERNAME,default=default"` + Password string `env:"CLICKHOUSE_PASSWORD,default="` + LegacyUserName string `env:"CH_USERNAME,default=default"` + LegacyPassword string `env:"CH_PASSWORD,default="` +} + +func (cfg *Clickhouse) GetTrimmedURL() string { + chUrl := strings.TrimPrefix(cfg.URL, "tcp://") + chUrl = strings.TrimSuffix(chUrl, "/default") + return chUrl } // ElasticSearch config diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index 48d49dc62..e6f45e18a 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -11,6 +11,7 @@ import ( type Config struct { common.Config common.Postgres + common.Clickhouse redis.Redis ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"` LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"` diff --git a/backend/internal/db/datasaver/fts.go b/backend/internal/db/datasaver/fts.go new file mode 100644 index 000000000..64ca17bc4 --- /dev/null +++ b/backend/internal/db/datasaver/fts.go @@ -0,0 +1,9 @@ +package datasaver + +import ( + "openreplay/backend/pkg/messages" +) + +func (s *saverImpl) init() {} + +func (s *saverImpl) sendToFTS(msg messages.Message, projID uint32) {} diff --git a/backend/internal/db/datasaver/methods.go b/backend/internal/db/datasaver/methods.go deleted file mode 100644 index 07a8b6ba2..000000000 --- a/backend/internal/db/datasaver/methods.go +++ /dev/null @@ -1,17 +0,0 @@ -package datasaver - -import ( - . "openreplay/backend/pkg/messages" -) - -func (s *saverImpl) init() { - // noop -} - -func (s *saverImpl) handleExtraMessage(msg Message) error { - switch m := msg.(type) { - case *PerformanceTrackAggr: - return s.pg.InsertWebStatsPerformance(m) - } - return nil -} diff --git a/backend/internal/db/datasaver/mobile.go b/backend/internal/db/datasaver/mobile.go new file mode 100644 index 000000000..3c9e01a0a --- /dev/null +++ b/backend/internal/db/datasaver/mobile.go @@ -0,0 +1,72 @@ +package datasaver + +import ( + "context" + + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/sessions" +) + +func (s *saverImpl) handleMobileMessage(sessCtx context.Context, session *sessions.Session, msg messages.Message) error { + switch m := msg.(type) { + case *messages.MobileSessionEnd: + return s.ch.InsertMobileSession(session) + case *messages.MobileUserID: + if err := s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil { + return err + } + s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERIDMOBILE", m.ID) + return nil + case *messages.MobileUserAnonymousID: + if err := s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil { + return err + } + s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSIDMOBILE", m.ID) + return nil + case *messages.MobileMetadata: + return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value) + case *messages.MobileEvent: + if err := s.pg.InsertMobileEvent(session, m); err != nil { + return err + } + return s.ch.InsertMobileCustom(session, m) + case *messages.MobileClickEvent: + if err := s.pg.InsertMobileClickEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil { + return err + } + return s.ch.InsertMobileClick(session, m) + case *messages.MobileSwipeEvent: + if err := s.pg.InsertMobileSwipeEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil { + return err + } + return s.ch.InsertMobileSwipe(session, m) + case *messages.MobileInputEvent: + if err := s.pg.InsertMobileInputEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil { + return err + } + return s.ch.InsertMobileInput(session, m) + case *messages.MobileNetworkCall: + if err := s.pg.InsertMobileNetworkCall(session, m); err != nil { + return err + } + return s.ch.InsertMobileRequest(session, m, session.SaveRequestPayload) + case *messages.MobileCrash: + if err := s.pg.InsertMobileCrash(session.SessionID, session.ProjectID, m); err != nil { + return err + } + if err := s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000); err != nil { + return err + } + return s.ch.InsertMobileCrash(session, m) + } + return nil +} diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index d3d217e4b..476a81e9b 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -30,11 +30,18 @@ type saverImpl struct { tags tags.Tags } -func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, session sessions.Sessions, tags tags.Tags) Saver { +func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Connector, session sessions.Sessions, tags tags.Tags) Saver { + switch { + case pg == nil: + log.Fatal(context.Background(), "pg pool is empty") + case ch == nil: + log.Fatal(context.Background(), "ch pool is empty") + } s := &saverImpl{ log: log, cfg: cfg, pg: pg, + ch: ch, sessions: session, tags: tags, } @@ -43,21 +50,34 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, session sessions. } func (s *saverImpl) Handle(msg Message) { - sessCtx := context.WithValue(context.Background(), "sessionID", msg.SessionID()) if msg.TypeID() == MsgCustomEvent { defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent))) } + + var ( + sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID()) + session *sessions.Session + err error + ) + if msg.TypeID() == MsgSessionEnd || msg.TypeID() == MsgMobileSessionEnd { + session, err = s.sessions.GetUpdated(msg.SessionID(), true) + } else { + session, err = s.sessions.Get(msg.SessionID()) + } + if err != nil || session == nil { + s.log.Error(sessCtx, "error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg) + return + } + if IsMobileType(msg.TypeID()) { - // Handle Mobile messages - if err := s.handleMobileMessage(msg); err != nil { + if err := s.handleMobileMessage(sessCtx, session, msg); err != nil { if !postgres.IsPkeyViolation(err) { s.log.Error(sessCtx, "mobile message insertion error, msg: %+v, err: %s", msg, err) } return } } else { - // Handle Web messages - if err := s.handleMessage(msg); err != nil { + if err := s.handleWebMessage(sessCtx, session, msg); err != nil { if !postgres.IsPkeyViolation(err) { s.log.Error(sessCtx, "web message insertion error, msg: %+v, err: %s", msg, err) } @@ -65,180 +85,22 @@ func (s *saverImpl) Handle(msg Message) { } } - if err := s.handleExtraMessage(msg); err != nil { - s.log.Error(sessCtx, "extra message insertion error, msg: %+v, err: %s", msg, err) - } + s.sendToFTS(msg, session.ProjectID) return } -func (s *saverImpl) handleMobileMessage(msg Message) error { - session, err := s.sessions.Get(msg.SessionID()) - if err != nil { - return err - } - switch m := msg.(type) { - case *MobileUserID: - if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil { - return err - } - s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERIDMOBILE", m.ID) - return nil - case *MobileUserAnonymousID: - if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil { - return err - } - s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSIDMOBILE", m.ID) - return nil - case *MobileMetadata: - return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value) - case *MobileEvent: - return s.pg.InsertMobileEvent(session, m) - case *MobileClickEvent: - if err := s.pg.InsertMobileClickEvent(session, m); err != nil { - return err - } - return s.sessions.UpdateEventsStats(session.SessionID, 1, 0) - case *MobileSwipeEvent: - if err := s.pg.InsertMobileSwipeEvent(session, m); err != nil { - return err - } - return s.sessions.UpdateEventsStats(session.SessionID, 1, 0) - case *MobileInputEvent: - if err := s.pg.InsertMobileInputEvent(session, m); err != nil { - return err - } - return s.sessions.UpdateEventsStats(session.SessionID, 1, 0) - case *MobileNetworkCall: - return s.pg.InsertMobileNetworkCall(session, m) - case *MobileCrash: - if err := s.pg.InsertMobileCrash(session.SessionID, session.ProjectID, m); err != nil { - return err - } - return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000) - } - return nil -} - -func (s *saverImpl) handleMessage(msg Message) error { - session, err := s.sessions.Get(msg.SessionID()) - if err != nil { - return err - } - sessCtx := context.WithValue(context.Background(), "sessionID", msg.SessionID()) - switch m := msg.(type) { - case *SessionStart: - return s.pg.HandleStartEvent(m) - case *SessionEnd: - return s.pg.HandleEndEvent(m.SessionID()) - case *Metadata: - return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value) - case *IssueEvent: - if m.Type == "dead_click" || m.Type == "click_rage" { - if s.tags.ShouldIgnoreTag(session.ProjectID, m.Context) { - return nil - } - } - err = s.pg.InsertIssueEvent(session, m) - if err != nil { - return err - } - return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type)) - case *CustomIssue: - ie := &IssueEvent{ - Type: "custom", - Timestamp: m.Timestamp, - MessageID: m.Index, - ContextString: m.Name, - Payload: m.Payload, - } - ie.SetMeta(m.Meta()) - if err = s.pg.InsertIssueEvent(session, ie); err != nil { - return err - } - return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie.Type)) - case *UserID: - if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil { - return err - } - s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID) - return nil - case *UserAnonymousID: - if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil { - return err - } - s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID) - return nil - case *CustomEvent: - return s.pg.InsertWebCustomEvent(session, m) - case *MouseClick: - if err = s.pg.InsertWebClickEvent(session, m); err != nil { - return err - } - return s.sessions.UpdateEventsStats(session.SessionID, 1, 0) - case *PageEvent: - if err = s.pg.InsertWebPageEvent(session, m); err != nil { - return err - } - s.sessions.UpdateReferrer(session.SessionID, m.Referrer) - s.sessions.UpdateUTM(session.SessionID, m.URL) - return s.sessions.UpdateEventsStats(session.SessionID, 1, 1) - case *NetworkRequest: - return s.pg.InsertWebNetworkRequest(session, m) - case *GraphQL: - return s.pg.InsertWebGraphQL(session, m) - case *JSException: - wrapper, err := types.WrapJSException(m) - if err != nil { - s.log.Warn(sessCtx, "error on wrapping JSException: %v", err) - } - if err = s.pg.InsertWebErrorEvent(session, wrapper); err != nil { - return err - } - return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000) - case *IntegrationEvent: - return s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)) - case *InputChange: - if err = s.pg.InsertInputChangeEvent(session, m); err != nil { - return err - } - return s.sessions.UpdateEventsStats(session.SessionID, 1, 0) - case *MouseThrashing: - if err = s.pg.InsertMouseThrashing(session, m); err != nil { - return err - } - return s.sessions.UpdateIssuesStats(session.SessionID, 0, 50) - case *CanvasNode: - if err = s.pg.InsertCanvasNode(session, m); err != nil { - return err - } - case *TagTrigger: - if err = s.pg.InsertTagTrigger(session, m); err != nil { - return err - } - } - return nil -} - func (s *saverImpl) Commit() error { - if s.pg != nil { - s.pg.Commit() - } - if s.ch != nil { - s.ch.Commit() - } + s.pg.Commit() + s.ch.Commit() return nil } func (s *saverImpl) Close() error { - if s.pg != nil { - if err := s.pg.Close(); err != nil { - s.log.Error(context.Background(), "pg.Close error: %s", err) - } + if err := s.pg.Close(); err != nil { + s.log.Error(context.Background(), "pg.Close error: %s", err) } - if s.ch != nil { - if err := s.ch.Stop(); err != nil { - s.log.Error(context.Background(), "ch.Close error: %s", err) - } + if err := s.ch.Stop(); err != nil { + s.log.Error(context.Background(), "ch.Close error: %s", err) } return nil } diff --git a/backend/internal/db/datasaver/web.go b/backend/internal/db/datasaver/web.go new file mode 100644 index 000000000..439bcec32 --- /dev/null +++ b/backend/internal/db/datasaver/web.go @@ -0,0 +1,146 @@ +package datasaver + +import ( + "context" + + "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/db/types" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/sessions" +) + +func (s *saverImpl) handleWebMessage(sessCtx context.Context, session *sessions.Session, msg messages.Message) error { + switch m := msg.(type) { + case *messages.SessionStart: + return s.pg.HandleStartEvent(m) + case *messages.SessionEnd: + if err := s.pg.HandleEndEvent(m.SessionID()); err != nil { + return err + } + session, err := s.sessions.GetUpdated(m.SessionID(), true) + if err != nil { + return err + } + return s.ch.InsertWebSession(session) + case *messages.Metadata: + return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value) + case *messages.IssueEvent: + if m.Type == "dead_click" || m.Type == "click_rage" { + if s.tags.ShouldIgnoreTag(session.ProjectID, m.Context) { + return nil + } + } + if err := s.pg.InsertIssueEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type)); err != nil { + return err + } + return s.ch.InsertIssue(session, m) + case *messages.CustomIssue: + ie := &messages.IssueEvent{ + Type: "custom", + Timestamp: m.Timestamp, + MessageID: m.Index, + ContextString: m.Name, + Payload: m.Payload, + } + ie.SetMeta(m.Meta()) + if err := s.pg.InsertIssueEvent(session, ie); err != nil { + return err + } + return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie.Type)) + case *messages.UserID: + if err := s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil { + return err + } + s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID) + return nil + case *messages.UserAnonymousID: + if err := s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil { + return err + } + s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID) + return nil + case *messages.CustomEvent: + if err := s.pg.InsertWebCustomEvent(session, m); err != nil { + return err + } + return s.ch.InsertCustom(session, m) + case *messages.MouseClick: + if err := s.pg.InsertWebClickEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil { + return err + } + return s.ch.InsertWebClickEvent(session, m) + case *messages.PageEvent: + if err := s.pg.InsertWebPageEvent(session, m); err != nil { + return err + } + s.sessions.UpdateReferrer(session.SessionID, m.Referrer) + s.sessions.UpdateUTM(session.SessionID, m.URL) + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 1); err != nil { + return err + } + return s.ch.InsertWebPageEvent(session, m) + case *messages.NetworkRequest: + if err := s.pg.InsertWebNetworkRequest(session, m); err != nil { + return err + } + return s.ch.InsertRequest(session, m, session.SaveRequestPayload) + case *messages.GraphQL: + if err := s.pg.InsertWebGraphQL(session, m); err != nil { + return err + } + return s.ch.InsertGraphQL(session, m) + case *messages.JSException: + wrapper, err := types.WrapJSException(m) + if err != nil { + s.log.Warn(sessCtx, "error on wrapping JSException: %v", err) + } + if err = s.pg.InsertWebErrorEvent(session, wrapper); err != nil { + return err + } + if err := s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000); err != nil { + return err + } + return s.ch.InsertWebErrorEvent(session, wrapper) + case *messages.IntegrationEvent: + if err := s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)); err != nil { + return err + } + return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)) + case *messages.InputChange: + if err := s.pg.InsertInputChangeEvent(session, m); err != nil { + return err + } + if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil { + return err + } + return s.ch.InsertWebInputDuration(session, m) + case *messages.MouseThrashing: + if err := s.pg.InsertMouseThrashing(session, m); err != nil { + return err + } + if err := s.sessions.UpdateIssuesStats(session.SessionID, 0, 50); err != nil { + return err + } + return s.ch.InsertMouseThrashing(session, m) + case *messages.CanvasNode: + if err := s.pg.InsertCanvasNode(session, m); err != nil { + return err + } + case *messages.TagTrigger: + if err := s.pg.InsertTagTrigger(session, m); err != nil { + return err + } + case *messages.PerformanceTrackAggr: + if err := s.pg.InsertWebStatsPerformance(m); err != nil { + return err + } + return s.ch.InsertWebPerformanceTrackAggr(session, m) + } + return nil +} diff --git a/ee/backend/pkg/db/clickhouse/bulk.go b/backend/pkg/db/clickhouse/bulk.go similarity index 99% rename from ee/backend/pkg/db/clickhouse/bulk.go rename to backend/pkg/db/clickhouse/bulk.go index 6eb8d98fd..f070f4a15 100644 --- a/ee/backend/pkg/db/clickhouse/bulk.go +++ b/backend/pkg/db/clickhouse/bulk.go @@ -5,10 +5,11 @@ import ( "errors" "fmt" "log" - "openreplay/backend/pkg/metrics/database" "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + "openreplay/backend/pkg/metrics/database" ) type Bulk interface { diff --git a/backend/pkg/db/clickhouse/connector.go b/backend/pkg/db/clickhouse/connector.go index 727ad7f7b..71d94ab85 100644 --- a/backend/pkg/db/clickhouse/connector.go +++ b/backend/pkg/db/clickhouse/connector.go @@ -1,19 +1,31 @@ package clickhouse import ( + "errors" + "fmt" + "log" + "strings" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + "openreplay/backend/internal/config/common" "openreplay/backend/pkg/db/types" + "openreplay/backend/pkg/hashid" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/sessions" + "openreplay/backend/pkg/url" ) type Connector interface { Prepare() error Commit() error Stop() error + // Web InsertWebSession(session *sessions.Session) error InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error - InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error @@ -21,4 +33,669 @@ type Connector interface { InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error + InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error + InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error + // Mobile + InsertMobileSession(session *sessions.Session) error + InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error + InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error + InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error + InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error + InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error + InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error +} + +type task struct { + bulks []Bulk +} + +func NewTask() *task { + return &task{bulks: make([]Bulk, 0, 21)} +} + +type connectorImpl struct { + conn driver.Conn + batches map[string]Bulk //driver.Batch + workerTask chan *task + done chan struct{} + finished chan struct{} +} + +func NewConnector(cfg common.Clickhouse) Connector { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{cfg.GetTrimmedURL()}, + Auth: clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.LegacyUserName, + Password: cfg.LegacyPassword, + }, + MaxOpenConns: 20, + MaxIdleConns: 15, + ConnMaxLifetime: 3 * time.Minute, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + }) + if err != nil { + log.Fatal(err) + } + + c := &connectorImpl{ + conn: conn, + batches: make(map[string]Bulk, 20), + workerTask: make(chan *task, 1), + done: make(chan struct{}), + finished: make(chan struct{}), + } + go c.worker() + return c +} + +func (c *connectorImpl) newBatch(name, query string) error { + batch, err := NewBulk(c.conn, name, query) + if err != nil { + return fmt.Errorf("can't create new batch: %s", err) + } + c.batches[name] = batch + return nil +} + +var batches = map[string]string{ + // Web + "sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)", + "autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))", + "pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, url_path, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?)", + "clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type, selector, normalized_x, normalized_y, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))", + "inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type, duration, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type, transfer_size, url_path) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000))", + "custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + "graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))", + "issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)", + //Mobile + "ios_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)", + "ios_custom": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + "ios_clicks": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)", + "ios_swipes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, direction, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + "ios_inputs": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)", + "ios_requests": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?)", + "ios_crashes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, reason, stacktrace, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", +} + +func (c *connectorImpl) Prepare() error { + for table, query := range batches { + if err := c.newBatch(table, query); err != nil { + return fmt.Errorf("can't create %s batch: %s", table, err) + } + } + return nil +} + +func (c *connectorImpl) Commit() error { + newTask := NewTask() + for _, b := range c.batches { + newTask.bulks = append(newTask.bulks, b) + } + c.batches = make(map[string]Bulk, 20) + if err := c.Prepare(); err != nil { + log.Printf("can't prepare new CH batch set: %s", err) + } + c.workerTask <- newTask + return nil +} + +func (c *connectorImpl) Stop() error { + c.done <- struct{}{} + <-c.finished + return c.conn.Close() +} + +func (c *connectorImpl) sendBulks(t *task) { + for _, b := range t.bulks { + if err := b.Send(); err != nil { + log.Printf("can't send batch: %s", err) + } + } +} + +func (c *connectorImpl) worker() { + for { + select { + case t := <-c.workerTask: + c.sendBulks(t) + case <-c.done: + for t := range c.workerTask { + c.sendBulks(t) + } + c.finished <- struct{}{} + return + } + } +} + +func (c *connectorImpl) checkError(name string, err error) { + if err != clickhouse.ErrBatchAlreadySent { + log.Printf("can't create %s batch after failed append operation: %s", name, err) + } +} + +func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error { + if msg.Label == "" { + return nil + } + if err := c.batches["inputs"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Label, + "INPUT", + nullableUint16(uint16(msg.InputDuration)), + nullableUint32(uint32(msg.HesitationTime)), + ); err != nil { + c.checkError("inputs", err) + return fmt.Errorf("can't append to inputs batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error { + issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp) + // Insert issue event to batches + if err := c.batches["issuesEvents"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + issueID, + "mouse_thrashing", + "ISSUE", + msg.Url, + extractUrlPath(msg.Url), + ); err != nil { + c.checkError("issuesEvents", err) + return fmt.Errorf("can't append to issuesEvents batch: %s", err) + } + if err := c.batches["issues"].Append( + uint16(session.ProjectID), + issueID, + "mouse_thrashing", + msg.Url, + ); err != nil { + c.checkError("issues", err) + return fmt.Errorf("can't append to issues batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error { + issueID := hashid.IssueID(session.ProjectID, msg) + // Check issue type before insert to avoid panic from clickhouse lib + switch msg.Type { + case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash": + default: + return fmt.Errorf("unknown issueType: %s", msg.Type) + } + // Insert issue event to batches + if err := c.batches["issuesEvents"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MessageID, + datetime(msg.Timestamp), + issueID, + msg.Type, + "ISSUE", + msg.URL, + extractUrlPath(msg.URL), + ); err != nil { + c.checkError("issuesEvents", err) + return fmt.Errorf("can't append to issuesEvents batch: %s", err) + } + if err := c.batches["issues"].Append( + uint16(session.ProjectID), + issueID, + msg.Type, + msg.ContextString, + ); err != nil { + c.checkError("issues", err) + return fmt.Errorf("can't append to issues batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebSession(session *sessions.Session) error { + if session.Duration == nil { + return errors.New("trying to insert session with nil duration") + } + if err := c.batches["sessions"].Append( + session.SessionID, + uint16(session.ProjectID), + session.UserID, + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + session.UserState, + session.UserCity, + datetime(session.Timestamp), + uint32(*session.Duration), + uint16(session.PagesCount), + uint16(session.EventsCount), + uint16(session.ErrorsCount), + uint32(session.IssueScore), + session.Referrer, + session.IssueTypes, + session.TrackerVersion, + session.UserBrowser, + nullableString(session.UserBrowserVersion), + session.Metadata1, + session.Metadata2, + session.Metadata3, + session.Metadata4, + session.Metadata5, + session.Metadata6, + session.Metadata7, + session.Metadata8, + session.Metadata9, + session.Metadata10, + session.Timezone, + session.UtmSource, + session.UtmMedium, + session.UtmCampaign, + ); err != nil { + c.checkError("sessions", err) + return fmt.Errorf("can't append to sessions batch: %s", err) + } + return nil +} + +func extractUrlPath(fullUrl string) string { + _, path, query, err := url.GetURLParts(fullUrl) + if err != nil { + log.Printf("can't parse url: %s", err) + return "" + } + pathQuery := path + if query != "" { + pathQuery += "?" + query + } + return strings.ToLower(pathQuery) +} + +func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error { + if err := c.batches["pages"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MessageID, + datetime(msg.Timestamp), + msg.URL, + nullableUint16(uint16(msg.RequestStart)), + nullableUint16(uint16(msg.ResponseStart)), + nullableUint16(uint16(msg.ResponseEnd)), + nullableUint16(uint16(msg.DomContentLoadedEventStart)), + nullableUint16(uint16(msg.DomContentLoadedEventEnd)), + nullableUint16(uint16(msg.LoadEventStart)), + nullableUint16(uint16(msg.LoadEventEnd)), + nullableUint16(uint16(msg.FirstPaint)), + nullableUint16(uint16(msg.FirstContentfulPaint)), + nullableUint16(uint16(msg.SpeedIndex)), + nullableUint16(uint16(msg.VisuallyComplete)), + nullableUint16(uint16(msg.TimeToInteractive)), + extractUrlPath(msg.URL), + "LOCATION", + ); err != nil { + c.checkError("pages", err) + return fmt.Errorf("can't append to pages batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error { + if msg.Label == "" { + return nil + } + var nX *float32 = nil + var nY *float32 = nil + if msg.NormalizedX != 101 && msg.NormalizedY != 101 { + // To support previous versions of tracker + if msg.NormalizedX <= 100 && msg.NormalizedY <= 100 { + msg.NormalizedX *= 100 + msg.NormalizedY *= 100 + } + normalizedX := float32(msg.NormalizedX) / 100.0 + normalizedY := float32(msg.NormalizedY) / 100.0 + nXVal := normalizedX + nX = &nXVal + nYVal := normalizedY + nY = &nYVal + } + if err := c.batches["clicks"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Label, + nullableUint32(uint32(msg.HesitationTime)), + "CLICK", + msg.Selector, + nX, + nY, + msg.Url, + extractUrlPath(msg.Url), + ); err != nil { + c.checkError("clicks", err) + return fmt.Errorf("can't append to clicks batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error { + keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags)) + for k, v := range msg.Tags { + keys = append(keys, k) + values = append(values, v) + } + // Check error source before insert to avoid panic from clickhouse lib + switch msg.Source { + case "js_exception", "bugsnag", "cloudwatch", "datadog", "elasticsearch", "newrelic", "rollbar", "sentry", "stackdriver", "sumologic": + default: + return fmt.Errorf("unknown error source: %s", msg.Source) + } + msgID, _ := msg.ID(session.ProjectID) + // Insert event to batch + if err := c.batches["errors"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MessageID, + datetime(msg.Timestamp), + msg.Source, + nullableString(msg.Name), + msg.Message, + msgID, + "ERROR", + keys, + values, + ); err != nil { + c.checkError("errors", err) + return fmt.Errorf("can't append to errors batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error { + var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 + if err := c.batches["performance"].Append( + session.SessionID, + uint16(session.ProjectID), + uint64(0), // TODO: find messageID for performance events + datetime(timestamp), + nullableString(msg.Meta().Url), + uint8(msg.MinFPS), + uint8(msg.AvgFPS), + uint8(msg.MaxFPS), + uint8(msg.MinCPU), + uint8(msg.AvgCPU), + uint8(msg.MaxCPU), + msg.MinTotalJSHeapSize, + msg.AvgTotalJSHeapSize, + msg.MaxTotalJSHeapSize, + msg.MinUsedJSHeapSize, + msg.AvgUsedJSHeapSize, + msg.MaxUsedJSHeapSize, + "PERFORMANCE", + ); err != nil { + c.checkError("performance", err) + return fmt.Errorf("can't append to performance batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error { + if len(msgValue) == 0 { + return nil + } + if err := c.batches["autocompletes"].Append( + uint16(session.ProjectID), + msgType, + msgValue, + ); err != nil { + c.checkError("autocompletes", err) + return fmt.Errorf("can't append to autocompletes batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error { + urlMethod := url.EnsureMethod(msg.Method) + if urlMethod == "" { + return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method) + } + var request, response *string + if savePayload { + request = &msg.Request + response = &msg.Response + } + if err := c.batches["requests"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.Meta().Index, + datetime(uint64(msg.Meta().Timestamp)), + msg.URL, + request, + response, + uint16(msg.Status), + url.EnsureMethod(msg.Method), + uint16(msg.Duration), + msg.Status < 400, + "REQUEST", + uint32(msg.TransferredBodySize), + extractUrlPath(msg.URL), + ); err != nil { + c.checkError("requests", err) + return fmt.Errorf("can't append to requests batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error { + if err := c.batches["custom"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.Meta().Index, + datetime(uint64(msg.Meta().Timestamp)), + msg.Name, + msg.Payload, + "CUSTOM", + ); err != nil { + c.checkError("custom", err) + return fmt.Errorf("can't append to custom batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error { + if err := c.batches["graphql"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.Meta().Index, + datetime(uint64(msg.Meta().Timestamp)), + msg.OperationName, + nullableString(msg.Variables), + nullableString(msg.Response), + "GRAPHQL", + ); err != nil { + c.checkError("graphql", err) + return fmt.Errorf("can't append to graphql batch: %s", err) + } + return nil +} + +// Mobile events + +func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error { + if session.Duration == nil { + return errors.New("trying to insert mobile session with nil duration") + } + if err := c.batches["ios_sessions"].Append( + session.SessionID, + uint16(session.ProjectID), + session.UserID, + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + session.UserState, + session.UserCity, + datetime(session.Timestamp), + uint32(*session.Duration), + uint16(session.PagesCount), + uint16(session.EventsCount), + uint16(session.ErrorsCount), + uint32(session.IssueScore), + session.Referrer, + session.IssueTypes, + session.TrackerVersion, + session.UserBrowser, + nullableString(session.UserBrowserVersion), + session.Metadata1, + session.Metadata2, + session.Metadata3, + session.Metadata4, + session.Metadata5, + session.Metadata6, + session.Metadata7, + session.Metadata8, + session.Metadata9, + session.Metadata10, + "ios", + session.Timezone, + ); err != nil { + c.checkError("ios_sessions", err) + return fmt.Errorf("can't append to sessions batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error { + if err := c.batches["ios_custom"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.Meta().Index, + datetime(uint64(msg.Meta().Timestamp)), + msg.Name, + msg.Payload, + "CUSTOM", + ); err != nil { + c.checkError("ios_custom", err) + return fmt.Errorf("can't append to mobile custom batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["ios_clicks"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Label, + "TAP", + ); err != nil { + c.checkError("ios_clicks", err) + return fmt.Errorf("can't append to mobile clicks batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["ios_swipes"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Label, + nullableString(msg.Direction), + "SWIPE", + ); err != nil { + c.checkError("ios_clicks", err) + return fmt.Errorf("can't append to mobile clicks batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["ios_inputs"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Label, + "INPUT", + ); err != nil { + c.checkError("ios_inputs", err) + return fmt.Errorf("can't append to mobile inputs batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error { + urlMethod := url.EnsureMethod(msg.Method) + if urlMethod == "" { + return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method) + } + var request, response *string + if savePayload { + request = &msg.Request + response = &msg.Response + } + if err := c.batches["ios_requests"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.Meta().Index, + datetime(uint64(msg.Meta().Timestamp)), + msg.URL, + request, + response, + uint16(msg.Status), + url.EnsureMethod(msg.Method), + uint16(msg.Duration), + msg.Status < 400, + "REQUEST", + ); err != nil { + c.checkError("ios_requests", err) + return fmt.Errorf("can't append to mobile requests batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error { + if err := c.batches["ios_crashes"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MsgID(), + datetime(msg.Timestamp), + msg.Name, + msg.Reason, + msg.Stacktrace, + "CRASH", + ); err != nil { + c.checkError("ios_crashes", err) + return fmt.Errorf("can't append to mobile crashges batch: %s", err) + } + return nil } diff --git a/ee/backend/pkg/db/clickhouse/insert_type.go b/backend/pkg/db/clickhouse/insert_type.go similarity index 100% rename from ee/backend/pkg/db/clickhouse/insert_type.go rename to backend/pkg/db/clickhouse/insert_type.go diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index cda778d7c..7ee1f997f 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -19,20 +19,17 @@ type Conn struct { Pool pool.Pool batches *batch.BatchSet bulks *BulkSet - chConn CH // hack for autocomplete inserts, TODO: rewrite + chConn CH } -func (conn *Conn) SetClickHouse(ch CH) { - conn.chConn = ch -} - -func NewConn(log logger.Logger, pool pool.Pool) *Conn { +func NewConn(log logger.Logger, pool pool.Pool, ch CH) *Conn { if pool == nil { log.Fatal(context.Background(), "pg pool is empty") } return &Conn{ log: log, Pool: pool, + chConn: ch, bulks: NewBulkSet(log, pool), batches: batch.NewBatchSet(log, pool), } diff --git a/backend/pkg/sessions/sessions.go b/backend/pkg/sessions/sessions.go index 446fd1b1f..bd2519cc6 100644 --- a/backend/pkg/sessions/sessions.go +++ b/backend/pkg/sessions/sessions.go @@ -16,7 +16,7 @@ type Sessions interface { AddUnStarted(session *UnStartedSession) error AddCached(sessionID uint64, data map[string]string) error Get(sessionID uint64) (*Session, error) - GetUpdated(sessionID uint64) (*Session, error) + GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) GetCached(sessionID uint64) (map[string]string, error) GetDuration(sessionID uint64) (uint64, error) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) @@ -104,11 +104,14 @@ func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) { } // Special method for clickhouse connector -func (s *sessionsImpl) GetUpdated(sessionID uint64) (*Session, error) { +func (s *sessionsImpl) GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) { session, err := s.getFromDB(sessionID) if err != nil { return nil, err } + if !keepInCache { + return session, nil + } if err := s.cache.Set(session); err != nil { ctx := context.WithValue(context.Background(), "sessionID", sessionID) s.log.Warn(ctx, "failed to cache session: %s", err) diff --git a/ee/backend/internal/db/datasaver/fts.go b/ee/backend/internal/db/datasaver/fts.go index 34f75b006..15f0fd1e9 100644 --- a/ee/backend/internal/db/datasaver/fts.go +++ b/ee/backend/internal/db/datasaver/fts.go @@ -3,7 +3,9 @@ package datasaver import ( "encoding/json" "log" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue" ) type NetworkRequestFTS struct { @@ -98,6 +100,12 @@ func WrapGraphQL(m *messages.GraphQL, projID uint32) *GraphQLFTS { } } +func (s *saverImpl) init() { + if s.cfg.UseQuickwit { + s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true) + } +} + func (s *saverImpl) sendToFTS(msg messages.Message, projID uint32) { // Skip, if FTS is disabled if s.producer == nil { diff --git a/ee/backend/internal/db/datasaver/methods.go b/ee/backend/internal/db/datasaver/methods.go deleted file mode 100644 index 1644a1fc0..000000000 --- a/ee/backend/internal/db/datasaver/methods.go +++ /dev/null @@ -1,93 +0,0 @@ -package datasaver - -import ( - "log" - - "openreplay/backend/pkg/db/clickhouse" - "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/env" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/sessions" -) - -func (s *saverImpl) init() { - s.ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) - if err := s.ch.Prepare(); err != nil { - log.Fatalf("can't prepare clickhouse: %s", err) - } - s.pg.SetClickHouse(s.ch) - if s.cfg.UseQuickwit { - s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true) - } -} - -func (s *saverImpl) handleExtraMessage(msg messages.Message) error { - // Get session data - var ( - session *sessions.Session - err error - ) - - if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgMobileSessionEnd { - session, err = s.sessions.GetUpdated(msg.SessionID()) - } else { - session, err = s.sessions.Get(msg.SessionID()) - } - if err != nil || session == nil { - log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg) - return err - } - - // Send data to quickwit - s.sendToFTS(msg, session.ProjectID) - - // Handle message - switch m := msg.(type) { - case *messages.SessionEnd: - return s.ch.InsertWebSession(session) - case *messages.PerformanceTrackAggr: - return s.ch.InsertWebPerformanceTrackAggr(session, m) - case *messages.MouseClick: - return s.ch.InsertWebClickEvent(session, m) - // Unique for Web - case *messages.PageEvent: - return s.ch.InsertWebPageEvent(session, m) - case *messages.JSException: - wrapper, _ := types.WrapJSException(m) - return s.ch.InsertWebErrorEvent(session, wrapper) - case *messages.IntegrationEvent: - return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)) - case *messages.IssueEvent: - return s.ch.InsertIssue(session, m) - case *messages.CustomEvent: - return s.ch.InsertCustom(session, m) - case *messages.NetworkRequest: - if err := s.ch.InsertRequest(session, m, session.SaveRequestPayload); err != nil { - log.Printf("can't insert request event into clickhouse: %s", err) - } - case *messages.GraphQL: - return s.ch.InsertGraphQL(session, m) - case *messages.InputChange: - return s.ch.InsertWebInputDuration(session, m) - case *messages.MouseThrashing: - return s.ch.InsertMouseThrashing(session, m) - - // Mobile messages - case *messages.MobileSessionEnd: - return s.ch.InsertMobileSession(session) - case *messages.MobileEvent: - return s.ch.InsertMobileCustom(session, m) - case *messages.MobileClickEvent: - return s.ch.InsertMobileClick(session, m) - case *messages.MobileSwipeEvent: - return s.ch.InsertMobileSwipe(session, m) - case *messages.MobileInputEvent: - return s.ch.InsertMobileInput(session, m) - case *messages.MobileNetworkCall: - return s.ch.InsertMobileRequest(session, m, session.SaveRequestPayload) - case *messages.MobileCrash: - return s.ch.InsertMobileCrash(session, m) - } - return nil -} diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go deleted file mode 100644 index b61acd547..000000000 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ /dev/null @@ -1,713 +0,0 @@ -package clickhouse - -import ( - "errors" - "fmt" - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "log" - "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/hashid" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/sessions" - "openreplay/backend/pkg/url" - "os" - "strings" - "time" - - "openreplay/backend/pkg/license" -) - -type Connector interface { - Prepare() error - Commit() error - Stop() error - // Web - InsertWebSession(session *sessions.Session) error - InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error - InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error - InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error - InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error - InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error - InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error - InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error - InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error - InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error - InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error - InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error - // Mobile - InsertMobileSession(session *sessions.Session) error - InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error - InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error - InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error - InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error - InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error - InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error -} - -type task struct { - bulks []Bulk -} - -func NewTask() *task { - return &task{bulks: make([]Bulk, 0, 21)} -} - -type connectorImpl struct { - conn driver.Conn - batches map[string]Bulk //driver.Batch - workerTask chan *task - done chan struct{} - finished chan struct{} -} - -func getEnv(key, fallback string) string { - if value, ok := os.LookupEnv(key); ok { - return value - } - return fallback -} - -func NewConnector(url string) Connector { - license.CheckLicense() - url = strings.TrimPrefix(url, "tcp://") - url = strings.TrimSuffix(url, "/default") - userName := getEnv("CH_USERNAME", "default") - password := getEnv("CH_PASSWORD", "") - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{url}, - Auth: clickhouse.Auth{ - Database: "default", - Username: userName, - Password: password, - }, - MaxOpenConns: 20, - MaxIdleConns: 15, - ConnMaxLifetime: 3 * time.Minute, - Compression: &clickhouse.Compression{ - Method: clickhouse.CompressionLZ4, - }, - }) - if err != nil { - log.Fatal(err) - } - - c := &connectorImpl{ - conn: conn, - batches: make(map[string]Bulk, 20), - workerTask: make(chan *task, 1), - done: make(chan struct{}), - finished: make(chan struct{}), - } - go c.worker() - return c -} - -func (c *connectorImpl) newBatch(name, query string) error { - batch, err := NewBulk(c.conn, name, query) - if err != nil { - return fmt.Errorf("can't create new batch: %s", err) - } - c.batches[name] = batch - return nil -} - -var batches = map[string]string{ - // Web - "sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)", - "autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))", - "pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, url_path, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?)", - "clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type, selector, normalized_x, normalized_y, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))", - "inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type, duration, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - "errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - "performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - "requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type, transfer_size, url_path) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000))", - "custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", - "graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - "issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))", - "issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)", - //Mobile - "ios_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)", - "ios_custom": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", - "ios_clicks": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)", - "ios_swipes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, direction, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", - "ios_inputs": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)", - "ios_requests": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?)", - "ios_crashes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, reason, stacktrace, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", -} - -func (c *connectorImpl) Prepare() error { - for table, query := range batches { - if err := c.newBatch(table, query); err != nil { - return fmt.Errorf("can't create %s batch: %s", table, err) - } - } - return nil -} - -func (c *connectorImpl) Commit() error { - newTask := NewTask() - for _, b := range c.batches { - newTask.bulks = append(newTask.bulks, b) - } - c.batches = make(map[string]Bulk, 20) - if err := c.Prepare(); err != nil { - log.Printf("can't prepare new CH batch set: %s", err) - } - c.workerTask <- newTask - return nil -} - -func (c *connectorImpl) Stop() error { - c.done <- struct{}{} - <-c.finished - return c.conn.Close() -} - -func (c *connectorImpl) sendBulks(t *task) { - for _, b := range t.bulks { - if err := b.Send(); err != nil { - log.Printf("can't send batch: %s", err) - } - } -} - -func (c *connectorImpl) worker() { - for { - select { - case t := <-c.workerTask: - c.sendBulks(t) - case <-c.done: - for t := range c.workerTask { - c.sendBulks(t) - } - c.finished <- struct{}{} - return - } - } -} - -func (c *connectorImpl) checkError(name string, err error) { - if err != clickhouse.ErrBatchAlreadySent { - log.Printf("can't create %s batch after failed append operation: %s", name, err) - } -} - -func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error { - if msg.Label == "" { - return nil - } - if err := c.batches["inputs"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Label, - "INPUT", - nullableUint16(uint16(msg.InputDuration)), - nullableUint32(uint32(msg.HesitationTime)), - ); err != nil { - c.checkError("inputs", err) - return fmt.Errorf("can't append to inputs batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error { - issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp) - // Insert issue event to batches - if err := c.batches["issuesEvents"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - issueID, - "mouse_thrashing", - "ISSUE", - msg.Url, - extractUrlPath(msg.Url), - ); err != nil { - c.checkError("issuesEvents", err) - return fmt.Errorf("can't append to issuesEvents batch: %s", err) - } - if err := c.batches["issues"].Append( - uint16(session.ProjectID), - issueID, - "mouse_thrashing", - msg.Url, - ); err != nil { - c.checkError("issues", err) - return fmt.Errorf("can't append to issues batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error { - issueID := hashid.IssueID(session.ProjectID, msg) - // Check issue type before insert to avoid panic from clickhouse lib - switch msg.Type { - case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash": - default: - return fmt.Errorf("unknown issueType: %s", msg.Type) - } - // Insert issue event to batches - if err := c.batches["issuesEvents"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MessageID, - datetime(msg.Timestamp), - issueID, - msg.Type, - "ISSUE", - msg.URL, - extractUrlPath(msg.URL), - ); err != nil { - c.checkError("issuesEvents", err) - return fmt.Errorf("can't append to issuesEvents batch: %s", err) - } - if err := c.batches["issues"].Append( - uint16(session.ProjectID), - issueID, - msg.Type, - msg.ContextString, - ); err != nil { - c.checkError("issues", err) - return fmt.Errorf("can't append to issues batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertWebSession(session *sessions.Session) error { - if session.Duration == nil { - return errors.New("trying to insert session with nil duration") - } - if err := c.batches["sessions"].Append( - session.SessionID, - uint16(session.ProjectID), - session.UserID, - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - session.UserState, - session.UserCity, - datetime(session.Timestamp), - uint32(*session.Duration), - uint16(session.PagesCount), - uint16(session.EventsCount), - uint16(session.ErrorsCount), - uint32(session.IssueScore), - session.Referrer, - session.IssueTypes, - session.TrackerVersion, - session.UserBrowser, - nullableString(session.UserBrowserVersion), - session.Metadata1, - session.Metadata2, - session.Metadata3, - session.Metadata4, - session.Metadata5, - session.Metadata6, - session.Metadata7, - session.Metadata8, - session.Metadata9, - session.Metadata10, - session.Timezone, - session.UtmSource, - session.UtmMedium, - session.UtmCampaign, - ); err != nil { - c.checkError("sessions", err) - return fmt.Errorf("can't append to sessions batch: %s", err) - } - return nil -} - -func extractUrlPath(fullUrl string) string { - _, path, query, err := url.GetURLParts(fullUrl) - if err != nil { - log.Printf("can't parse url: %s", err) - return "" - } - pathQuery := path - if query != "" { - pathQuery += "?" + query - } - return strings.ToLower(pathQuery) -} - -func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error { - if err := c.batches["pages"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MessageID, - datetime(msg.Timestamp), - msg.URL, - nullableUint16(uint16(msg.RequestStart)), - nullableUint16(uint16(msg.ResponseStart)), - nullableUint16(uint16(msg.ResponseEnd)), - nullableUint16(uint16(msg.DomContentLoadedEventStart)), - nullableUint16(uint16(msg.DomContentLoadedEventEnd)), - nullableUint16(uint16(msg.LoadEventStart)), - nullableUint16(uint16(msg.LoadEventEnd)), - nullableUint16(uint16(msg.FirstPaint)), - nullableUint16(uint16(msg.FirstContentfulPaint)), - nullableUint16(uint16(msg.SpeedIndex)), - nullableUint16(uint16(msg.VisuallyComplete)), - nullableUint16(uint16(msg.TimeToInteractive)), - extractUrlPath(msg.URL), - "LOCATION", - ); err != nil { - c.checkError("pages", err) - return fmt.Errorf("can't append to pages batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error { - if msg.Label == "" { - return nil - } - var nX *float32 = nil - var nY *float32 = nil - if msg.NormalizedX != 101 && msg.NormalizedY != 101 { - // To support previous versions of tracker - if msg.NormalizedX <= 100 && msg.NormalizedY <= 100 { - msg.NormalizedX *= 100 - msg.NormalizedY *= 100 - } - normalizedX := float32(msg.NormalizedX) / 100.0 - normalizedY := float32(msg.NormalizedY) / 100.0 - nXVal := normalizedX - nX = &nXVal - nYVal := normalizedY - nY = &nYVal - } - if err := c.batches["clicks"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Label, - nullableUint32(uint32(msg.HesitationTime)), - "CLICK", - msg.Selector, - nX, - nY, - msg.Url, - extractUrlPath(msg.Url), - ); err != nil { - c.checkError("clicks", err) - return fmt.Errorf("can't append to clicks batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error { - keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags)) - for k, v := range msg.Tags { - keys = append(keys, k) - values = append(values, v) - } - // Check error source before insert to avoid panic from clickhouse lib - switch msg.Source { - case "js_exception", "bugsnag", "cloudwatch", "datadog", "elasticsearch", "newrelic", "rollbar", "sentry", "stackdriver", "sumologic": - default: - return fmt.Errorf("unknown error source: %s", msg.Source) - } - msgID, _ := msg.ID(session.ProjectID) - // Insert event to batch - if err := c.batches["errors"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MessageID, - datetime(msg.Timestamp), - msg.Source, - nullableString(msg.Name), - msg.Message, - msgID, - "ERROR", - keys, - values, - ); err != nil { - c.checkError("errors", err) - return fmt.Errorf("can't append to errors batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error { - var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 - if err := c.batches["performance"].Append( - session.SessionID, - uint16(session.ProjectID), - uint64(0), // TODO: find messageID for performance events - datetime(timestamp), - nullableString(msg.Meta().Url), - uint8(msg.MinFPS), - uint8(msg.AvgFPS), - uint8(msg.MaxFPS), - uint8(msg.MinCPU), - uint8(msg.AvgCPU), - uint8(msg.MaxCPU), - msg.MinTotalJSHeapSize, - msg.AvgTotalJSHeapSize, - msg.MaxTotalJSHeapSize, - msg.MinUsedJSHeapSize, - msg.AvgUsedJSHeapSize, - msg.MaxUsedJSHeapSize, - "PERFORMANCE", - ); err != nil { - c.checkError("performance", err) - return fmt.Errorf("can't append to performance batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error { - if len(msgValue) == 0 { - return nil - } - if err := c.batches["autocompletes"].Append( - uint16(session.ProjectID), - msgType, - msgValue, - ); err != nil { - c.checkError("autocompletes", err) - return fmt.Errorf("can't append to autocompletes batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error { - urlMethod := url.EnsureMethod(msg.Method) - if urlMethod == "" { - return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method) - } - var request, response *string - if savePayload { - request = &msg.Request - response = &msg.Response - } - if err := c.batches["requests"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.Meta().Index, - datetime(uint64(msg.Meta().Timestamp)), - msg.URL, - request, - response, - uint16(msg.Status), - url.EnsureMethod(msg.Method), - uint16(msg.Duration), - msg.Status < 400, - "REQUEST", - uint32(msg.TransferredBodySize), - extractUrlPath(msg.URL), - ); err != nil { - c.checkError("requests", err) - return fmt.Errorf("can't append to requests batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error { - if err := c.batches["custom"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.Meta().Index, - datetime(uint64(msg.Meta().Timestamp)), - msg.Name, - msg.Payload, - "CUSTOM", - ); err != nil { - c.checkError("custom", err) - return fmt.Errorf("can't append to custom batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error { - if err := c.batches["graphql"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.Meta().Index, - datetime(uint64(msg.Meta().Timestamp)), - msg.OperationName, - nullableString(msg.Variables), - nullableString(msg.Response), - "GRAPHQL", - ); err != nil { - c.checkError("graphql", err) - return fmt.Errorf("can't append to graphql batch: %s", err) - } - return nil -} - -// Mobile events - -func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error { - if session.Duration == nil { - return errors.New("trying to insert mobile session with nil duration") - } - if err := c.batches["ios_sessions"].Append( - session.SessionID, - uint16(session.ProjectID), - session.UserID, - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - session.UserState, - session.UserCity, - datetime(session.Timestamp), - uint32(*session.Duration), - uint16(session.PagesCount), - uint16(session.EventsCount), - uint16(session.ErrorsCount), - uint32(session.IssueScore), - session.Referrer, - session.IssueTypes, - session.TrackerVersion, - session.UserBrowser, - nullableString(session.UserBrowserVersion), - session.Metadata1, - session.Metadata2, - session.Metadata3, - session.Metadata4, - session.Metadata5, - session.Metadata6, - session.Metadata7, - session.Metadata8, - session.Metadata9, - session.Metadata10, - "ios", - session.Timezone, - ); err != nil { - c.checkError("ios_sessions", err) - return fmt.Errorf("can't append to sessions batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error { - if err := c.batches["ios_custom"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.Meta().Index, - datetime(uint64(msg.Meta().Timestamp)), - msg.Name, - msg.Payload, - "CUSTOM", - ); err != nil { - c.checkError("ios_custom", err) - return fmt.Errorf("can't append to mobile custom batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error { - if msg.Label == "" { - return nil - } - if err := c.batches["ios_clicks"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Label, - "TAP", - ); err != nil { - c.checkError("ios_clicks", err) - return fmt.Errorf("can't append to mobile clicks batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error { - if msg.Label == "" { - return nil - } - if err := c.batches["ios_swipes"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Label, - nullableString(msg.Direction), - "SWIPE", - ); err != nil { - c.checkError("ios_clicks", err) - return fmt.Errorf("can't append to mobile clicks batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error { - if msg.Label == "" { - return nil - } - if err := c.batches["ios_inputs"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Label, - "INPUT", - ); err != nil { - c.checkError("ios_inputs", err) - return fmt.Errorf("can't append to mobile inputs batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error { - urlMethod := url.EnsureMethod(msg.Method) - if urlMethod == "" { - return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method) - } - var request, response *string - if savePayload { - request = &msg.Request - response = &msg.Response - } - if err := c.batches["ios_requests"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.Meta().Index, - datetime(uint64(msg.Meta().Timestamp)), - msg.URL, - request, - response, - uint16(msg.Status), - url.EnsureMethod(msg.Method), - uint16(msg.Duration), - msg.Status < 400, - "REQUEST", - ); err != nil { - c.checkError("ios_requests", err) - return fmt.Errorf("can't append to mobile requests batch: %s", err) - } - return nil -} - -func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error { - if err := c.batches["ios_crashes"].Append( - session.SessionID, - uint16(session.ProjectID), - msg.MsgID(), - datetime(msg.Timestamp), - msg.Name, - msg.Reason, - msg.Stacktrace, - "CRASH", - ); err != nil { - c.checkError("ios_crashes", err) - return fmt.Errorf("can't append to mobile crashges batch: %s", err) - } - return nil -}