ClickHouse support (#2830)

* feat(db): added CH support to db service

* feat(db): removed license check for CH client

* feat(db): removed fts integration

* feat(clickhouse): added config instead of direct env parsing

* feat(clickhouse): removed prev extraHandlers

* feat(clickhouse): an unified approach for data insertion to dbs

* feat(clickhouse): removed unused imports
This commit is contained in:
Alexander 2024-12-10 12:41:52 +01:00 committed by GitHub
parent 122416d311
commit 9b75e4502f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 980 additions and 1012 deletions

View file

@ -6,6 +6,7 @@ import (
config "openreplay/backend/internal/config/db"
"openreplay/backend/internal/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
@ -33,9 +34,15 @@ func main() {
}
defer pgConn.Close()
// Init events module
pg := postgres.NewConn(log, pgConn)
defer pg.Close()
chConn := clickhouse.NewConnector(cfg.Clickhouse)
if err := chConn.Prepare(); err != nil {
log.Fatal(ctx, "can't prepare clickhouse: %s", err)
}
defer chConn.Stop()
// Init db proxy module (postgres + clickhouse + batches)
dbProxy := postgres.NewConn(log, pgConn, chConn)
defer dbProxy.Close()
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
@ -49,7 +56,7 @@ func main() {
tagsManager := tags.New(log, pgConn)
// Init data saver
saver := datasaver.New(log, cfg, pg, sessManager, tagsManager)
saver := datasaver.New(log, cfg, dbProxy, chConn, sessManager, tagsManager)
// Message filter
msgFilter := []int{

View file

@ -57,10 +57,18 @@ type Redshift struct {
// Clickhouse config
type Clickhouse struct {
URL string `env:"CLICKHOUSE_STRING"`
Database string `env:"CLICKHOUSE_DATABASE,default=default"`
UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
Password string `env:"CLICKHOUSE_PASSWORD,default="`
URL string `env:"CLICKHOUSE_STRING"`
Database string `env:"CLICKHOUSE_DATABASE,default=default"`
UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
Password string `env:"CLICKHOUSE_PASSWORD,default="`
LegacyUserName string `env:"CH_USERNAME,default=default"`
LegacyPassword string `env:"CH_PASSWORD,default="`
}
func (cfg *Clickhouse) GetTrimmedURL() string {
chUrl := strings.TrimPrefix(cfg.URL, "tcp://")
chUrl = strings.TrimSuffix(chUrl, "/default")
return chUrl
}
// ElasticSearch config

View file

@ -11,6 +11,7 @@ import (
type Config struct {
common.Config
common.Postgres
common.Clickhouse
redis.Redis
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"`

View file

@ -0,0 +1,9 @@
package datasaver
import (
"openreplay/backend/pkg/messages"
)
func (s *saverImpl) init() {}
func (s *saverImpl) sendToFTS(msg messages.Message, projID uint32) {}

View file

@ -1,17 +0,0 @@
package datasaver
import (
. "openreplay/backend/pkg/messages"
)
func (s *saverImpl) init() {
// noop
}
func (s *saverImpl) handleExtraMessage(msg Message) error {
switch m := msg.(type) {
case *PerformanceTrackAggr:
return s.pg.InsertWebStatsPerformance(m)
}
return nil
}

View file

@ -0,0 +1,72 @@
package datasaver
import (
"context"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
)
func (s *saverImpl) handleMobileMessage(sessCtx context.Context, session *sessions.Session, msg messages.Message) error {
switch m := msg.(type) {
case *messages.MobileSessionEnd:
return s.ch.InsertMobileSession(session)
case *messages.MobileUserID:
if err := s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERIDMOBILE", m.ID)
return nil
case *messages.MobileUserAnonymousID:
if err := s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSIDMOBILE", m.ID)
return nil
case *messages.MobileMetadata:
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
case *messages.MobileEvent:
if err := s.pg.InsertMobileEvent(session, m); err != nil {
return err
}
return s.ch.InsertMobileCustom(session, m)
case *messages.MobileClickEvent:
if err := s.pg.InsertMobileClickEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil {
return err
}
return s.ch.InsertMobileClick(session, m)
case *messages.MobileSwipeEvent:
if err := s.pg.InsertMobileSwipeEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil {
return err
}
return s.ch.InsertMobileSwipe(session, m)
case *messages.MobileInputEvent:
if err := s.pg.InsertMobileInputEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil {
return err
}
return s.ch.InsertMobileInput(session, m)
case *messages.MobileNetworkCall:
if err := s.pg.InsertMobileNetworkCall(session, m); err != nil {
return err
}
return s.ch.InsertMobileRequest(session, m, session.SaveRequestPayload)
case *messages.MobileCrash:
if err := s.pg.InsertMobileCrash(session.SessionID, session.ProjectID, m); err != nil {
return err
}
if err := s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000); err != nil {
return err
}
return s.ch.InsertMobileCrash(session, m)
}
return nil
}

View file

@ -30,11 +30,18 @@ type saverImpl struct {
tags tags.Tags
}
func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, session sessions.Sessions, tags tags.Tags) Saver {
func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, ch clickhouse.Connector, session sessions.Sessions, tags tags.Tags) Saver {
switch {
case pg == nil:
log.Fatal(context.Background(), "pg pool is empty")
case ch == nil:
log.Fatal(context.Background(), "ch pool is empty")
}
s := &saverImpl{
log: log,
cfg: cfg,
pg: pg,
ch: ch,
sessions: session,
tags: tags,
}
@ -43,21 +50,34 @@ func New(log logger.Logger, cfg *db.Config, pg *postgres.Conn, session sessions.
}
func (s *saverImpl) Handle(msg Message) {
sessCtx := context.WithValue(context.Background(), "sessionID", msg.SessionID())
if msg.TypeID() == MsgCustomEvent {
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
}
var (
sessCtx = context.WithValue(context.Background(), "sessionID", msg.SessionID())
session *sessions.Session
err error
)
if msg.TypeID() == MsgSessionEnd || msg.TypeID() == MsgMobileSessionEnd {
session, err = s.sessions.GetUpdated(msg.SessionID(), true)
} else {
session, err = s.sessions.Get(msg.SessionID())
}
if err != nil || session == nil {
s.log.Error(sessCtx, "error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
return
}
if IsMobileType(msg.TypeID()) {
// Handle Mobile messages
if err := s.handleMobileMessage(msg); err != nil {
if err := s.handleMobileMessage(sessCtx, session, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
s.log.Error(sessCtx, "mobile message insertion error, msg: %+v, err: %s", msg, err)
}
return
}
} else {
// Handle Web messages
if err := s.handleMessage(msg); err != nil {
if err := s.handleWebMessage(sessCtx, session, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
s.log.Error(sessCtx, "web message insertion error, msg: %+v, err: %s", msg, err)
}
@ -65,180 +85,22 @@ func (s *saverImpl) Handle(msg Message) {
}
}
if err := s.handleExtraMessage(msg); err != nil {
s.log.Error(sessCtx, "extra message insertion error, msg: %+v, err: %s", msg, err)
}
s.sendToFTS(msg, session.ProjectID)
return
}
func (s *saverImpl) handleMobileMessage(msg Message) error {
session, err := s.sessions.Get(msg.SessionID())
if err != nil {
return err
}
switch m := msg.(type) {
case *MobileUserID:
if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERIDMOBILE", m.ID)
return nil
case *MobileUserAnonymousID:
if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSIDMOBILE", m.ID)
return nil
case *MobileMetadata:
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
case *MobileEvent:
return s.pg.InsertMobileEvent(session, m)
case *MobileClickEvent:
if err := s.pg.InsertMobileClickEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *MobileSwipeEvent:
if err := s.pg.InsertMobileSwipeEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *MobileInputEvent:
if err := s.pg.InsertMobileInputEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *MobileNetworkCall:
return s.pg.InsertMobileNetworkCall(session, m)
case *MobileCrash:
if err := s.pg.InsertMobileCrash(session.SessionID, session.ProjectID, m); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000)
}
return nil
}
func (s *saverImpl) handleMessage(msg Message) error {
session, err := s.sessions.Get(msg.SessionID())
if err != nil {
return err
}
sessCtx := context.WithValue(context.Background(), "sessionID", msg.SessionID())
switch m := msg.(type) {
case *SessionStart:
return s.pg.HandleStartEvent(m)
case *SessionEnd:
return s.pg.HandleEndEvent(m.SessionID())
case *Metadata:
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
case *IssueEvent:
if m.Type == "dead_click" || m.Type == "click_rage" {
if s.tags.ShouldIgnoreTag(session.ProjectID, m.Context) {
return nil
}
}
err = s.pg.InsertIssueEvent(session, m)
if err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type))
case *CustomIssue:
ie := &IssueEvent{
Type: "custom",
Timestamp: m.Timestamp,
MessageID: m.Index,
ContextString: m.Name,
Payload: m.Payload,
}
ie.SetMeta(m.Meta())
if err = s.pg.InsertIssueEvent(session, ie); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie.Type))
case *UserID:
if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID)
return nil
case *UserAnonymousID:
if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
return nil
case *CustomEvent:
return s.pg.InsertWebCustomEvent(session, m)
case *MouseClick:
if err = s.pg.InsertWebClickEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *PageEvent:
if err = s.pg.InsertWebPageEvent(session, m); err != nil {
return err
}
s.sessions.UpdateReferrer(session.SessionID, m.Referrer)
s.sessions.UpdateUTM(session.SessionID, m.URL)
return s.sessions.UpdateEventsStats(session.SessionID, 1, 1)
case *NetworkRequest:
return s.pg.InsertWebNetworkRequest(session, m)
case *GraphQL:
return s.pg.InsertWebGraphQL(session, m)
case *JSException:
wrapper, err := types.WrapJSException(m)
if err != nil {
s.log.Warn(sessCtx, "error on wrapping JSException: %v", err)
}
if err = s.pg.InsertWebErrorEvent(session, wrapper); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000)
case *IntegrationEvent:
return s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *InputChange:
if err = s.pg.InsertInputChangeEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *MouseThrashing:
if err = s.pg.InsertMouseThrashing(session, m); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, 50)
case *CanvasNode:
if err = s.pg.InsertCanvasNode(session, m); err != nil {
return err
}
case *TagTrigger:
if err = s.pg.InsertTagTrigger(session, m); err != nil {
return err
}
}
return nil
}
func (s *saverImpl) Commit() error {
if s.pg != nil {
s.pg.Commit()
}
if s.ch != nil {
s.ch.Commit()
}
s.pg.Commit()
s.ch.Commit()
return nil
}
func (s *saverImpl) Close() error {
if s.pg != nil {
if err := s.pg.Close(); err != nil {
s.log.Error(context.Background(), "pg.Close error: %s", err)
}
if err := s.pg.Close(); err != nil {
s.log.Error(context.Background(), "pg.Close error: %s", err)
}
if s.ch != nil {
if err := s.ch.Stop(); err != nil {
s.log.Error(context.Background(), "ch.Close error: %s", err)
}
if err := s.ch.Stop(); err != nil {
s.log.Error(context.Background(), "ch.Close error: %s", err)
}
return nil
}

View file

@ -0,0 +1,146 @@
package datasaver
import (
"context"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
)
func (s *saverImpl) handleWebMessage(sessCtx context.Context, session *sessions.Session, msg messages.Message) error {
switch m := msg.(type) {
case *messages.SessionStart:
return s.pg.HandleStartEvent(m)
case *messages.SessionEnd:
if err := s.pg.HandleEndEvent(m.SessionID()); err != nil {
return err
}
session, err := s.sessions.GetUpdated(m.SessionID(), true)
if err != nil {
return err
}
return s.ch.InsertWebSession(session)
case *messages.Metadata:
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
case *messages.IssueEvent:
if m.Type == "dead_click" || m.Type == "click_rage" {
if s.tags.ShouldIgnoreTag(session.ProjectID, m.Context) {
return nil
}
}
if err := s.pg.InsertIssueEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m.Type)); err != nil {
return err
}
return s.ch.InsertIssue(session, m)
case *messages.CustomIssue:
ie := &messages.IssueEvent{
Type: "custom",
Timestamp: m.Timestamp,
MessageID: m.Index,
ContextString: m.Name,
Payload: m.Payload,
}
ie.SetMeta(m.Meta())
if err := s.pg.InsertIssueEvent(session, ie); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie.Type))
case *messages.UserID:
if err := s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID)
return nil
case *messages.UserAnonymousID:
if err := s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
return nil
case *messages.CustomEvent:
if err := s.pg.InsertWebCustomEvent(session, m); err != nil {
return err
}
return s.ch.InsertCustom(session, m)
case *messages.MouseClick:
if err := s.pg.InsertWebClickEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil {
return err
}
return s.ch.InsertWebClickEvent(session, m)
case *messages.PageEvent:
if err := s.pg.InsertWebPageEvent(session, m); err != nil {
return err
}
s.sessions.UpdateReferrer(session.SessionID, m.Referrer)
s.sessions.UpdateUTM(session.SessionID, m.URL)
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 1); err != nil {
return err
}
return s.ch.InsertWebPageEvent(session, m)
case *messages.NetworkRequest:
if err := s.pg.InsertWebNetworkRequest(session, m); err != nil {
return err
}
return s.ch.InsertRequest(session, m, session.SaveRequestPayload)
case *messages.GraphQL:
if err := s.pg.InsertWebGraphQL(session, m); err != nil {
return err
}
return s.ch.InsertGraphQL(session, m)
case *messages.JSException:
wrapper, err := types.WrapJSException(m)
if err != nil {
s.log.Warn(sessCtx, "error on wrapping JSException: %v", err)
}
if err = s.pg.InsertWebErrorEvent(session, wrapper); err != nil {
return err
}
if err := s.sessions.UpdateIssuesStats(session.SessionID, 1, 1000); err != nil {
return err
}
return s.ch.InsertWebErrorEvent(session, wrapper)
case *messages.IntegrationEvent:
if err := s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m)); err != nil {
return err
}
return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *messages.InputChange:
if err := s.pg.InsertInputChangeEvent(session, m); err != nil {
return err
}
if err := s.sessions.UpdateEventsStats(session.SessionID, 1, 0); err != nil {
return err
}
return s.ch.InsertWebInputDuration(session, m)
case *messages.MouseThrashing:
if err := s.pg.InsertMouseThrashing(session, m); err != nil {
return err
}
if err := s.sessions.UpdateIssuesStats(session.SessionID, 0, 50); err != nil {
return err
}
return s.ch.InsertMouseThrashing(session, m)
case *messages.CanvasNode:
if err := s.pg.InsertCanvasNode(session, m); err != nil {
return err
}
case *messages.TagTrigger:
if err := s.pg.InsertTagTrigger(session, m); err != nil {
return err
}
case *messages.PerformanceTrackAggr:
if err := s.pg.InsertWebStatsPerformance(m); err != nil {
return err
}
return s.ch.InsertWebPerformanceTrackAggr(session, m)
}
return nil
}

