From 75eb6924c17917a22670626d6fa816bccf464eec Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 5 Jul 2024 16:16:02 +0200 Subject: [PATCH] feat(connector): added s3 batches support for CH connector --- backend/internal/config/connector/config.go | 1 + ee/backend/pkg/connector/clickhouse.go | 46 +++++++++++++++++++-- ee/backend/pkg/connector/model.go | 5 ++- ee/backend/pkg/connector/s3batches.go | 33 +++++++++++++++ ee/backend/pkg/connector/saver.go | 1 + 5 files changed, 82 insertions(+), 4 deletions(-) diff --git a/backend/internal/config/connector/config.go b/backend/internal/config/connector/config.go index 32128fd6b..4a2d0eb85 100644 --- a/backend/internal/config/connector/config.go +++ b/backend/internal/config/connector/config.go @@ -29,6 +29,7 @@ type Config struct { CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=5s"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` ProjectIDs string `env:"PROJECT_IDS"` + UseS3Batches bool `env:"CLICKHOUSE_USE_S3_BATCHES,default=true"` } func New(log logger.Logger) *Config { diff --git a/ee/backend/pkg/connector/clickhouse.go b/ee/backend/pkg/connector/clickhouse.go index 0b0882462..a52b4f3a4 100644 --- a/ee/backend/pkg/connector/clickhouse.go +++ b/ee/backend/pkg/connector/clickhouse.go @@ -2,11 +2,11 @@ package connector import ( "context" + "fmt" "strconv" "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "openreplay/backend/internal/config/connector" @@ -55,9 +55,29 @@ func NewClickHouse(log logger.Logger, cfg *connector.Config, batches *Batches) ( } func (c *ClickHouse) InsertEvents(batch []map[string]string) error { + if c.cfg.UseS3Batches { + return c.insertEventsUsingS3Batch(batch) + } return c.insertEventsUsingBuffer(batch) } +const batchEventsSQL = "INSERT INTO %s SELECT * FROM s3('s3://%s/%s', 'TSVWithNames', 'sessionid UInt64, consolelog_level Nullable(String), consolelog_value Nullable(String), customevent_name Nullable(String), customevent_payload Nullable(String), jsexception_message Nullable(String), jsexception_name Nullable(String), jsexception_payload Nullable(String), jsexception_metadata Nullable(String), networkrequest_type Nullable(String), networkrequest_method Nullable(String), networkrequest_url Nullable(String), networkrequest_request Nullable(String), networkrequest_response Nullable(String), networkrequest_status Nullable(UInt64), networkrequest_timestamp Nullable(UInt64), networkrequest_duration Nullable(UInt64), issueevent_message_id Nullable(UInt64), issueevent_timestamp Nullable(UInt64), issueevent_type Nullable(String), issueevent_context_string Nullable(String), issueevent_context Nullable(String), issueevent_payload Nullable(String), issueevent_url Nullable(String), customissue_name Nullable(String), customissue_payload Nullable(String), mobile_event_name Nullable(String), mobile_event_payload Nullable(String), mobile_networkcall_type Nullable(String), mobile_networkcall_method Nullable(String), mobile_networkcall_url Nullable(String), mobile_networkcall_request Nullable(String), mobile_networkcall_response Nullable(String), mobile_networkcall_status Nullable(UInt64), mobile_networkcall_timestamp Nullable(UInt64), mobile_networkcall_duration Nullable(UInt64), mobile_clickevent_x Nullable(UInt64), mobile_clickevent_y Nullable(UInt64), mobile_clickevent_timestamp Nullable(UInt64), mobile_clickevent_label Nullable(String), mobile_swipeevent_x Nullable(UInt64), mobile_swipeevent_y Nullable(UInt64), mobile_swipeevent_timestamp Nullable(UInt64), mobile_swipeevent_label Nullable(String), mobile_inputevent_label Nullable(String), mobile_inputevent_value Nullable(String), mobile_crash_name Nullable(String), mobile_crash_reason Nullable(String), mobile_crash_stacktrace Nullable(String), mobile_issueevent_timestamp Nullable(UInt64), mobile_issueevent_type Nullable(String), mobile_issueevent_context_string Nullable(String), mobile_issueevent_context Nullable(String), mobile_issueevent_payload Nullable(String), mouseclick_label Nullable(String), mouseclick_selector Nullable(String), mouseclick_url Nullable(String), mouseclick_hesitation_time Nullable(UInt64), mouseclick_timestamp Nullable(UInt64), pageevent_url Nullable(String), pageevent_referrer Nullable(String), pageevent_speed_index Nullable(UInt64), pageevent_timestamp Nullable(UInt64), inputevent_label Nullable(String), inputevent_hesitation_time Nullable(UInt64), inputevent_input_duration Nullable(UInt64), inputevent_timestamp Nullable(UInt64), mobile_viewcomponentevent_screen_name Nullable(String), mobile_viewcomponentevent_view_name Nullable(String), mobile_viewcomponentevent_visible Nullable(String), mobile_viewcomponentevent_timestamp Nullable(UInt64), received_at UInt64') SETTINGS format_csv_delimiter = '|'" + +func (c *ClickHouse) insertEventsUsingS3Batch(batch []map[string]string) error { + fileName := generateTSVName(c.cfg.EventsTableName) + //if err := c.batches.Insert(batch, fileName, eventColumns); err != nil { + if err := c.batches.InsertTSV(batch, fileName, eventColumns); err != nil { + return fmt.Errorf("can't insert events batch: %s", err) + } + // Copy data from s3 bucket to ClickHouse + sql := fmt.Sprintf(batchEventsSQL, c.cfg.EventsTableName, c.cfg.BucketName, fileName) + if err := c.conn.Exec(context.Background(), sql); err != nil { + return fmt.Errorf("can't copy data from s3 to ClickHouse: %s", err) + } + c.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch)) + return nil +} + const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" func (c *ClickHouse) insertEventsUsingBuffer(batch []map[string]string) error { @@ -105,13 +125,33 @@ func (c *ClickHouse) insertEventsUsingBuffer(batch []map[string]string) error { } func (c *ClickHouse) InsertSessions(batch []map[string]string) error { + if c.cfg.UseS3Batches { + return c.insertSessionsUsingS3Batch(batch) + } return c.insertSessionsUsingBuffer(batch) } -const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +const batchSessionsSQL = "INSERT INTO %s SELECT * FROM s3('s3://%s/%s', 'TSVWithNames', 'sessionid UInt64, user_browser Nullable(String), user_browser_version Nullable(String), user_country Nullable(String), user_city Nullable(String), user_state Nullable(String), user_device Nullable(String), user_device_heap_size Nullable(UInt64), user_device_memory_size Nullable(UInt64), user_device_type Nullable(String), user_os Nullable(String), user_os_version Nullable(String), user_uuid Nullable(String), connection_effective_bandwidth Nullable(UInt64), connection_type Nullable(String), referrer Nullable(String), user_anonymous_id Nullable(String), user_id Nullable(String), tracker_version Nullable(String), rev_id Nullable(String), session_start_timestamp Nullable(UInt64), session_end_timestamp Nullable(UInt64), session_duration Nullable(UInt64), first_contentful_paint Nullable(UInt64), speed_index Nullable(UInt64), visually_complete Nullable(UInt64), timing_time_to_interactive Nullable(UInt64), avg_cpu Nullable(UInt64), avg_fps Nullable(UInt64), max_cpu Nullable(UInt64), max_fps Nullable(UInt64), max_total_js_heap_size Nullable(UInt64), max_used_js_heap_size Nullable(UInt64), js_exceptions_count Nullable(UInt64), inputs_count Nullable(UInt64), clicks_count Nullable(UInt64), issues_count Nullable(UInt64), pages_count Nullable(UInt64), metadata_1 Nullable(String), metadata_2 Nullable(String), metadata_3 Nullable(String), metadata_4 Nullable(String), metadata_5 Nullable(String), metadata_6 Nullable(String), metadata_7 Nullable(String), metadata_8 Nullable(String), metadata_9 Nullable(String), metadata_10 Nullable(String)') SETTINGS format_csv_delimiter = '|'" + +func (c *ClickHouse) insertSessionsUsingS3Batch(batch []map[string]string) error { + fileName := generateTSVName(c.cfg.SessionsTableName) + if err := c.batches.InsertTSV(batch, fileName, sessionColumns); err != nil { + return fmt.Errorf("can't insert sessions batch: %s", err) + } + // Copy data from s3 bucket to ClickHouse + sql := fmt.Sprintf(batchSessionsSQL, c.cfg.SessionsTableName, c.cfg.BucketName, fileName) + if err := c.conn.Exec(context.Background(), sql); err != nil { + return fmt.Errorf("can't copy data from s3 to ClickHouse: %s", err) + } + c.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch)) + return nil +} + +const sessionsSQL = "INSERT INTO %s_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" func (c *ClickHouse) insertSessionsUsingBuffer(batch []map[string]string) error { - bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL) + sql := fmt.Sprintf(sessionsSQL, c.cfg.SessionsTableName) + bulk, err := c.conn.PrepareBatch(context.Background(), sql) if err != nil { return err } diff --git a/ee/backend/pkg/connector/model.go b/ee/backend/pkg/connector/model.go index fe675b843..4dc2144d9 100644 --- a/ee/backend/pkg/connector/model.go +++ b/ee/backend/pkg/connector/model.go @@ -1,6 +1,8 @@ package connector -import "strconv" +import ( + "strconv" +) var sessionColumns = []string{ "sessionid", @@ -146,6 +148,7 @@ var eventColumns = []string{ "mobile_viewcomponentevent_view_name", "mobile_viewcomponentevent_visible", "mobile_viewcomponentevent_timestamp", + "received_at", } var eventInts = []string{ diff --git a/ee/backend/pkg/connector/s3batches.go b/ee/backend/pkg/connector/s3batches.go index 38669ff7b..518ec316f 100644 --- a/ee/backend/pkg/connector/s3batches.go +++ b/ee/backend/pkg/connector/s3batches.go @@ -31,10 +31,23 @@ func (ds *Batches) Insert(batch []map[string]string, fileName string, columns [] return nil } +func (ds *Batches) InsertTSV(batch []map[string]string, fileName string, columns []string) error { + buf := dataToTSV(batch, columns) + reader := bytes.NewReader(buf.Bytes()) + if err := ds.objStorage.Upload(reader, fileName, "text/tsv", objectstorage.NoCompression); err != nil { + return fmt.Errorf("can't upload file to s3: %s", err) + } + return nil +} + func generateName(table string) string { return fmt.Sprintf("connector_data/%s-%s.csv", table, uuid.New().String()) } +func generateTSVName(table string) string { + return fmt.Sprintf("connector_data/%s-%s.tsv", table, uuid.New().String()) +} + func dataToCSV(batch []map[string]string, columns []string) *bytes.Buffer { buf := bytes.NewBuffer(nil) @@ -54,3 +67,23 @@ func dataToCSV(batch []map[string]string, columns []string) *bytes.Buffer { } return buf } + +func dataToTSV(batch []map[string]string, columns []string) *bytes.Buffer { + buf := bytes.NewBuffer(nil) + + // Write header (column names) + for _, column := range columns { + buf.WriteString(column + "\t") + } + buf.Truncate(buf.Len() - 1) + + // Write data (rows) + for _, data := range batch { + buf.WriteString("\n") + for _, column := range columns { + buf.WriteString(data[column] + "\t") + } + buf.Truncate(buf.Len() - 1) + } + return buf +} diff --git a/ee/backend/pkg/connector/saver.go b/ee/backend/pkg/connector/saver.go index bcaba336a..b8a92e431 100644 --- a/ee/backend/pkg/connector/saver.go +++ b/ee/backend/pkg/connector/saver.go @@ -162,6 +162,7 @@ func handleEvent(msg messages.Message) map[string]string { return nil } event["sessionid"] = fmt.Sprintf("%d", msg.SessionID()) + event["received_at"] = fmt.Sprintf("%d", time.Now().Unix()) return event }