Added CH connector (#1476)

* feat(backend): added CH connector

* feat(backend): added fixed clickhouse connector
Alexander 2023-10-16 14:50:51 +02:00 committed by GitHub
parent a2fce7e291
commit 7d4bbf733e
7 changed files with 582 additions and 266 deletions

View file

@@ -42,10 +42,18 @@ func (cfg *Postgres) String() string {
// Redshift config
type Redshift struct {
ConnectioString string `env:"REDSHIFT_STRING"`
Host string `env:"REDSHIFT_HOST"`
Port int `env:"REDSHIFT_PORT"`
User string `env:"REDSHIFT_USER"`
Password string `env:"REDSHIFT_PASSWORD"`
Database string `env:"REDSHIFT_DATABASE"`
ConnectionString string `env:"REDSHIFT_STRING"`
Host string `env:"REDSHIFT_HOST"`
Port int `env:"REDSHIFT_PORT"`
User string `env:"REDSHIFT_USER"`
Password string `env:"REDSHIFT_PASSWORD"`
Database string `env:"REDSHIFT_DATABASE"`
}
// Clickhouse config
type Clickhouse struct {
URL string `env:"CLICKHOUSE_STRING"`
Database string `env:"CLICKHOUSE_DATABASE,default=default"` // assumed tag name; NewClickHouse reads cfg.Clickhouse.Database
UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
Password string `env:"CLICKHOUSE_PASSWORD,default="`
}
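The `env:"NAME,default=..."` tag syntax above matches struct-tag environment loaders. As a quick illustration of how these fields resolve (the actual loader used by the service is not shown in this diff), a minimal sketch using plain os.LookupEnv:

// Illustrative only: mirrors the tag semantics of the Clickhouse struct above.
package main

import (
    "fmt"
    "os"
)

func envOr(name, def string) string {
    if v, ok := os.LookupEnv(name); ok {
        return v
    }
    return def
}

func main() {
    cfg := struct{ URL, UserName, Password string }{
        URL:      envOr("CLICKHOUSE_STRING", ""),          // no default: empty if unset
        UserName: envOr("CLICKHOUSE_USERNAME", "default"), // tag default=default
        Password: envOr("CLICKHOUSE_PASSWORD", ""),        // tag default= (empty)
    }
    fmt.Printf("%+v\n", cfg)
}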

View file

@@ -2,7 +2,6 @@ package main
import (
"log"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/projects"
@@ -28,10 +27,18 @@ func main() {
log.Fatalf("can't init object storage: %s", err)
}
db, err := saver.NewRedshift(cfg)
if err != nil {
log.Printf("can't init redshift connection: %s", err)
return
var db saver.Database
switch cfg.ConnectorType {
case "redshift":
if db, err = saver.NewRedshift(cfg, objStore); err != nil {
log.Fatalf("can't init redshift connection: %s", err)
}
case "clickhouse":
if db, err = saver.NewClickHouse(cfg); err != nil {
log.Fatalf("can't init clickhouse connection: %s", err)
}
default:
log.Fatalf("unknown connector type: %s", cfg.ConnectorType)
}
defer db.Close()
@@ -43,10 +50,6 @@ func main() {
}
defer pgConn.Close()
// Init events module
pg := postgres.NewConn(pgConn)
defer pg.Close()
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
@@ -58,7 +61,7 @@ func main() {
sessManager := sessions.New(pgConn, projManager, redisClient)
// Saves messages to Redshift
dataSaver := saver.New(cfg, objStore, db, sessManager)
dataSaver := saver.New(cfg, db, sessManager, projManager)
// Message filter
msgFilter := []int{messages.MsgConsoleLog, messages.MsgCustomEvent, messages.MsgJSException,

View file

@@ -0,0 +1,193 @@
package connector
import (
"context"
"log"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/internal/config/connector"
)
type ClickHouse struct {
cfg *connector.Config
conn driver.Conn
}
func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) {
url := cfg.Clickhouse.URL
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{url},
Auth: clickhouse.Auth{
Database: cfg.Clickhouse.Database,
Username: cfg.Clickhouse.UserName,
Password: cfg.Clickhouse.Password,
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
})
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() // release the ping timeout when the constructor returns
if err := conn.Ping(ctx); err != nil {
return nil, err
}
c := &ClickHouse{
cfg: cfg,
conn: conn,
}
return c, nil
}
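The constructor above trims the `tcp://` prefix and `/default` suffix by hand before passing the bare address to clickhouse.Open. A hypothetical alternative, not part of this commit, would be to let the driver parse a full DSN (clickhouse-go v2 ships ParseDSN for this), assuming the same imports as this file:

// Sketch: delegate URL handling to the driver instead of trimming by hand.
func openFromDSN(dsn string) (driver.Conn, error) {
    opts, err := clickhouse.ParseDSN(dsn) // e.g. clickhouse://default:pass@localhost:9000/default
    if err != nil {
        return nil, err
    }
    opts.Compression = &clickhouse.Compression{Method: clickhouse.CompressionLZ4}
    return clickhouse.Open(opts)
}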
const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
bulk, err := c.conn.PrepareBatch(context.Background(), eventsSQL)
if err != nil {
return err
}
for _, event := range batch {
if err := bulk.Append(
Uint64(event["sessionid"]),
nullableString(event["consolelog_level"]),
nullableString(event["consolelog_value"]),
nullableString(event["customevent_name"]),
nullableString(event["customevent_payload"]),
nullableString(event["jsexception_message"]),
nullableString(event["jsexception_name"]),
nullableString(event["jsexception_payload"]),
nullableString(event["jsexception_metadata"]),
nullableString(event["networkrequest_type"]),
nullableString(event["networkrequest_method"]),
nullableString(event["networkrequest_url"]),
nullableString(event["networkrequest_request"]),
nullableString(event["networkrequest_response"]),
nullableUint64(event["networkrequest_status"]),
nullableUint64(event["networkrequest_timestamp"]),
nullableUint64(event["networkrequest_duration"]),
nullableString(event["issueevent_message_id"]),
nullableUint64(event["issueevent_timestamp"]),
nullableString(event["issueevent_type"]),
nullableString(event["issueevent_context_string"]),
nullableString(event["issueevent_context"]),
nullableString(event["issueevent_payload"]),
nullableString(event["issueevent_url"]),
nullableString(event["customissue_name"]),
nullableString(event["customissue_payload"]),
nullableUint64(event["received_at"]),
nullableUint64(event["batch_order_number"]),
); err != nil {
log.Printf("can't append value set to batch, err: %s", err)
}
}
return bulk.Send()
}
const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL)
if err != nil {
return err
}
for _, sess := range batch {
if err := bulk.Append(
Uint64(sess["sessionid"]),
nullableString(sess["user_agent"]),
nullableString(sess["user_browser"]),
nullableString(sess["user_browser_version"]),
nullableString(sess["user_country"]),
nullableString(sess["user_device"]),
nullableUint64(sess["user_device_heap_size"]),
nullableUint64(sess["user_device_memory_size"]),
nullableString(sess["user_device_type"]),
nullableString(sess["user_os"]),
nullableString(sess["user_os_version"]),
nullableString(sess["user_uuid"]),
nullableUint64(sess["connection_effective_bandwidth"]),
nullableString(sess["connection_type"]),
nullableString(sess["referrer"]),
nullableString(sess["user_anonymous_id"]),
nullableString(sess["user_id"]),
nullableUint64(sess["session_start_timestamp"]),
nullableUint64(sess["session_end_timestamp"]),
nullableUint64(sess["session_duration"]),
nullableUint64(sess["first_contentful_paint"]),
nullableUint64(sess["speed_index"]),
nullableUint64(sess["visually_complete"]),
nullableUint64(sess["timing_time_to_interactive"]),
nullableUint64(sess["avg_cpu"]),
nullableUint64(sess["avg_fps"]),
nullableUint64(sess["max_cpu"]),
nullableUint64(sess["max_fps"]),
nullableUint64(sess["max_total_js_heap_size"]),
nullableUint64(sess["max_used_js_heap_size"]),
nullableUint64(sess["js_exceptions_count"]),
nullableUint64(sess["inputs_count"]),
nullableUint64(sess["clicks_count"]),
nullableUint64(sess["issues_count"]),
nullableUint64(sess["pages_count"]),
nullableString(sess["metadata_1"]),
nullableString(sess["metadata_2"]),
nullableString(sess["metadata_3"]),
nullableString(sess["metadata_4"]),
nullableString(sess["metadata_5"]),
nullableString(sess["metadata_6"]),
nullableString(sess["metadata_7"]),
nullableString(sess["metadata_8"]),
nullableString(sess["metadata_9"]),
nullableString(sess["metadata_10"]),
); err != nil {
log.Printf("can't append value set to batch, err: %s", err)
}
}
return bulk.Send()
}
func (c *ClickHouse) Close() error {
return c.conn.Close()
}
func Uint64(v string) uint64 {
if v == "" {
return 0
}
res, err := strconv.ParseUint(v, 10, 64)
if err != nil {
log.Printf("can't convert string to uint64, err: %s", err)
return 0
}
return res
}
func nullableString(v string) *string {
if v == "" {
return nil
}
return &v
}
func nullableUint64(v string) *uint64 {
if v == "" {
return nil
}
res, err := strconv.ParseUint(v, 10, 64)
if err != nil {
log.Printf("can't convert string to uint64, err: %s", err)
return nil
}
return &res
}
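For reference, a minimal usage sketch of the batch API above; `ch` is assumed to come from NewClickHouse and the connector_events_buffer table is assumed to exist. Every value arrives as a string, empty strings become NULLs through the nullable helpers, and sessionid is the only field converted unconditionally:

batch := []map[string]string{{
    "sessionid":        "123456789",
    "consolelog_level": "error",
    "consolelog_value": "boom",
    "received_at":      "1697460651000",
    // omitted keys yield empty strings and are stored as NULL
}}
if err := ch.InsertEvents(batch); err != nil {
    log.Printf("can't insert events: %s", err)
}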

View file

@@ -0,0 +1,7 @@
package connector
type Database interface {
InsertEvents(batch []map[string]string) error
InsertSessions(batch []map[string]string) error
Close() error
}
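Since Saver now depends on this interface rather than on *Redshift directly, swapping backends (or faking one in tests) needs no changes in the saver itself. A hypothetical in-memory stand-in, not part of this commit:

// mockDB records batches instead of writing them anywhere.
type mockDB struct {
    events   [][]map[string]string
    sessions [][]map[string]string
}

var _ Database = (*mockDB)(nil) // compile-time interface check

func (m *mockDB) InsertEvents(b []map[string]string) error   { m.events = append(m.events, b); return nil }
func (m *mockDB) InsertSessions(b []map[string]string) error { m.sessions = append(m.sessions, b); return nil }
func (m *mockDB) Close() error                               { return nil }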

View file

@@ -0,0 +1,107 @@
package connector
import "strconv"
var sessionColumns = []string{
"sessionid",
"user_agent",
"user_browser",
"user_browser_version",
"user_country",
"user_device",
"user_device_heap_size",
"user_device_memory_size",
"user_device_type",
"user_os",
"user_os_version",
"user_uuid",
"connection_effective_bandwidth",
"connection_type",
"referrer",
"user_anonymous_id",
"user_id",
"session_start_timestamp",
"session_end_timestamp",
"session_duration",
"first_contentful_paint",
"speed_index",
"visually_complete",
"timing_time_to_interactive",
"avg_cpu",
"avg_fps",
"max_cpu",
"max_fps",
"max_total_js_heap_size",
"max_used_js_heap_size",
"js_exceptions_count",
"inputs_count",
"clicks_count",
"issues_count",
"pages_count",
"metadata_1",
"metadata_2",
"metadata_3",
"metadata_4",
"metadata_5",
"metadata_6",
"metadata_7",
"metadata_8",
"metadata_9",
"metadata_10",
}
var sessionInts = []string{
"user_device_heap_size",
"user_device_memory_size",
"connection_effective_bandwidth",
"first_contentful_paint",
"speed_index",
"visually_complete",
"timing_time_to_interactive",
"avg_cpu",
"avg_fps",
"max_cpu",
"max_fps",
"max_total_js_heap_size",
"max_used_js_heap_size",
"js_exceptions_count",
"inputs_count",
"clicks_count",
"issues_count",
"pages_count",
}
var eventColumns = []string{
"sessionid",
"consolelog_level",
"consolelog_value",
"customevent_name",
"customevent_payload",
"jsexception_message",
"jsexception_name",
"jsexception_payload",
"jsexception_metadata",
"networkrequest_type",
"networkrequest_method",
"networkrequest_url",
"networkrequest_request",
"networkrequest_response",
"networkrequest_status",
"networkrequest_timestamp",
"networkrequest_duration",
"issueevent_message_id",
"issueevent_timestamp",
"issueevent_type",
"issueevent_context_string",
"issueevent_context",
"issueevent_payload",
"issueevent_url",
"customissue_name",
"customissue_payload",
"received_at",
"batch_order_number",
}
func QUOTES(s string) string {
return strconv.Quote(s)
}
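QUOTES is a thin wrapper over strconv.Quote: the returned string carries literal surrounding double quotes, with inner quotes and control characters backslash-escaped. A self-contained example of its output:

package main

import (
    "fmt"
    "strconv"
)

func QUOTES(s string) string {
    return strconv.Quote(s)
}

func main() {
    fmt.Println(QUOTES(`He said "hi"`)) // Output: "He said \"hi\""
}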

View file

@@ -1,10 +1,13 @@
package connector
import (
"bytes"
"context"
"database/sql"
"fmt"
"github.com/google/uuid"
"log"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/internal/config/connector"
@@ -12,17 +15,19 @@ import (
)
type Redshift struct {
cfg *connector.Config
ctx context.Context
db *sql.DB
cfg *connector.Config
ctx context.Context
db *sql.DB
objStorage objectstorage.ObjectStorage
}
func NewRedshift(cfg *connector.Config) (*Redshift, error) {
func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
var source string
if cfg.ConnectioString != "" {
source = cfg.ConnectioString
if cfg.ConnectionString != "" {
source = cfg.ConnectionString
} else {
source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
cfg.Redshift.User, cfg.Redshift.Password, cfg.Redshift.Host, cfg.Redshift.Port, cfg.Redshift.Database)
}
log.Println("Connecting to Redshift Source: ", source)
sqldb, err := sql.Open("postgres", source)
@@ -33,12 +38,93 @@ func NewRedshift(cfg *connector.Config) (*Redshift, error) {
return nil, err
}
return &Redshift{
cfg: cfg,
ctx: context.Background(),
db: sqldb,
cfg: cfg,
ctx: context.Background(),
db: sqldb,
objStorage: objStorage,
}, nil
}
func eventsToBuffer(batch []map[string]string) *bytes.Buffer {
buf := bytes.NewBuffer(nil)
// Write header
for _, column := range eventColumns {
buf.WriteString(column + "|")
}
buf.Truncate(buf.Len() - 1)
// Write data
for _, event := range batch {
buf.WriteString("\n")
for _, column := range eventColumns {
buf.WriteString(event[column] + "|")
}
buf.Truncate(buf.Len() - 1)
}
return buf
}
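The buffer laid out by eventsToBuffer is a header row followed by one pipe-delimited row per event, with the trailing delimiter trimmed from every row. Abridged to three of the 28 eventColumns, the uploaded file looks roughly like:

sessionid|consolelog_level|consolelog_value
123|"error"|"boom"
124|"warn"|

Missing keys render as empty fields, and the Redshift COPY below splits rows on the '|' delimiter.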
func (r *Redshift) InsertEvents(batch []map[string]string) error {
// Send data to S3
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.EventsTableName, uuid.New().String())
// Create csv file
buf := eventsToBuffer(batch)
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return err
}
// Copy data from s3 bucket to redshift
if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return err
}
log.Printf("events batch of %d events is successfully saved", len(batch))
return nil
}
func sessionsToBuffer(batch []map[string]string) *bytes.Buffer {
buf := bytes.NewBuffer(nil)
// Write header
for _, column := range sessionColumns {
buf.WriteString(column + "|")
}
buf.Truncate(buf.Len() - 1)
// Write data
for _, sess := range batch {
buf.WriteString("\n")
for _, column := range sessionColumns {
buf.WriteString(sess[column] + "|")
}
buf.Truncate(buf.Len() - 1)
}
return buf
}
func (r *Redshift) InsertSessions(batch []map[string]string) error {
// Send data to S3
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.SessionsTableName, uuid.New().String())
// Create csv file
buf := sessionsToBuffer(batch)
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return err
}
// Copy data from s3 bucket to redshift
if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return err
}
log.Printf("sessions batch of %d sessions is successfully saved", len(batch))
return nil
}
func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error {
var (
credentials string

View file

@@ -1,26 +1,24 @@
package connector
import (
"bytes"
"fmt"
"github.com/google/uuid"
"log"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessions"
"strconv"
"time"
config "openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/objectstorage"
)
// Saver collects sessions and events and saves them to the configured database (Redshift or ClickHouse)
type Saver struct {
cfg *config.Config
objStorage objectstorage.ObjectStorage
db *Redshift
db Database
sessModule sessions.Sessions
projModule projects.Projects
sessions map[uint64]map[string]string
updatedSessions map[uint64]bool
lastUpdate map[uint64]time.Time
@@ -28,7 +26,7 @@ type Saver struct {
events []map[string]string
}
func New(cfg *config.Config, objStorage objectstorage.ObjectStorage, db *Redshift, sessions sessions.Sessions) *Saver {
func New(cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
if cfg == nil {
log.Fatal("connector config is empty")
}
@@ -42,110 +40,14 @@ func New(cfg *config.Config, objStorage objectstorage.ObjectStorage, db *Redshif
}
return &Saver{
cfg: cfg,
objStorage: objStorage,
db: db,
sessModule: sessions,
projModule: projects,
updatedSessions: make(map[uint64]bool, 0),
lastUpdate: make(map[uint64]time.Time, 0),
}
}
var sessionColumns = []string{
"sessionid",
"user_agent",
"user_browser",
"user_browser_version",
"user_country",
"user_device",
"user_device_heap_size",
"user_device_memory_size",
"user_device_type",
"user_os",
"user_os_version",
"user_uuid",
"connection_effective_bandwidth",
"connection_type",
"metadata_key",
"metadata_value",
"referrer",
"user_anonymous_id",
"user_id",
"session_start_timestamp",
"session_end_timestamp",
"session_duration",
"first_contentful_paint",
"speed_index",
"visually_complete",
"timing_time_to_interactive",
"avg_cpu",
"avg_fps",
"max_cpu",
"max_fps",
"max_total_js_heap_size",
"max_used_js_heap_size",
"js_exceptions_count",
"inputs_count",
"clicks_count",
"issues_count",
"urls_count",
}
var sessionInts = []string{
"user_device_heap_size",
"user_device_memory_size",
"connection_effective_bandwidth",
"first_contentful_paint",
"speed_index",
"visually_complete",
"timing_time_to_interactive",
"avg_cpu",
"avg_fps",
"max_cpu",
"max_fps",
"max_total_js_heap_size",
"max_used_js_heap_size",
"js_exceptions_count",
"inputs_count",
"clicks_count",
"issues_count",
"urls_count",
}
var eventColumns = []string{
"sessionid",
"consolelog_level",
"consolelog_value",
"customevent_name",
"customevent_payload",
"jsexception_message",
"jsexception_name",
"jsexception_payload",
"jsexception_metadata",
"networkrequest_type",
"networkrequest_method",
"networkrequest_url",
"networkrequest_request",
"networkrequest_response",
"networkrequest_status",
"networkrequest_timestamp",
"networkrequest_duration",
"issueevent_message_id",
"issueevent_timestamp",
"issueevent_type",
"issueevent_context_string",
"issueevent_context",
"issueevent_payload",
"issueevent_url",
"customissue_name",
"customissue_payload",
"received_at",
"batch_order_number",
}
func QUOTES(s string) string {
return strconv.Quote(s)
}
func handleEvent(msg messages.Message) map[string]string {
event := make(map[string]string)
@@ -192,6 +94,106 @@ func handleEvent(msg messages.Message) map[string]string {
return event
}
func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string) error {
info, err := s.sessModule.Get(sessID)
if err != nil {
return err
}
// Check all required fields are present
if info.Duration != nil {
sess["session_duration"] = fmt.Sprintf("%d", *info.Duration)
}
if sess["session_start_timestamp"] == "" {
sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp)
}
if sess["session_end_timestamp"] == "" && info.Duration != nil {
sess["session_end_timestamp"] = fmt.Sprintf("%d", info.Timestamp+*info.Duration)
}
if sess["session_duration"] == "" && sess["session_start_timestamp"] != "" && sess["session_end_timestamp"] != "" {
start, err := strconv.Atoi(sess["session_start_timestamp"])
if err != nil {
log.Printf("Error parsing session_start_timestamp: %v", err)
}
end, err := strconv.Atoi(sess["session_end_timestamp"])
if err != nil {
log.Printf("Error parsing session_end_timestamp: %v", err)
}
if start != 0 && end != 0 {
sess["session_duration"] = fmt.Sprintf("%d", end-start)
}
}
if sess["user_agent"] == "" && info.UserAgent != "" {
sess["user_agent"] = QUOTES(info.UserAgent)
}
if sess["user_browser"] == "" && info.UserBrowser != "" {
sess["user_browser"] = QUOTES(info.UserBrowser)
}
if sess["user_browser_version"] == "" && info.UserBrowserVersion != "" {
sess["user_browser_version"] = QUOTES(info.UserBrowserVersion)
}
if sess["user_os"] == "" && info.UserOS != "" {
sess["user_os"] = QUOTES(info.UserOS)
}
if sess["user_os_version"] == "" && info.UserOSVersion != "" {
sess["user_os_version"] = QUOTES(info.UserOSVersion)
}
if sess["user_device"] == "" && info.UserDevice != "" {
sess["user_device"] = QUOTES(info.UserDevice)
}
if sess["user_device_type"] == "" && info.UserDeviceType != "" {
sess["user_device_type"] = QUOTES(info.UserDeviceType)
}
if sess["user_device_memory_size"] == "" && info.UserDeviceMemorySize != 0 {
sess["user_device_memory_size"] = fmt.Sprintf("%d", info.UserDeviceMemorySize)
}
if sess["user_device_heap_size"] == "" && info.UserDeviceHeapSize != 0 {
sess["user_device_heap_size"] = fmt.Sprintf("%d", info.UserDeviceHeapSize)
}
if sess["user_country"] == "" && info.UserCountry != "" {
sess["user_country"] = QUOTES(info.UserCountry)
}
if sess["user_city"] == "" && info.UserCity != "" {
sess["user_city"] = QUOTES(info.UserCity)
}
if sess["user_state"] == "" && info.UserState != "" {
sess["user_state"] = QUOTES(info.UserState)
}
if sess["user_uuid"] == "" && info.UserUUID != "" {
sess["user_uuid"] = QUOTES(info.UserUUID)
}
if sess["session_start_timestamp"] == "" && info.Timestamp != 0 {
sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp)
}
if sess["user_anonymous_id"] == "" && info.UserAnonymousID != nil {
sess["user_anonymous_id"] = QUOTES(*info.UserAnonymousID)
}
if sess["user_id"] == "" && info.UserID != nil {
sess["user_id"] = QUOTES(*info.UserID)
}
if sess["pages_count"] == "" && info.PagesCount != 0 {
sess["pages_count"] = fmt.Sprintf("%d", info.PagesCount)
}
if sess["tracker_version"] == "" && info.TrackerVersion != "" {
sess["tracker_version"] = QUOTES(info.TrackerVersion)
}
if sess["rev_id"] == "" && info.RevID != "" {
sess["rev_id"] = QUOTES(info.RevID)
}
if info.ErrorsCount != 0 {
sess["errors_count"] = fmt.Sprintf("%d", info.ErrorsCount)
}
if info.IssueScore != 0 {
sess["issue_score"] = fmt.Sprintf("%d", info.IssueScore)
}
// Check int fields
for _, field := range sessionInts {
if sess[field] == "" {
sess[field] = fmt.Sprintf("%d", 0)
}
}
return nil
}
func (s *Saver) handleSession(msg messages.Message) {
// Filter out messages that are not related to session table
switch msg.(type) {
@@ -240,85 +242,47 @@ func (s *Saver) handleSession(msg messages.Message) {
sess["user_device_type"] = QUOTES(m.UserDeviceType)
sess["user_device_memory_size"] = fmt.Sprintf("%d", m.UserDeviceMemorySize)
sess["user_device_heap_size"] = fmt.Sprintf("%d", m.UserDeviceHeapSize)
sess["tracker_version"] = QUOTES(m.TrackerVersion)
sess["rev_id"] = QUOTES(m.RevID)
geoInfo := geoip.UnpackGeoRecord(m.UserCountry)
sess["user_country"] = QUOTES(geoInfo.Country)
sess["user_city"] = QUOTES(geoInfo.City)
sess["user_state"] = QUOTES(geoInfo.State)
case *messages.SessionEnd:
sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
info, err := s.sessModule.Get(msg.SessionID())
if err != nil {
log.Printf("Error getting session info: %v", err)
break
}
// Check all required fields are present
sess["session_duration"] = fmt.Sprintf("%d", *info.Duration)
if sess["user_agent"] == "" && info.UserAgent != "" {
sess["user_agent"] = QUOTES(info.UserAgent)
}
if sess["user_browser"] == "" && info.UserBrowser != "" {
sess["user_browser"] = QUOTES(info.UserBrowser)
}
if sess["user_browser_version"] == "" && info.UserBrowserVersion != "" {
sess["user_browser_version"] = QUOTES(info.UserBrowserVersion)
}
if sess["user_os"] == "" && info.UserOS != "" {
sess["user_os"] = QUOTES(info.UserOS)
}
if sess["user_os_version"] == "" && info.UserOSVersion != "" {
sess["user_os_version"] = QUOTES(info.UserOSVersion)
}
if sess["user_device"] == "" && info.UserDevice != "" {
sess["user_device"] = QUOTES(info.UserDevice)
}
if sess["user_device_type"] == "" && info.UserDeviceType != "" {
sess["user_device_type"] = QUOTES(info.UserDeviceType)
}
if sess["user_device_memory_size"] == "" && info.UserDeviceMemorySize != 0 {
sess["user_device_memory_size"] = fmt.Sprintf("%d", info.UserDeviceMemorySize)
}
if sess["user_device_heap_size"] == "" && info.UserDeviceHeapSize != 0 {
sess["user_device_heap_size"] = fmt.Sprintf("%d", info.UserDeviceHeapSize)
}
if sess["user_country"] == "" && info.UserCountry != "" {
sess["user_country"] = QUOTES(info.UserCountry)
}
if sess["user_uuid"] == "" && info.UserUUID != "" {
sess["user_uuid"] = QUOTES(info.UserUUID)
}
if sess["session_start_timestamp"] == "" && info.Timestamp != 0 {
sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp)
}
if sess["user_anonymous_id"] == "" && info.UserAnonymousID != nil {
sess["user_anonymous_id"] = QUOTES(*info.UserAnonymousID)
}
if sess["user_id"] == "" && info.UserID != nil {
sess["user_id"] = QUOTES(*info.UserID)
}
if sess["urls_count"] == "" && info.PagesCount != 0 {
sess["urls_count"] = fmt.Sprintf("%d", info.PagesCount)
}
// Check int fields
for _, field := range sessionInts {
if sess[field] == "" {
sess[field] = fmt.Sprintf("%d", 0)
}
if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
log.Printf("Error updating session info from cache: %v", err)
}
case *messages.ConnectionInformation:
sess["connection_effective_bandwidth"] = fmt.Sprintf("%d", m.Downlink)
sess["connection_type"] = QUOTES(m.Type)
case *messages.Metadata:
sess["metadata_key"] = QUOTES(m.Key)
sess["metadata_value"] = QUOTES(m.Value)
session, err := s.sessModule.Get(msg.SessionID())
if err != nil {
log.Printf("Error getting session info: %v", err)
break
}
project, err := s.projModule.GetProject(session.ProjectID)
if err != nil {
log.Printf("Error getting project info: %v", err)
break
}
keyNo := project.GetMetadataNo(m.Key)
if keyNo == 0 {
break
}
sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value)
case *messages.PageEvent:
sess["referrer"] = QUOTES(m.Referrer)
sess["first_contentful_paint"] = fmt.Sprintf("%d", m.FirstContentfulPaint)
sess["speed_index"] = fmt.Sprintf("%d", m.SpeedIndex)
sess["timing_time_to_interactive"] = fmt.Sprintf("%d", m.TimeToInteractive)
sess["visually_complete"] = fmt.Sprintf("%d", m.VisuallyComplete)
currUrlsCount, err := strconv.Atoi(sess["urls_count"])
currUrlsCount, err := strconv.Atoi(sess["pages_count"])
if err != nil {
currUrlsCount = 0
}
sess["urls_count"] = fmt.Sprintf("%d", currUrlsCount+1)
sess["pages_count"] = fmt.Sprintf("%d", currUrlsCount+1)
case *messages.PerformanceTrackAggr:
sess["avg_cpu"] = fmt.Sprintf("%d", m.AvgCPU)
sess["avg_fps"] = fmt.Sprintf("%d", m.AvgFPS)
@@ -387,71 +351,15 @@ func (s *Saver) Handle(msg messages.Message) {
return
}
func eventsToBuffer(batch []map[string]string) *bytes.Buffer {
buf := bytes.NewBuffer(nil)
// Write header
for _, column := range eventColumns {
buf.WriteString(column + "|")
}
buf.Truncate(buf.Len() - 1)
// Write data
for _, event := range batch {
buf.WriteString("\n")
for _, column := range eventColumns {
buf.WriteString(event[column] + "|")
}
buf.Truncate(buf.Len() - 1)
}
return buf
}
func (s *Saver) commitEvents() {
if len(s.events) == 0 {
log.Printf("empty events batch")
return
}
l := len(s.events)
// Send data to S3
fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.EventsTableName, uuid.New().String())
// Create csv file
buf := eventsToBuffer(s.events)
// Clear events batch
if err := s.db.InsertEvents(s.events); err != nil {
log.Printf("can't insert events: %s", err)
}
s.events = nil
reader := bytes.NewReader(buf.Bytes())
if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return
}
// Copy data from s3 bucket to redshift
if err := s.db.Copy(s.cfg.EventsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return
}
log.Printf("events batch of %d events is successfully saved", l)
}
func sessionsToBuffer(batch []map[string]string) *bytes.Buffer {
buf := bytes.NewBuffer(nil)
// Write header
for _, column := range sessionColumns {
buf.WriteString(column + "|")
}
buf.Truncate(buf.Len() - 1)
// Write data
for _, sess := range batch {
buf.WriteString("\n")
for _, column := range sessionColumns {
buf.WriteString(sess[column] + "|")
}
buf.Truncate(buf.Len() - 1)
}
return buf
}
func (s *Saver) commitSessions() {
@@ -472,30 +380,16 @@ func (s *Saver) commitSessions() {
toSend = append(toSend, sessionID)
}
}
if err := s.db.InsertSessions(sessions); err != nil {
log.Printf("can't insert sessions: %s", err)
}
log.Printf("finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend))
// Send data to S3
fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.SessionsTableName, uuid.New().String())
// Create csv file
buf := sessionsToBuffer(sessions)
reader := bytes.NewReader(buf.Bytes())
if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return
}
// Copy data from s3 bucket to redshift
if err := s.db.Copy(s.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return
}
// Clear current list of finished sessions
for _, sessionID := range toSend {
delete(s.sessions, sessionID) // delete session info
delete(s.lastUpdate, sessionID) // delete last session update timestamp
}
s.finishedSessions = toKeep
log.Printf("sessions batch of %d sessions is successfully saved", l)
}
// Commit saves batch to Redshift
@@ -528,6 +422,24 @@ func (s *Saver) checkZombieSessions() {
continue
}
if s.lastUpdate[sessionID].Add(time.Minute * 5).Before(now) {
// Check that the session is not still in progress by validating the critical values (startTs, endTs, etc).
// If the session finished more than 5 minutes ago, send it to the database; otherwise update the
// last-update timestamp and keep waiting for the session end. Do that a bounded number of times
// (store the attempts count); after the last attempt, drop the session from memory to avoid
// saving sessions with unfilled fields.
zombieSession := s.sessions[sessionID]
if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
// Let's try to load session from cache
if err := s.updateSessionInfoFromCache(sessionID, zombieSession); err != nil {
log.Printf("Error updating zombie session info from cache: %v", err)
} else {
s.sessions[sessionID] = zombieSession
log.Printf("Updated zombie session info from cache: %v", zombieSession)
}
}
if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
s.lastUpdate[sessionID] = now
continue
}
s.finishedSessions = append(s.finishedSessions, sessionID)
zombieSessionsCount++
}