View file

@ -5,10 +5,11 @@ import (
"errors"
"fmt"
"log"
"openreplay/backend/pkg/metrics/database"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/pkg/metrics/database"
)
type Bulk interface {

View file

@ -1,19 +1,31 @@
package clickhouse
import (
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/internal/config/common"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/url"
)
type Connector interface {
Prepare() error
Commit() error
Stop() error
// Web
InsertWebSession(session *sessions.Session) error
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
@ -21,4 +33,669 @@ type Connector interface {
InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error
InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error
InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error
InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error
// Mobile
InsertMobileSession(session *sessions.Session) error
InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error
InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error
InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error
InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error
InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error
InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error
}
type task struct {
bulks []Bulk
}
func NewTask() *task {
return &task{bulks: make([]Bulk, 0, 21)}
}
type connectorImpl struct {
conn driver.Conn
batches map[string]Bulk //driver.Batch
workerTask chan *task
done chan struct{}
finished chan struct{}
}
func NewConnector(cfg common.Clickhouse) Connector {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{cfg.GetTrimmedURL()},
Auth: clickhouse.Auth{
Database: cfg.Database,
Username: cfg.LegacyUserName,
Password: cfg.LegacyPassword,
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
})
if err != nil {
log.Fatal(err)
}
c := &connectorImpl{
conn: conn,
batches: make(map[string]Bulk, 20),
workerTask: make(chan *task, 1),
done: make(chan struct{}),
finished: make(chan struct{}),
}
go c.worker()
return c
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := NewBulk(c.conn, name, query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
c.batches[name] = batch
return nil
}
var batches = map[string]string{
// Web
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))",
"pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, url_path, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?)",
"clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type, selector, normalized_x, normalized_y, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type, duration, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type, transfer_size, url_path) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000))",
"custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)",
//Mobile
"ios_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)",
"ios_custom": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_clicks": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_swipes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, direction, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_inputs": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_requests": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?)",
"ios_crashes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, reason, stacktrace, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
for table, query := range batches {
if err := c.newBatch(table, query); err != nil {
return fmt.Errorf("can't create %s batch: %s", table, err)
}
}
return nil
}
func (c *connectorImpl) Commit() error {
newTask := NewTask()
for _, b := range c.batches {
newTask.bulks = append(newTask.bulks, b)
}
c.batches = make(map[string]Bulk, 20)
if err := c.Prepare(); err != nil {
log.Printf("can't prepare new CH batch set: %s", err)
}
c.workerTask <- newTask
return nil
}
func (c *connectorImpl) Stop() error {
c.done <- struct{}{}
<-c.finished
return c.conn.Close()
}
func (c *connectorImpl) sendBulks(t *task) {
for _, b := range t.bulks {
if err := b.Send(); err != nil {
log.Printf("can't send batch: %s", err)
}
}
}
func (c *connectorImpl) worker() {
for {
select {
case t := <-c.workerTask:
c.sendBulks(t)
case <-c.done:
for t := range c.workerTask {
c.sendBulks(t)
}
c.finished <- struct{}{}
return
}
}
}
func (c *connectorImpl) checkError(name string, err error) {
if err != clickhouse.ErrBatchAlreadySent {
log.Printf("can't create %s batch after failed append operation: %s", name, err)
}
}
func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error {
if msg.Label == "" {
return nil
}
if err := c.batches["inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
nullableUint16(uint16(msg.InputDuration)),
nullableUint32(uint32(msg.HesitationTime)),
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error {
issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp)
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
issueID,
"mouse_thrashing",
"ISSUE",
msg.Url,
extractUrlPath(msg.Url),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
"mouse_thrashing",
msg.Url,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
// Check issue type before insert to avoid panic from clickhouse lib
switch msg.Type {
case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash":
default:
return fmt.Errorf("unknown issueType: %s", msg.Type)
}
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
issueID,
msg.Type,
"ISSUE",
msg.URL,
extractUrlPath(msg.URL),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
msg.Type,
msg.ContextString,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
if err := c.batches["sessions"].Append(
session.SessionID,
uint16(session.ProjectID),
session.UserID,
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
session.UserState,
session.UserCity,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
uint32(session.IssueScore),
session.Referrer,
session.IssueTypes,
session.TrackerVersion,
session.UserBrowser,
nullableString(session.UserBrowserVersion),
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
session.Timezone,
session.UtmSource,
session.UtmMedium,
session.UtmCampaign,
); err != nil {
c.checkError("sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
func extractUrlPath(fullUrl string) string {
_, path, query, err := url.GetURLParts(fullUrl)
if err != nil {
log.Printf("can't parse url: %s", err)
return ""
}
pathQuery := path
if query != "" {
pathQuery += "?" + query
}
return strings.ToLower(pathQuery)
}
func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.URL,
nullableUint16(uint16(msg.RequestStart)),
nullableUint16(uint16(msg.ResponseStart)),
nullableUint16(uint16(msg.ResponseEnd)),
nullableUint16(uint16(msg.DomContentLoadedEventStart)),
nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
nullableUint16(uint16(msg.LoadEventStart)),
nullableUint16(uint16(msg.LoadEventEnd)),
nullableUint16(uint16(msg.FirstPaint)),
nullableUint16(uint16(msg.FirstContentfulPaint)),
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
extractUrlPath(msg.URL),
"LOCATION",
); err != nil {
c.checkError("pages", err)
return fmt.Errorf("can't append to pages batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error {
if msg.Label == "" {
return nil
}
var nX *float32 = nil
var nY *float32 = nil
if msg.NormalizedX != 101 && msg.NormalizedY != 101 {
// To support previous versions of tracker
if msg.NormalizedX <= 100 && msg.NormalizedY <= 100 {
msg.NormalizedX *= 100
msg.NormalizedY *= 100
}
normalizedX := float32(msg.NormalizedX) / 100.0
normalizedY := float32(msg.NormalizedY) / 100.0
nXVal := normalizedX
nX = &nXVal
nYVal := normalizedY
nY = &nYVal
}
if err := c.batches["clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
"CLICK",
msg.Selector,
nX,
nY,
msg.Url,
extractUrlPath(msg.Url),
); err != nil {
c.checkError("clicks", err)
return fmt.Errorf("can't append to clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
for k, v := range msg.Tags {
keys = append(keys, k)
values = append(values, v)
}
// Check error source before insert to avoid panic from clickhouse lib
switch msg.Source {
case "js_exception", "bugsnag", "cloudwatch", "datadog", "elasticsearch", "newrelic", "rollbar", "sentry", "stackdriver", "sumologic":
default:
return fmt.Errorf("unknown error source: %s", msg.Source)
}
msgID, _ := msg.ID(session.ProjectID)
// Insert event to batch
if err := c.batches["errors"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
msgID,
"ERROR",
keys,
values,
); err != nil {
c.checkError("errors", err)
return fmt.Errorf("can't append to errors batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
uint16(session.ProjectID),
uint64(0), // TODO: find messageID for performance events
datetime(timestamp),
nullableString(msg.Meta().Url),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinTotalJSHeapSize,
msg.AvgTotalJSHeapSize,
msg.MaxTotalJSHeapSize,
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
"PERFORMANCE",
); err != nil {
c.checkError("performance", err)
return fmt.Errorf("can't append to performance batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
if err := c.batches["autocompletes"].Append(
uint16(session.ProjectID),
msgType,
msgValue,
); err != nil {
c.checkError("autocompletes", err)
return fmt.Errorf("can't append to autocompletes batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method)
}
var request, response *string
if savePayload {
request = &msg.Request
response = &msg.Response
}
if err := c.batches["requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
"REQUEST",
uint32(msg.TransferredBodySize),
extractUrlPath(msg.URL),
); err != nil {
c.checkError("requests", err)
return fmt.Errorf("can't append to requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error {
if err := c.batches["custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
"CUSTOM",
); err != nil {
c.checkError("custom", err)
return fmt.Errorf("can't append to custom batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error {
if err := c.batches["graphql"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.OperationName,
nullableString(msg.Variables),
nullableString(msg.Response),
"GRAPHQL",
); err != nil {
c.checkError("graphql", err)
return fmt.Errorf("can't append to graphql batch: %s", err)
}
return nil
}
// Mobile events
func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert mobile session with nil duration")
}
if err := c.batches["ios_sessions"].Append(
session.SessionID,
uint16(session.ProjectID),
session.UserID,
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
session.UserState,
session.UserCity,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
uint32(session.IssueScore),
session.Referrer,
session.IssueTypes,
session.TrackerVersion,
session.UserBrowser,
nullableString(session.UserBrowserVersion),
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
"ios",
session.Timezone,
); err != nil {
c.checkError("ios_sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error {
if err := c.batches["ios_custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
"CUSTOM",
); err != nil {
c.checkError("ios_custom", err)
return fmt.Errorf("can't append to mobile custom batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"TAP",
); err != nil {
c.checkError("ios_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_swipes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableString(msg.Direction),
"SWIPE",
); err != nil {
c.checkError("ios_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
); err != nil {
c.checkError("ios_inputs", err)
return fmt.Errorf("can't append to mobile inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method)
}
var request, response *string
if savePayload {
request = &msg.Request
response = &msg.Response
}
if err := c.batches["ios_requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
"REQUEST",
); err != nil {
c.checkError("ios_requests", err)
return fmt.Errorf("can't append to mobile requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error {
if err := c.batches["ios_crashes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Name,
msg.Reason,
msg.Stacktrace,
"CRASH",
); err != nil {
c.checkError("ios_crashes", err)
return fmt.Errorf("can't append to mobile crashges batch: %s", err)
}
return nil
}

View file

@ -19,20 +19,17 @@ type Conn struct {
Pool pool.Pool
batches *batch.BatchSet
bulks *BulkSet
chConn CH // hack for autocomplete inserts, TODO: rewrite
chConn CH
}
func (conn *Conn) SetClickHouse(ch CH) {
conn.chConn = ch
}
func NewConn(log logger.Logger, pool pool.Pool) *Conn {
func NewConn(log logger.Logger, pool pool.Pool, ch CH) *Conn {
if pool == nil {
log.Fatal(context.Background(), "pg pool is empty")
}
return &Conn{
log: log,
Pool: pool,
chConn: ch,
bulks: NewBulkSet(log, pool),
batches: batch.NewBatchSet(log, pool),
}

View file

@ -16,7 +16,7 @@ type Sessions interface {
AddUnStarted(session *UnStartedSession) error
AddCached(sessionID uint64, data map[string]string) error
Get(sessionID uint64) (*Session, error)
GetUpdated(sessionID uint64) (*Session, error)
GetUpdated(sessionID uint64, keepInCache bool) (*Session, error)
GetCached(sessionID uint64) (map[string]string, error)
GetDuration(sessionID uint64) (uint64, error)
UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
@ -104,11 +104,14 @@ func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) {
}
// Special method for clickhouse connector
func (s *sessionsImpl) GetUpdated(sessionID uint64) (*Session, error) {
func (s *sessionsImpl) GetUpdated(sessionID uint64, keepInCache bool) (*Session, error) {
session, err := s.getFromDB(sessionID)
if err != nil {
return nil, err
}
if !keepInCache {
return session, nil
}
if err := s.cache.Set(session); err != nil {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Warn(ctx, "failed to cache session: %s", err)

View file

@ -3,7 +3,9 @@ package datasaver
import (
"encoding/json"
"log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
)
type NetworkRequestFTS struct {
@ -98,6 +100,12 @@ func WrapGraphQL(m *messages.GraphQL, projID uint32) *GraphQLFTS {
}
}
func (s *saverImpl) init() {
if s.cfg.UseQuickwit {
s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true)
}
}
func (s *saverImpl) sendToFTS(msg messages.Message, projID uint32) {
// Skip, if FTS is disabled
if s.producer == nil {

View file

@ -1,93 +0,0 @@
package datasaver
import (
"log"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
)
func (s *saverImpl) init() {
s.ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := s.ch.Prepare(); err != nil {
log.Fatalf("can't prepare clickhouse: %s", err)
}
s.pg.SetClickHouse(s.ch)
if s.cfg.UseQuickwit {
s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true)
}
}
func (s *saverImpl) handleExtraMessage(msg messages.Message) error {
// Get session data
var (
session *sessions.Session
err error
)
if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgMobileSessionEnd {
session, err = s.sessions.GetUpdated(msg.SessionID())
} else {
session, err = s.sessions.Get(msg.SessionID())
}
if err != nil || session == nil {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
return err
}
// Send data to quickwit
s.sendToFTS(msg, session.ProjectID)
// Handle message
switch m := msg.(type) {
case *messages.SessionEnd:
return s.ch.InsertWebSession(session)
case *messages.PerformanceTrackAggr:
return s.ch.InsertWebPerformanceTrackAggr(session, m)
case *messages.MouseClick:
return s.ch.InsertWebClickEvent(session, m)
// Unique for Web
case *messages.PageEvent:
return s.ch.InsertWebPageEvent(session, m)
case *messages.JSException:
wrapper, _ := types.WrapJSException(m)
return s.ch.InsertWebErrorEvent(session, wrapper)
case *messages.IntegrationEvent:
return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *messages.IssueEvent:
return s.ch.InsertIssue(session, m)
case *messages.CustomEvent:
return s.ch.InsertCustom(session, m)
case *messages.NetworkRequest:
if err := s.ch.InsertRequest(session, m, session.SaveRequestPayload); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
case *messages.GraphQL:
return s.ch.InsertGraphQL(session, m)
case *messages.InputChange:
return s.ch.InsertWebInputDuration(session, m)
case *messages.MouseThrashing:
return s.ch.InsertMouseThrashing(session, m)
// Mobile messages
case *messages.MobileSessionEnd:
return s.ch.InsertMobileSession(session)
case *messages.MobileEvent:
return s.ch.InsertMobileCustom(session, m)
case *messages.MobileClickEvent:
return s.ch.InsertMobileClick(session, m)
case *messages.MobileSwipeEvent:
return s.ch.InsertMobileSwipe(session, m)
case *messages.MobileInputEvent:
return s.ch.InsertMobileInput(session, m)
case *messages.MobileNetworkCall:
return s.ch.InsertMobileRequest(session, m, session.SaveRequestPayload)
case *messages.MobileCrash:
return s.ch.InsertMobileCrash(session, m)
}
return nil
}

View file

@ -1,713 +0,0 @@
package clickhouse
import (
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/url"
"os"
"strings"
"time"
"openreplay/backend/pkg/license"
)
type Connector interface {
Prepare() error
Commit() error
Stop() error
// Web
InsertWebSession(session *sessions.Session) error
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error
InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error
InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error
InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error
// Mobile
InsertMobileSession(session *sessions.Session) error
InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error
InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error
InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error
InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error
InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error
InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error
}
type task struct {
bulks []Bulk
}
func NewTask() *task {
return &task{bulks: make([]Bulk, 0, 21)}
}
type connectorImpl struct {
conn driver.Conn
batches map[string]Bulk //driver.Batch
workerTask chan *task
done chan struct{}
finished chan struct{}
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func NewConnector(url string) Connector {
license.CheckLicense()
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
userName := getEnv("CH_USERNAME", "default")
password := getEnv("CH_PASSWORD", "")
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{url},
Auth: clickhouse.Auth{
Database: "default",
Username: userName,
Password: password,
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
})
if err != nil {
log.Fatal(err)
}
c := &connectorImpl{
conn: conn,
batches: make(map[string]Bulk, 20),
workerTask: make(chan *task, 1),
done: make(chan struct{}),
finished: make(chan struct{}),
}
go c.worker()
return c
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := NewBulk(c.conn, name, query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
c.batches[name] = batch
return nil
}
var batches = map[string]string{
// Web
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))",
"pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, url_path, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?)",
"clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type, selector, normalized_x, normalized_y, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type, duration, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type, transfer_size, url_path) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000))",
"custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)",
//Mobile
"ios_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)",
"ios_custom": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_clicks": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_swipes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, direction, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_inputs": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_requests": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?)",
"ios_crashes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, reason, stacktrace, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
for table, query := range batches {
if err := c.newBatch(table, query); err != nil {
return fmt.Errorf("can't create %s batch: %s", table, err)
}
}
return nil
}
func (c *connectorImpl) Commit() error {
newTask := NewTask()
for _, b := range c.batches {
newTask.bulks = append(newTask.bulks, b)
}
c.batches = make(map[string]Bulk, 20)
if err := c.Prepare(); err != nil {
log.Printf("can't prepare new CH batch set: %s", err)
}
c.workerTask <- newTask
return nil
}
func (c *connectorImpl) Stop() error {
c.done <- struct{}{}
<-c.finished
return c.conn.Close()
}
func (c *connectorImpl) sendBulks(t *task) {
for _, b := range t.bulks {
if err := b.Send(); err != nil {
log.Printf("can't send batch: %s", err)
}
}
}
func (c *connectorImpl) worker() {
for {
select {
case t := <-c.workerTask:
c.sendBulks(t)
case <-c.done:
for t := range c.workerTask {
c.sendBulks(t)
}
c.finished <- struct{}{}
return
}
}
}
func (c *connectorImpl) checkError(name string, err error) {
if err != clickhouse.ErrBatchAlreadySent {
log.Printf("can't create %s batch after failed append operation: %s", name, err)
}
}
func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error {
if msg.Label == "" {
return nil
}
if err := c.batches["inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
nullableUint16(uint16(msg.InputDuration)),
nullableUint32(uint32(msg.HesitationTime)),
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error {
issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp)
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
issueID,
"mouse_thrashing",
"ISSUE",
msg.Url,
extractUrlPath(msg.Url),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
"mouse_thrashing",
msg.Url,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
// Check issue type before insert to avoid panic from clickhouse lib
switch msg.Type {
case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash":
default:
return fmt.Errorf("unknown issueType: %s", msg.Type)
}
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
issueID,
msg.Type,
"ISSUE",
msg.URL,
extractUrlPath(msg.URL),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
msg.Type,
msg.ContextString,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
if err := c.batches["sessions"].Append(
session.SessionID,
uint16(session.ProjectID),
session.UserID,
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
session.UserState,
session.UserCity,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
uint32(session.IssueScore),
session.Referrer,
session.IssueTypes,
session.TrackerVersion,
session.UserBrowser,
nullableString(session.UserBrowserVersion),
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
session.Timezone,
session.UtmSource,
session.UtmMedium,
session.UtmCampaign,
); err != nil {
c.checkError("sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
func extractUrlPath(fullUrl string) string {
_, path, query, err := url.GetURLParts(fullUrl)
if err != nil {
log.Printf("can't parse url: %s", err)
return ""
}
pathQuery := path
if query != "" {
pathQuery += "?" + query
}
return strings.ToLower(pathQuery)
}
func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.URL,
nullableUint16(uint16(msg.RequestStart)),
nullableUint16(uint16(msg.ResponseStart)),
nullableUint16(uint16(msg.ResponseEnd)),
nullableUint16(uint16(msg.DomContentLoadedEventStart)),
nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
nullableUint16(uint16(msg.LoadEventStart)),
nullableUint16(uint16(msg.LoadEventEnd)),
nullableUint16(uint16(msg.FirstPaint)),
nullableUint16(uint16(msg.FirstContentfulPaint)),
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
extractUrlPath(msg.URL),
"LOCATION",
); err != nil {
c.checkError("pages", err)
return fmt.Errorf("can't append to pages batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error {
if msg.Label == "" {
return nil
}
var nX *float32 = nil
var nY *float32 = nil
if msg.NormalizedX != 101 && msg.NormalizedY != 101 {
// To support previous versions of tracker
if msg.NormalizedX <= 100 && msg.NormalizedY <= 100 {
msg.NormalizedX *= 100
msg.NormalizedY *= 100
}
normalizedX := float32(msg.NormalizedX) / 100.0
normalizedY := float32(msg.NormalizedY) / 100.0
nXVal := normalizedX
nX = &nXVal
nYVal := normalizedY
nY = &nYVal
}
if err := c.batches["clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
"CLICK",
msg.Selector,
nX,
nY,
msg.Url,
extractUrlPath(msg.Url),
); err != nil {
c.checkError("clicks", err)
return fmt.Errorf("can't append to clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
for k, v := range msg.Tags {
keys = append(keys, k)
values = append(values, v)
}
// Check error source before insert to avoid panic from clickhouse lib
switch msg.Source {
case "js_exception", "bugsnag", "cloudwatch", "datadog", "elasticsearch", "newrelic", "rollbar", "sentry", "stackdriver", "sumologic":
default:
return fmt.Errorf("unknown error source: %s", msg.Source)
}
msgID, _ := msg.ID(session.ProjectID)
// Insert event to batch
if err := c.batches["errors"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
msgID,
"ERROR",
keys,
values,
); err != nil {
c.checkError("errors", err)
return fmt.Errorf("can't append to errors batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
uint16(session.ProjectID),
uint64(0), // TODO: find messageID for performance events
datetime(timestamp),
nullableString(msg.Meta().Url),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinTotalJSHeapSize,
msg.AvgTotalJSHeapSize,
msg.MaxTotalJSHeapSize,
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
"PERFORMANCE",
); err != nil {
c.checkError("performance", err)
return fmt.Errorf("can't append to performance batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
if err := c.batches["autocompletes"].Append(
uint16(session.ProjectID),
msgType,
msgValue,
); err != nil {
c.checkError("autocompletes", err)
return fmt.Errorf("can't append to autocompletes batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method)
}
var request, response *string
if savePayload {
request = &msg.Request
response = &msg.Response
}
if err := c.batches["requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
"REQUEST",
uint32(msg.TransferredBodySize),
extractUrlPath(msg.URL),
); err != nil {
c.checkError("requests", err)
return fmt.Errorf("can't append to requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error {
if err := c.batches["custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
"CUSTOM",
); err != nil {
c.checkError("custom", err)
return fmt.Errorf("can't append to custom batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error {
if err := c.batches["graphql"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.OperationName,
nullableString(msg.Variables),
nullableString(msg.Response),
"GRAPHQL",
); err != nil {
c.checkError("graphql", err)
return fmt.Errorf("can't append to graphql batch: %s", err)
}
return nil
}
// Mobile events
func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert mobile session with nil duration")
}
if err := c.batches["ios_sessions"].Append(
session.SessionID,
uint16(session.ProjectID),
session.UserID,
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
session.UserState,
session.UserCity,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
uint32(session.IssueScore),
session.Referrer,
session.IssueTypes,
session.TrackerVersion,
session.UserBrowser,
nullableString(session.UserBrowserVersion),
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
"ios",
session.Timezone,
); err != nil {
c.checkError("ios_sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error {
if err := c.batches["ios_custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
"CUSTOM",
); err != nil {
c.checkError("ios_custom", err)
return fmt.Errorf("can't append to mobile custom batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"TAP",
); err != nil {
c.checkError("ios_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messages.MobileSwipeEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_swipes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableString(msg.Direction),
"SWIPE",
); err != nil {
c.checkError("ios_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messages.MobileInputEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["ios_inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
); err != nil {
c.checkError("ios_inputs", err)
return fmt.Errorf("can't append to mobile inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *messages.MobileNetworkCall, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method)
}
var request, response *string
if savePayload {
request = &msg.Request
response = &msg.Response
}
if err := c.batches["ios_requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
"REQUEST",
); err != nil {
c.checkError("ios_requests", err)
return fmt.Errorf("can't append to mobile requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error {
if err := c.batches["ios_crashes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Name,
msg.Reason,
msg.Stacktrace,
"CRASH",
); err != nil {
c.checkError("ios_crashes", err)
return fmt.Errorf("can't append to mobile crashges batch: %s", err)
}
return nil
}