diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go
index 14401d5bf..95739cf91 100644
--- a/backend/internal/config/common/config.go
+++ b/backend/internal/config/common/config.go
@@ -42,10 +42,18 @@ func (cfg *Postgres) String() string {
 
 // Redshift config
 type Redshift struct {
-	ConnectioString string `env:"REDSHIFT_STRING"`
-	Host            string `env:"REDSHIFT_HOST"`
-	Port            int    `env:"REDSHIFT_PORT"`
-	User            string `env:"REDSHIFT_USER"`
-	Password        string `env:"REDSHIFT_PASSWORD"`
-	Database        string `env:"REDSHIFT_DATABASE"`
+	ConnectionString string `env:"REDSHIFT_STRING"`
+	Host             string `env:"REDSHIFT_HOST"`
+	Port             int    `env:"REDSHIFT_PORT"`
+	User             string `env:"REDSHIFT_USER"`
+	Password         string `env:"REDSHIFT_PASSWORD"`
+	Database         string `env:"REDSHIFT_DATABASE"`
+}
+
+// Clickhouse config
+type Clickhouse struct {
+	URL      string `env:"CLICKHOUSE_STRING"`
+	Database string `env:"CLICKHOUSE_DATABASE,default=default"` // read by NewClickHouse; field was missing, env name assumed
+	UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
+	Password string `env:"CLICKHOUSE_PASSWORD,default="`
 }
diff --git a/ee/backend/cmd/connector/main.go b/ee/backend/cmd/connector/main.go
index bbbad6a0f..c854a6fba 100644
--- a/ee/backend/cmd/connector/main.go
+++ b/ee/backend/cmd/connector/main.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"log"
-	"openreplay/backend/pkg/db/postgres"
 	"openreplay/backend/pkg/db/postgres/pool"
 	"openreplay/backend/pkg/db/redis"
 	"openreplay/backend/pkg/projects"
@@ -28,10 +27,18 @@ func main() {
 		log.Fatalf("can't init object storage: %s", err)
 	}
 
-	db, err := saver.NewRedshift(cfg)
-	if err != nil {
-		log.Printf("can't init redshift connection: %s", err)
-		return
+	var db saver.Database
+	switch cfg.ConnectorType {
+	case "redshift":
+		if db, err = saver.NewRedshift(cfg, objStore); err != nil {
+			log.Fatalf("can't init redshift connection: %s", err)
+		}
+	case "clickhouse":
+		if db, err = saver.NewClickHouse(cfg); err != nil {
+			log.Fatalf("can't init clickhouse connection: %s", err)
+		}
+	default:
+		log.Fatalf("unknown connector type: %s", cfg.ConnectorType)
 	}
 	defer db.Close()
 
@@ -43,10 +50,6 @@ func main() {
 	}
 	defer pgConn.Close()
 
-	// Init events module
-	pg := postgres.NewConn(pgConn)
-	defer pg.Close()
-
 	// Init redis connection
 	redisClient, err := redis.New(&cfg.Redis)
 	if err != nil {
@@ -58,7 +61,7 @@ func main() {
 	sessManager := sessions.New(pgConn, projManager, redisClient)
 
 	// Saves messages to Redshift
-	dataSaver := saver.New(cfg, objStore, db, sessManager)
+	dataSaver := saver.New(cfg, db, sessManager, projManager)
 
 	// Message filter
 	msgFilter := []int{messages.MsgConsoleLog, messages.MsgCustomEvent, messages.MsgJSException,
diff --git a/ee/backend/pkg/connector/clickhouse.go b/ee/backend/pkg/connector/clickhouse.go
new file mode 100644
index 000000000..f5a4c286c
--- /dev/null
+++ b/ee/backend/pkg/connector/clickhouse.go
@@ -0,0 +1,193 @@
+package connector
+
+import (
+	"context"
+	"log"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"openreplay/backend/internal/config/connector"
+)
+
+type ClickHouse struct {
+	cfg  *connector.Config
+	conn driver.Conn
+}
+
+func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) {
+	url := cfg.Clickhouse.URL
+	url = strings.TrimPrefix(url, "tcp://")
+	url = strings.TrimSuffix(url, "/default")
+	conn, err := clickhouse.Open(&clickhouse.Options{
+		Addr: []string{url},
+		Auth: clickhouse.Auth{
+			Database: cfg.Clickhouse.Database,
+			Username: cfg.Clickhouse.UserName,
+			Password: cfg.Clickhouse.Password,
+		},
+		MaxOpenConns:    20,
+		MaxIdleConns:    15,
+		ConnMaxLifetime: 3 * time.Minute,
+		Compression: &clickhouse.Compression{
+			Method: clickhouse.CompressionLZ4,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel() // release the timeout context once the ping check is done
+	if err := conn.Ping(ctx); err != nil {
+		return nil, err
+	}
+	c := &ClickHouse{
+		cfg:  cfg,
+		conn: conn,
+	}
+	return c, nil
+}
+
+const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+
+func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
+	bulk, err := c.conn.PrepareBatch(context.Background(), eventsSQL)
+	if err != nil {
+		return err
+	}
+	for _, event := range batch {
+		if err := bulk.Append(
+			Uint64(event["sessionid"]),
+			nullableString(event["consolelog_level"]),
+			nullableString(event["consolelog_value"]),
+			nullableString(event["customevent_name"]),
+			nullableString(event["customevent_payload"]),
+			nullableString(event["jsexception_message"]),
+			nullableString(event["jsexception_name"]),
+			nullableString(event["jsexception_payload"]),
+			nullableString(event["jsexception_metadata"]),
+			nullableString(event["networkrequest_type"]),
+			nullableString(event["networkrequest_method"]),
+			nullableString(event["networkrequest_url"]),
+			nullableString(event["networkrequest_request"]),
+			nullableString(event["networkrequest_response"]),
+			nullableUint64(event["networkrequest_status"]),
+			nullableUint64(event["networkrequest_timestamp"]),
+			nullableUint64(event["networkrequest_duration"]),
+			nullableString(event["issueevent_message_id"]),
+			nullableUint64(event["issueevent_timestamp"]),
+			nullableString(event["issueevent_type"]),
+			nullableString(event["issueevent_context_string"]),
+			nullableString(event["issueevent_context"]),
+			nullableString(event["issueevent_payload"]),
+			nullableString(event["issueevent_url"]),
+			nullableString(event["customissue_name"]),
+			nullableString(event["customissue_payload"]),
+			nullableUint64(event["received_at"]),
+			nullableUint64(event["batch_order_number"]),
+		); err != nil {
+			log.Printf("can't append value set to batch, err: %s", err)
+		}
+	}
+	return bulk.Send()
+}
+
+const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+
+func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
+	bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL)
+	if err != nil {
+		return err
+	}
+	for _, sess := range batch {
+		if err := bulk.Append(
+			Uint64(sess["sessionid"]),
+			nullableString(sess["user_agent"]),
+			nullableString(sess["user_browser"]),
+			nullableString(sess["user_browser_version"]),
+			nullableString(sess["user_country"]),
+			nullableString(sess["user_device"]),
+			nullableUint64(sess["user_device_heap_size"]),
+			nullableUint64(sess["user_device_memory_size"]),
+			nullableString(sess["user_device_type"]),
+			nullableString(sess["user_os"]),
+			nullableString(sess["user_os_version"]),
+			nullableString(sess["user_uuid"]),
+			nullableUint64(sess["connection_effective_bandwidth"]),
+			nullableString(sess["connection_type"]),
+			nullableString(sess["referrer"]),
+			nullableString(sess["user_anonymous_id"]),
+			nullableString(sess["user_id"]),
+			nullableUint64(sess["session_start_timestamp"]),
+			nullableUint64(sess["session_end_timestamp"]),
+			nullableUint64(sess["session_duration"]),
+			nullableUint64(sess["first_contentful_paint"]),
+			nullableUint64(sess["speed_index"]),
+			nullableUint64(sess["visually_complete"]),
+			nullableUint64(sess["timing_time_to_interactive"]),
+			nullableUint64(sess["avg_cpu"]),
+			nullableUint64(sess["avg_fps"]),
+			nullableUint64(sess["max_cpu"]),
+			nullableUint64(sess["max_fps"]),
+			nullableUint64(sess["max_total_js_heap_size"]),
+			nullableUint64(sess["max_used_js_heap_size"]),
+			nullableUint64(sess["js_exceptions_count"]),
+			nullableUint64(sess["inputs_count"]),
+			nullableUint64(sess["clicks_count"]),
+			nullableUint64(sess["issues_count"]),
+			nullableUint64(sess["pages_count"]),
+			nullableString(sess["metadata_1"]),
+			nullableString(sess["metadata_2"]),
+			nullableString(sess["metadata_3"]),
+			nullableString(sess["metadata_4"]),
+			nullableString(sess["metadata_5"]),
+			nullableString(sess["metadata_6"]),
+			nullableString(sess["metadata_7"]),
+			nullableString(sess["metadata_8"]),
+			nullableString(sess["metadata_9"]),
+			nullableString(sess["metadata_10"]),
+		); err != nil {
+			log.Printf("can't append value set to batch, err: %s", err)
+		}
+	}
+	return bulk.Send()
+}
+
+func (c *ClickHouse) Close() error {
+	return c.conn.Close()
+}
+
+func Uint64(v string) uint64 {
+	if v == "" {
+		return 0
+	}
+	res, err := strconv.ParseUint(v, 10, 64)
+	if err != nil {
+		log.Printf("can't convert string to uint64, err: %s", err)
+		return 0
+	}
+	return res
+}
+
+func nullableString(v string) *string {
+	var p *string
+	if v != "" {
+		p = &v
+	}
+	return p
+}
+
+func nullableUint64(v string) *uint64 {
+	if v == "" {
+		return nil
+	}
+	res, err := strconv.ParseUint(v, 10, 64)
+	if err != nil {
+		log.Printf("can't convert string to uint64, err: %s", err)
+		return nil
+	}
+	return &res
+}
diff --git a/ee/backend/pkg/connector/db.go b/ee/backend/pkg/connector/db.go
new file mode 100644
index 000000000..435cefdf4
--- /dev/null
+++ b/ee/backend/pkg/connector/db.go
@@ -0,0 +1,7 @@
+package connector
+
+type Database interface {
+	InsertEvents(batch []map[string]string) error
+	InsertSessions(batch []map[string]string) error
+	Close() error
+}
diff --git a/ee/backend/pkg/connector/model.go b/ee/backend/pkg/connector/model.go
new file mode 100644
index 000000000..0bbbaa5af
--- /dev/null
+++ b/ee/backend/pkg/connector/model.go
@@ -0,0 +1,107 @@
+package connector
+
+import "strconv"
+
+var sessionColumns = []string{
+	"sessionid",
+	"user_agent",
+	"user_browser",
+	"user_browser_version",
+	"user_country",
+	"user_device",
+	"user_device_heap_size",
+	"user_device_memory_size",
+	"user_device_type",
+	"user_os",
+	"user_os_version",
+	"user_uuid",
+	"connection_effective_bandwidth",
+	"connection_type",
+	"referrer",
+	"user_anonymous_id",
+	"user_id",
+	"session_start_timestamp",
+	"session_end_timestamp",
+	"session_duration",
+	"first_contentful_paint",
+	"speed_index",
+	"visually_complete",
+	"timing_time_to_interactive",
+	"avg_cpu",
+	"avg_fps",
+	"max_cpu",
+	"max_fps",
+	"max_total_js_heap_size",
+	"max_used_js_heap_size",
+	"js_exceptions_count",
+	"inputs_count",
+	"clicks_count",
+	"issues_count",
+	"pages_count",
+	"metadata_1",
+	"metadata_2",
+	"metadata_3",
+	"metadata_4",
+	"metadata_5",
+	"metadata_6",
+	"metadata_7",
+	"metadata_8",
+	"metadata_9",
+	"metadata_10",
+}
+
+var sessionInts = []string{
+	"user_device_heap_size",
+	"user_device_memory_size",
+	"connection_effective_bandwidth",
+	"first_contentful_paint",
+	"speed_index",
+	"visually_complete",
+	"timing_time_to_interactive",
+	"avg_cpu",
+	"avg_fps",
+	"max_cpu",
+	"max_fps",
+	"max_total_js_heap_size",
+	"max_used_js_heap_size",
+	"js_exceptions_count",
+	"inputs_count",
+	"clicks_count",
+	"issues_count",
+	"pages_count",
+}
+
+var eventColumns = []string{
+	"sessionid",
+	"consolelog_level",
+	"consolelog_value",
+	"customevent_name",
+	"customevent_payload",
+	"jsexception_message",
+	"jsexception_name",
+	"jsexception_payload",
+	"jsexception_metadata",
+	"networkrequest_type",
+	"networkrequest_method",
+	"networkrequest_url",
+	"networkrequest_request",
+	"networkrequest_response",
+	"networkrequest_status",
+	"networkrequest_timestamp",
+	"networkrequest_duration",
+	"issueevent_message_id",
+	"issueevent_timestamp",
+	"issueevent_type",
+	"issueevent_context_string",
+	"issueevent_context",
+	"issueevent_payload",
+	"issueevent_url",
+	"customissue_name",
+	"customissue_payload",
+	"received_at",
+	"batch_order_number",
+}
+
+func QUOTES(s string) string {
+	return strconv.Quote(s)
+}
diff --git a/ee/backend/pkg/connector/redshift.go b/ee/backend/pkg/connector/redshift.go
index 36dc96b0e..54aba0037 100644
--- a/ee/backend/pkg/connector/redshift.go
+++ b/ee/backend/pkg/connector/redshift.go
@@ -1,10 +1,13 @@
 package connector
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"fmt"
+	"github.com/google/uuid"
 	"log"
+	"openreplay/backend/pkg/objectstorage"
 
 	"openreplay/backend/internal/config/connector"
@@ -12,17 +15,19 @@ import (
 )
 
 type Redshift struct {
-	cfg *connector.Config
-	ctx context.Context
-	db  *sql.DB
+	cfg        *connector.Config
+	ctx        context.Context
+	db         *sql.DB
+	objStorage objectstorage.ObjectStorage
 }
 
-func NewRedshift(cfg *connector.Config) (*Redshift, error) {
+func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
 	var source string
-	if cfg.ConnectioString != "" {
-		source = cfg.ConnectioString
+	if cfg.ConnectionString != "" {
+		source = cfg.ConnectionString
 	} else {
-		source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
+		source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
+			cfg.Redshift.User, cfg.Redshift.Password, cfg.Redshift.Host, cfg.Redshift.Port, cfg.Redshift.Database)
 	}
 	log.Println("Connecting to Redshift Source: ", source)
 	sqldb, err := sql.Open("postgres", source)
@@ -33,12 +38,93 @@ func NewRedshift(cfg *connector.Config) (*Redshift, error) {
 		return nil, err
 	}
 	return &Redshift{
-		cfg: cfg,
-		ctx: context.Background(),
-		db:  sqldb,
+		cfg:        cfg,
+		ctx:        context.Background(),
+		db:         sqldb,
+		objStorage: objStorage,
 	}, nil
 }
 
+func eventsToBuffer(batch []map[string]string) *bytes.Buffer {
+	buf := bytes.NewBuffer(nil)
+
+	// Write header
+	for _, column := range eventColumns {
+		buf.WriteString(column + "|")
+	}
+	buf.Truncate(buf.Len() - 1)
+
+	// Write data
+	for _, event := range batch {
+		buf.WriteString("\n")
+		for _, column := range eventColumns {
+			buf.WriteString(event[column] + "|")
+		}
+		buf.Truncate(buf.Len() - 1)
+	}
+	return buf
+}
+
+func (r *Redshift) InsertEvents(batch []map[string]string) error {
+	// Send data to S3
+	fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.EventsTableName, uuid.New().String())
+	// Create csv file
+	buf := eventsToBuffer(batch)
+
+	reader := bytes.NewReader(buf.Bytes())
+	if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
+		log.Printf("can't upload file to s3: %s", err)
+		return err
+	}
+	// Copy data from s3 bucket to redshift
+	if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
+		log.Printf("can't copy data from s3 to redshift: %s", err)
+		return err
+	}
+	log.Printf("events batch of %d events is successfully saved", len(batch))
+	return nil
+}
+
+func sessionsToBuffer(batch []map[string]string) *bytes.Buffer {
+	buf := bytes.NewBuffer(nil)
+
+	// Write header
+	for _, column := range sessionColumns {
+		buf.WriteString(column + "|")
+	}
+	buf.Truncate(buf.Len() - 1)
+
+	// Write data
+	for _, sess := range batch {
+		buf.WriteString("\n")
+		for _, column := range sessionColumns {
+			buf.WriteString(sess[column] + "|")
+		}
+		buf.Truncate(buf.Len() - 1)
+	}
+	return buf
+}
+
+func (r *Redshift) InsertSessions(batch []map[string]string) error {
+	// Send data to S3
+	fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.SessionsTableName, uuid.New().String())
+	// Create csv file
+	buf := sessionsToBuffer(batch)
+
+	reader := bytes.NewReader(buf.Bytes())
+	if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
+		log.Printf("can't upload file to s3: %s", err)
+		return err
+	}
+	// Copy data from s3 bucket to redshift
+	if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
+		log.Printf("can't copy data from s3 to redshift: %s", err)
+		return err
+	}
+	log.Printf("sessions batch of %d sessions is successfully saved", len(batch))
+	return nil
+}
+
 func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error {
 	var (
 		credentials string
diff --git a/ee/backend/pkg/connector/saver.go b/ee/backend/pkg/connector/saver.go
index b65f2f0ba..b6c835249 100644
--- a/ee/backend/pkg/connector/saver.go
+++ b/ee/backend/pkg/connector/saver.go
@@ -1,26 +1,24 @@
 package connector
 
 import (
-	"bytes"
 	"fmt"
-	"github.com/google/uuid"
 	"log"
 	"openreplay/backend/internal/http/geoip"
+	"openreplay/backend/pkg/projects"
 	"openreplay/backend/pkg/sessions"
 	"strconv"
 	"time"
 
 	config "openreplay/backend/internal/config/connector"
 	"openreplay/backend/pkg/messages"
-	"openreplay/backend/pkg/objectstorage"
 )
 
 // Saver collect sessions and events and saves them to Redshift
 type Saver struct {
 	cfg             *config.Config
-	objStorage      objectstorage.ObjectStorage
-	db              *Redshift
+	db              Database
 	sessModule      sessions.Sessions
+	projModule      projects.Projects
 	sessions        map[uint64]map[string]string
 	updatedSessions map[uint64]bool
 	lastUpdate      map[uint64]time.Time
@@ -28,7 +26,7 @@ type Saver struct {
 	events           []map[string]string
 }
 
-func New(cfg *config.Config, objStorage objectstorage.ObjectStorage, db *Redshift, sessions sessions.Sessions) *Saver {
+func New(cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
 	if cfg == nil {
 		log.Fatal("connector config is empty")
 	}
@@ -42,110 +40,14 @@
 	}
 	return &Saver{
 		cfg:             cfg,
-		objStorage:      objStorage,
 		db:              db,
 		sessModule:      sessions,
+		projModule:      projects,
 		updatedSessions: make(map[uint64]bool, 0),
 		lastUpdate:      make(map[uint64]time.Time, 0),
 	}
 }
 
-var sessionColumns = []string{
-	"sessionid",
-	"user_agent",
-	"user_browser",
-	"user_browser_version",
-	"user_country",
-	"user_device",
-	"user_device_heap_size",
-	"user_device_memory_size",
-	"user_device_type",
-	"user_os",
-	"user_os_version",
-	"user_uuid",
-	"connection_effective_bandwidth",
-	"connection_type",
-	"metadata_key",
-	"metadata_value",
-	"referrer",
-	"user_anonymous_id",
-	"user_id",
-	"session_start_timestamp",
-	"session_end_timestamp",
-	"session_duration",
-	"first_contentful_paint",
-	"speed_index",
-	"visually_complete",
-	"timing_time_to_interactive",
-	"avg_cpu",
-	"avg_fps",
-	"max_cpu",
-	"max_fps",
-	"max_total_js_heap_size",
-	"max_used_js_heap_size",
-	"js_exceptions_count",
-	"inputs_count",
-	"clicks_count",
-	"issues_count",
-	"urls_count",
-}
-
-var sessionInts = []string{
-	"user_device_heap_size",
-	"user_device_memory_size",
-	"connection_effective_bandwidth",
-	"first_contentful_paint",
-	"speed_index",
-	"visually_complete",
-	"timing_time_to_interactive",
-	"avg_cpu",
-	"avg_fps",
-	"max_cpu",
-	"max_fps",
-	"max_total_js_heap_size",
-	"max_used_js_heap_size",
-	"js_exceptions_count",
-	"inputs_count",
-	"clicks_count",
-	"issues_count",
-	"urls_count",
-}
-
-var eventColumns = []string{
-	"sessionid",
-	"consolelog_level",
-	"consolelog_value",
-	"customevent_name",
-	"customevent_payload",
-	"jsexception_message",
-	"jsexception_name",
-	"jsexception_payload",
-	"jsexception_metadata",
-	"networkrequest_type",
-	"networkrequest_method",
-	"networkrequest_url",
-	"networkrequest_request",
-	"networkrequest_response",
-	"networkrequest_status",
-	"networkrequest_timestamp",
-	"networkrequest_duration",
-	"issueevent_message_id",
-	"issueevent_timestamp",
-	"issueevent_type",
-	"issueevent_context_string",
-	"issueevent_context",
-	"issueevent_payload",
-	"issueevent_url",
-	"customissue_name",
-	"customissue_payload",
-	"received_at",
-	"batch_order_number",
-}
-
-func QUOTES(s string) string {
-	return strconv.Quote(s)
-}
-
 func handleEvent(msg messages.Message) map[string]string {
 	event := make(map[string]string)
 
@@ -192,6 +94,106 @@ func handleEvent(msg messages.Message) map[string]string {
 	return event
 }
 
+func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string) error {
+	info, err := s.sessModule.Get(sessID)
+	if err != nil {
+		return err
+	}
+	// Check all required fields are present
+	if info.Duration != nil {
+		sess["session_duration"] = fmt.Sprintf("%d", *info.Duration)
+	}
+	if sess["session_start_timestamp"] == "" {
+		sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp)
+	}
+	if sess["session_end_timestamp"] == "" && info.Duration != nil {
+		sess["session_end_timestamp"] = fmt.Sprintf("%d", info.Timestamp+*info.Duration)
+	}
+	if sess["session_duration"] == "" && sess["session_start_timestamp"] != "" && sess["session_end_timestamp"] != "" {
sess["session_end_timestamp"] != "" { + start, err := strconv.Atoi(sess["session_start_timestamp"]) + if err != nil { + log.Printf("Error parsing session_start_timestamp: %v", err) + } + end, err := strconv.Atoi(sess["session_end_timestamp"]) + if err != nil { + log.Printf("Error parsing session_end_timestamp: %v", err) + } + if start != 0 && end != 0 { + sess["session_duration"] = fmt.Sprintf("%d", end-start) + } + } + if sess["user_agent"] == "" && info.UserAgent != "" { + sess["user_agent"] = QUOTES(info.UserAgent) + } + if sess["user_browser"] == "" && info.UserBrowser != "" { + sess["user_browser"] = QUOTES(info.UserBrowser) + } + if sess["user_browser_version"] == "" && info.UserBrowserVersion != "" { + sess["user_browser_version"] = QUOTES(info.UserBrowserVersion) + } + if sess["user_os"] == "" && info.UserOS != "" { + sess["user_os"] = QUOTES(info.UserOS) + } + if sess["user_os_version"] == "" && info.UserOSVersion != "" { + sess["user_os_version"] = QUOTES(info.UserOSVersion) + } + if sess["user_device"] == "" && info.UserDevice != "" { + sess["user_device"] = QUOTES(info.UserDevice) + } + if sess["user_device_type"] == "" && info.UserDeviceType != "" { + sess["user_device_type"] = QUOTES(info.UserDeviceType) + } + if sess["user_device_memory_size"] == "" && info.UserDeviceMemorySize != 0 { + sess["user_device_memory_size"] = fmt.Sprintf("%d", info.UserDeviceMemorySize) + } + if sess["user_device_heap_size"] == "" && info.UserDeviceHeapSize != 0 { + sess["user_device_heap_size"] = fmt.Sprintf("%d", info.UserDeviceHeapSize) + } + if sess["user_country"] == "" && info.UserCountry != "" { + sess["user_country"] = QUOTES(info.UserCountry) + } + if sess["user_city"] == "" && info.UserCity != "" { + sess["user_city"] = QUOTES(info.UserCity) + } + if sess["user_state"] == "" && info.UserState != "" { + sess["user_state"] = QUOTES(info.UserState) + } + if sess["user_uuid"] == "" && info.UserUUID != "" { + sess["user_uuid"] = QUOTES(info.UserUUID) + } + if sess["session_start_timestamp"] == "" && info.Timestamp != 0 { + sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp) + } + if sess["user_anonymous_id"] == "" && info.UserAnonymousID != nil { + sess["user_anonymous_id"] = QUOTES(*info.UserAnonymousID) + } + if sess["user_id"] == "" && info.UserID != nil { + sess["user_id"] = QUOTES(*info.UserID) + } + if sess["pages_count"] == "" && info.PagesCount != 0 { + sess["pages_count"] = fmt.Sprintf("%d", info.PagesCount) + } + if sess["tracker_version"] == "" && info.TrackerVersion != "" { + sess["tracker_version"] = QUOTES(info.TrackerVersion) + } + if sess["rev_id"] == "" && info.RevID != "" { + sess["rev_id"] = QUOTES(info.RevID) + } + if info.ErrorsCount != 0 { + sess["errors_count"] = fmt.Sprintf("%d", info.ErrorsCount) + } + if info.IssueScore != 0 { + sess["issue_score"] = fmt.Sprintf("%d", info.IssueScore) + } + // Check int fields + for _, field := range sessionInts { + if sess[field] == "" { + sess[field] = fmt.Sprintf("%d", 0) + } + } + return nil +} + func (s *Saver) handleSession(msg messages.Message) { // Filter out messages that are not related to session table switch msg.(type) { @@ -240,85 +242,47 @@ func (s *Saver) handleSession(msg messages.Message) { sess["user_device_type"] = QUOTES(m.UserDeviceType) sess["user_device_memory_size"] = fmt.Sprintf("%d", m.UserDeviceMemorySize) sess["user_device_heap_size"] = fmt.Sprintf("%d", m.UserDeviceHeapSize) + sess["tracker_version"] = QUOTES(m.TrackerVersion) + sess["rev_id"] = QUOTES(m.RevID) geoInfo := 
 		sess["user_country"] = QUOTES(geoInfo.Country)
+		sess["user_city"] = QUOTES(geoInfo.City)
+		sess["user_state"] = QUOTES(geoInfo.State)
 	case *messages.SessionEnd:
 		sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
-		info, err := s.sessModule.Get(msg.SessionID())
-		if err != nil {
-			log.Printf("Error getting session info: %v", err)
-			break
-		}
-		// Check all required fields are present
-		sess["session_duration"] = fmt.Sprintf("%d", *info.Duration)
-		if sess["user_agent"] == "" && info.UserAgent != "" {
-			sess["user_agent"] = QUOTES(info.UserAgent)
-		}
-		if sess["user_browser"] == "" && info.UserBrowser != "" {
-			sess["user_browser"] = QUOTES(info.UserBrowser)
-		}
-		if sess["user_browser_version"] == "" && info.UserBrowserVersion != "" {
-			sess["user_browser_version"] = QUOTES(info.UserBrowserVersion)
-		}
-		if sess["user_os"] == "" && info.UserOS != "" {
-			sess["user_os"] = QUOTES(info.UserOS)
-		}
-		if sess["user_os_version"] == "" && info.UserOSVersion != "" {
-			sess["user_os_version"] = QUOTES(info.UserOSVersion)
-		}
-		if sess["user_device"] == "" && info.UserDevice != "" {
-			sess["user_device"] = QUOTES(info.UserDevice)
-		}
-		if sess["user_device_type"] == "" && info.UserDeviceType != "" {
-			sess["user_device_type"] = QUOTES(info.UserDeviceType)
-		}
-		if sess["user_device_memory_size"] == "" && info.UserDeviceMemorySize != 0 {
-			sess["user_device_memory_size"] = fmt.Sprintf("%d", info.UserDeviceMemorySize)
-		}
-		if sess["user_device_heap_size"] == "" && info.UserDeviceHeapSize != 0 {
-			sess["user_device_heap_size"] = fmt.Sprintf("%d", info.UserDeviceHeapSize)
-		}
-		if sess["user_country"] == "" && info.UserCountry != "" {
-			sess["user_country"] = QUOTES(info.UserCountry)
-		}
-		if sess["user_uuid"] == "" && info.UserUUID != "" {
-			sess["user_uuid"] = QUOTES(info.UserUUID)
-		}
-		if sess["session_start_timestamp"] == "" && info.Timestamp != 0 {
-			sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp)
-		}
-		if sess["user_anonymous_id"] == "" && info.UserAnonymousID != nil {
-			sess["user_anonymous_id"] = QUOTES(*info.UserAnonymousID)
-		}
-		if sess["user_id"] == "" && info.UserID != nil {
-			sess["user_id"] = QUOTES(*info.UserID)
-		}
-		if sess["urls_count"] == "" && info.PagesCount != 0 {
-			sess["urls_count"] = fmt.Sprintf("%d", info.PagesCount)
-		}
-		// Check int fields
-		for _, field := range sessionInts {
-			if sess[field] == "" {
-				sess[field] = fmt.Sprintf("%d", 0)
-			}
+		if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
+			log.Printf("Error updating session info from cache: %v", err)
 		}
 	case *messages.ConnectionInformation:
 		sess["connection_effective_bandwidth"] = fmt.Sprintf("%d", m.Downlink)
 		sess["connection_type"] = QUOTES(m.Type)
 	case *messages.Metadata:
-		sess["metadata_key"] = QUOTES(m.Key)
-		sess["metadata_value"] = QUOTES(m.Value)
+		session, err := s.sessModule.Get(msg.SessionID())
+		if err != nil {
+			log.Printf("Error getting session info: %v", err)
+			break
+		}
+		project, err := s.projModule.GetProject(session.ProjectID)
+		if err != nil {
+			log.Printf("Error getting project info: %v", err)
+			break
+		}
+		keyNo := project.GetMetadataNo(m.Key)
+		if keyNo == 0 {
+			break
+		}
+		sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value)
 	case *messages.PageEvent:
 		sess["referrer"] = QUOTES(m.Referrer)
 		sess["first_contentful_paint"] = fmt.Sprintf("%d", m.FirstContentfulPaint)
 		sess["speed_index"] = fmt.Sprintf("%d", m.SpeedIndex)
 		sess["timing_time_to_interactive"] = fmt.Sprintf("%d", m.TimeToInteractive)
sess["visually_complete"] = fmt.Sprintf("%d", m.VisuallyComplete) - currUrlsCount, err := strconv.Atoi(sess["urls_count"]) + currUrlsCount, err := strconv.Atoi(sess["pages_count"]) if err != nil { currUrlsCount = 0 } - sess["urls_count"] = fmt.Sprintf("%d", currUrlsCount+1) + sess["pages_count"] = fmt.Sprintf("%d", currUrlsCount+1) case *messages.PerformanceTrackAggr: sess["avg_cpu"] = fmt.Sprintf("%d", m.AvgCPU) sess["avg_fps"] = fmt.Sprintf("%d", m.AvgFPS) @@ -387,71 +351,15 @@ func (s *Saver) Handle(msg messages.Message) { return } -func eventsToBuffer(batch []map[string]string) *bytes.Buffer { - buf := bytes.NewBuffer(nil) - - // Write header - for _, column := range eventColumns { - buf.WriteString(column + "|") - } - buf.Truncate(buf.Len() - 1) - - // Write data - for _, event := range batch { - buf.WriteString("\n") - for _, column := range eventColumns { - buf.WriteString(event[column] + "|") - } - buf.Truncate(buf.Len() - 1) - } - return buf -} - func (s *Saver) commitEvents() { if len(s.events) == 0 { log.Printf("empty events batch") return } - l := len(s.events) - - // Send data to S3 - fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.EventsTableName, uuid.New().String()) - // Create csv file - buf := eventsToBuffer(s.events) - // Clear events batch + if err := s.db.InsertEvents(s.events); err != nil { + log.Printf("can't insert events: %s", err) + } s.events = nil - - reader := bytes.NewReader(buf.Bytes()) - if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - log.Printf("can't upload file to s3: %s", err) - return - } - // Copy data from s3 bucket to redshift - if err := s.db.Copy(s.cfg.EventsTableName, fileName, "|", true, false); err != nil { - log.Printf("can't copy data from s3 to redshift: %s", err) - return - } - log.Printf("events batch of %d events is successfully saved", l) -} - -func sessionsToBuffer(batch []map[string]string) *bytes.Buffer { - buf := bytes.NewBuffer(nil) - - // Write header - for _, column := range sessionColumns { - buf.WriteString(column + "|") - } - buf.Truncate(buf.Len() - 1) - - // Write data - for _, sess := range batch { - buf.WriteString("\n") - for _, column := range sessionColumns { - buf.WriteString(sess[column] + "|") - } - buf.Truncate(buf.Len() - 1) - } - return buf } func (s *Saver) commitSessions() { @@ -472,30 +380,16 @@ func (s *Saver) commitSessions() { toSend = append(toSend, sessionID) } } + if err := s.db.InsertSessions(sessions); err != nil { + log.Printf("can't insert sessions: %s", err) + } log.Printf("finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend)) - - // Send data to S3 - fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.SessionsTableName, uuid.New().String()) - // Create csv file - buf := sessionsToBuffer(sessions) - - reader := bytes.NewReader(buf.Bytes()) - if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { - log.Printf("can't upload file to s3: %s", err) - return - } - // Copy data from s3 bucket to redshift - if err := s.db.Copy(s.cfg.SessionsTableName, fileName, "|", true, false); err != nil { - log.Printf("can't copy data from s3 to redshift: %s", err) - return - } // Clear current list of finished sessions for _, sessionID := range toSend { delete(s.sessions, sessionID) // delete session info delete(s.lastUpdate, sessionID) // delete last session update timestamp } s.finishedSessions = toKeep - log.Printf("sessions batch of %d sessions is successfully saved", l) } // Commit 
@@ -528,6 +422,24 @@ func (s *Saver) checkZombieSessions() {
 			continue
 		}
 		if s.lastUpdate[sessionID].Add(time.Minute * 5).Before(now) {
+			// The session hasn't been updated for 5 minutes; before flushing it,
+			// make sure the critical fields (start/end timestamps) are filled.
+			// If they are missing, try to reload them from the session cache; if that
+			// also fails, postpone the session to the next check instead of flushing incomplete data.
+			zombieSession := s.sessions[sessionID]
+			if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
+				// Let's try to load session from cache
+				if err := s.updateSessionInfoFromCache(sessionID, zombieSession); err != nil {
+					log.Printf("Error updating zombie session info from cache: %v", err)
+				} else {
+					s.sessions[sessionID] = zombieSession
+					log.Printf("Updated zombie session info from cache: %v", zombieSession)
+				}
+			}
+			if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
+				s.lastUpdate[sessionID] = now
+				continue
+			}
 			s.finishedSessions = append(s.finishedSessions, sessionID)
 			zombieSessionsCount++
 		}
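
Note on the new config block: the `env:"NAME,default=value"` tags on Clickhouse follow the same convention the Redshift struct already uses. A minimal sketch of how such tags are typically resolved, using a hand-rolled loader with hypothetical names (the project's real tag-driven loader is not shown in this diff):

package main

import (
	"fmt"
	"os"
)

// getEnv returns the value of the variable named by key,
// falling back to def when the variable is unset.
func getEnv(key, def string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return def
}

// Clickhouse mirrors the struct added to common/config.go.
type Clickhouse struct {
	URL      string
	Database string
	UserName string
	Password string
}

// loadClickhouseFromEnv is a hypothetical stand-in for the project's
// tag-driven config loader: each field falls back to its tag default.
func loadClickhouseFromEnv() Clickhouse {
	return Clickhouse{
		URL:      getEnv("CLICKHOUSE_STRING", ""),
		Database: getEnv("CLICKHOUSE_DATABASE", "default"),
		UserName: getEnv("CLICKHOUSE_USERNAME", "default"),
		Password: getEnv("CLICKHOUSE_PASSWORD", ""),
	}
}

func main() {
	fmt.Printf("%+v\n", loadClickhouseFromEnv())
}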
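Note on db.go: the Database interface is the seam that lets main.go choose a backend with a plain switch on cfg.ConnectorType, and it frees the Saver from depending on *Redshift directly. A sketch of how any further sink could plug in behind the same interface; stdoutSink is hypothetical and exists only to illustrate the decoupling:

package main

import "fmt"

// Database mirrors the interface added in ee/backend/pkg/connector/db.go.
type Database interface {
	InsertEvents(batch []map[string]string) error
	InsertSessions(batch []map[string]string) error
	Close() error
}

// stdoutSink is a hypothetical third backend used only to show that the
// saver depends on the interface, not on Redshift or ClickHouse.
type stdoutSink struct{}

func (s stdoutSink) InsertEvents(batch []map[string]string) error {
	fmt.Printf("events batch: %d rows\n", len(batch))
	return nil
}

func (s stdoutSink) InsertSessions(batch []map[string]string) error {
	fmt.Printf("sessions batch: %d rows\n", len(batch))
	return nil
}

func (s stdoutSink) Close() error { return nil }

func main() {
	var db Database = stdoutSink{}
	defer db.Close()
	_ = db.InsertEvents([]map[string]string{{"sessionid": "1"}})
}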
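Note on clickhouse.go: InsertEvents and InsertSessions use the standard clickhouse-go v2 batching cycle (PrepareBatch, Append per row, Send). A minimal self-contained example of the same cycle against a toy table; the address, credentials, and table are assumptions:

package main

import (
	"context"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"127.0.0.1:9000"}, // assumed local server
		Auth: clickhouse.Auth{Database: "default", Username: "default"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ctx := context.Background()
	// Toy table: CREATE TABLE demo (id UInt64, name Nullable(String)) ENGINE = Memory
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO demo (id, name)")
	if err != nil {
		log.Fatal(err)
	}
	name := "alice"
	// A *string maps to Nullable(String): nil becomes NULL, which is
	// exactly what the nullableString/nullableUint64 helpers rely on.
	if err := batch.Append(uint64(1), &name); err != nil {
		log.Fatal(err)
	}
	if err := batch.Append(uint64(2), (*string)(nil)); err != nil {
		log.Fatal(err)
	}
	if err := batch.Send(); err != nil {
		log.Fatal(err)
	}
}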
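Note on redshift.go: the Copy helper (its body is cut off above) executes a Redshift COPY from the uploaded S3 object. For orientation, a typical COPY statement for the pipe-delimited files built by eventsToBuffer/sessionsToBuffer might look like the sketch below; the bucket, IAM role, and IGNOREHEADER handling are assumptions, not the project's exact SQL:

package main

import "fmt"

// buildCopySQL sketches a generic Redshift COPY from S3 with a pipe
// delimiter. The files built by eventsToBuffer/sessionsToBuffer start
// with a header row, which IGNOREHEADER 1 would skip.
func buildCopySQL(table, bucket, fileName, delimiter string, gzip bool) string {
	sql := fmt.Sprintf(
		"COPY %s FROM 's3://%s/%s' IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy' DELIMITER '%s' IGNOREHEADER 1",
		table, bucket, fileName, delimiter)
	if gzip {
		sql += " GZIP"
	}
	return sql
}

func main() {
	fmt.Println(buildCopySQL("connector_events", "my-bucket",
		"connector_data/connector_events-uuid.csv", "|", false))
}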
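Note on the Metadata branch in saver.go: metadata is no longer stored as a generic key/value pair but routed into one of ten fixed metadata_N columns via project.GetMetadataNo. That method is not part of this diff; its apparent contract is to return the key's 1-based slot, or 0 when the project does not track the key. A hypothetical sketch of that contract:

package main

import "fmt"

// project is a hypothetical stand-in; the real type lives in
// openreplay/backend/pkg/projects. metadataKeys holds the keys a
// project has configured for its ten metadata slots.
type project struct {
	metadataKeys [10]string
}

// GetMetadataNo returns the 1-based slot for key, or 0 when the key is
// not configured, mirroring the `if keyNo == 0 { break }` guard in
// handleSession.
func (p *project) GetMetadataNo(key string) uint {
	for i, k := range p.metadataKeys {
		if k != "" && k == key {
			return uint(i + 1)
		}
	}
	return 0
}

func main() {
	p := &project{}
	p.metadataKeys[2] = "plan"
	fmt.Println(p.GetMetadataNo("plan"))    // 3 -> column metadata_3
	fmt.Println(p.GetMetadataNo("unknown")) // 0 -> message is skipped
}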