feat(backend): patched redshift connector (#2117)

This commit is contained in:
Alexander 2024-04-22 13:34:23 +02:00 committed by GitHub
parent 5a6969e1eb
commit 22a3dc9f8e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 347 additions and 117 deletions

View file

@ -48,6 +48,7 @@ type Redshift struct {
User string `env:"REDSHIFT_USER"`
Password string `env:"REDSHIFT_PASSWORD"`
Database string `env:"REDSHIFT_DATABASE"`
Bucket string `env:"REDSHIFT_BUCKET,default=rdshftbucket"`
}
// Clickhouse config
@ -58,3 +59,16 @@ type Clickhouse struct {
UserName string `env:"CLICKHOUSE_USERNAME,default=default"`
Password string `env:"CLICKHOUSE_PASSWORD,default="`
}
// ElasticSearch config
type ElasticSearch struct {
	URLs     string `env:"ELASTICSEARCH_URLS"`
	UseAWS   bool   `env:"ELASTICSEARCH_IN_AWS,default=false"`
	User     string `env:"ELASTICSEARCH_USER"`
	Password string `env:"ELASTICSEARCH_PASSWORD"`
}

// GetURLs splits the comma-separated ELASTICSEARCH_URLS value into
// individual endpoint strings. Whitespace around each entry is trimmed
// and empty entries are dropped, so an unset/empty URLs field yields an
// empty slice instead of []string{""} (strings.Split on "" returns one
// empty element, which would otherwise leak an invalid endpoint).
func (cfg *ElasticSearch) GetURLs() []string {
	parts := strings.Split(cfg.URLs, ",")
	urls := make([]string, 0, len(parts))
	for _, p := range parts {
		if u := strings.TrimSpace(p); u != "" {
			urls = append(urls, u)
		}
	}
	return urls
}

View file

@ -14,6 +14,7 @@ type ObjectsConfig struct {
AzureAccountName string `env:"AZURE_ACCOUNT_NAME"`
AzureAccountKey string `env:"AZURE_ACCOUNT_KEY"`
UseS3Tags bool `env:"USE_S3_TAGS,default=true"`
AWSIAMRole string `env:"AWS_IAM_ROLE"`
}
func (c *ObjectsConfig) UseFileTags() bool {

View file

@ -0,0 +1,73 @@
package logger
import (
"context"
"fmt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
)
// Logger is a leveled, context-aware logging facade. Implementations
// format the message with fmt.Sprintf(message, args...) and attach
// request-scoped values found in ctx (sessionID, projectID, etc.) as
// structured fields.
type Logger interface {
	Debug(ctx context.Context, message string, args ...interface{})
	Info(ctx context.Context, message string, args ...interface{})
	Warn(ctx context.Context, message string, args ...interface{})
	Error(ctx context.Context, message string, args ...interface{})
	// Fatal logs the entry and, being backed by zap.Logger.Fatal,
	// terminates the process.
	Fatal(ctx context.Context, message string, args ...interface{})
}
// loggerImpl is the zap-backed implementation of Logger.
type loggerImpl struct {
	l *zap.Logger // shared root logger; per-call fields are added via With
}
// New builds the default Logger: JSON-encoded entries written to
// stdout at Info level and above, with a human-readable timestamp
// layout. Caller skip is bumped by one so reported call sites point at
// the caller of the Logger methods rather than this wrapper.
func New() Logger {
	cfg := zap.NewProductionEncoderConfig()
	cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000")
	core := zapcore.NewCore(
		zapcore.NewJSONEncoder(cfg),
		zapcore.AddSync(os.Stdout),
		zap.InfoLevel,
	)
	zl := zap.New(core, zap.AddCaller()).WithOptions(zap.AddCallerSkip(1))
	return &loggerImpl{l: zl}
}
// prepare copies well-known request-scoped context values onto the
// entry as structured fields. Only string-typed values are picked up;
// a value of any other type stored under one of these keys is silently
// ignored (the type assertion simply fails).
func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logger {
	for _, key := range []string{"sessionID", "projectID", "tracker", "httpMethod", "url", "batch"} {
		if v, ok := ctx.Value(key).(string); ok {
			logger = logger.With(zap.String(key, v))
		}
	}
	return logger
}
// Debug emits a debug-level entry. Note the core built in New is set
// to InfoLevel, so debug entries are dropped unless that changes.
// NOTE(review): zap already records the entry level via the encoder;
// the extra "level" field looks redundant — confirm intended output.
func (l *loggerImpl) Debug(ctx context.Context, message string, args ...interface{}) {
	entry := l.l.With(zap.String("level", "debug"))
	l.prepare(ctx, entry).Debug(fmt.Sprintf(message, args...))
}
// Info emits an info-level entry enriched with context fields.
func (l *loggerImpl) Info(ctx context.Context, message string, args ...interface{}) {
	entry := l.l.With(zap.String("level", "info"))
	l.prepare(ctx, entry).Info(fmt.Sprintf(message, args...))
}
// Warn emits a warn-level entry enriched with context fields.
func (l *loggerImpl) Warn(ctx context.Context, message string, args ...interface{}) {
	entry := l.l.With(zap.String("level", "warn"))
	l.prepare(ctx, entry).Warn(fmt.Sprintf(message, args...))
}
// Error emits an error-level entry enriched with context fields.
func (l *loggerImpl) Error(ctx context.Context, message string, args ...interface{}) {
	entry := l.l.With(zap.String("level", "error"))
	l.prepare(ctx, entry).Error(fmt.Sprintf(message, args...))
}
// Fatal emits a fatal-level entry enriched with context fields;
// zap's Fatal then terminates the process.
func (l *loggerImpl) Fatal(ctx context.Context, message string, args ...interface{}) {
	entry := l.l.With(zap.String("level", "fatal"))
	l.prepare(ctx, entry).Fatal(fmt.Sprintf(message, args...))
}

View file

@ -1,59 +1,59 @@
package main
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessions"
"context"
config "openreplay/backend/internal/config/connector"
"openreplay/backend/internal/connector"
saver "openreplay/backend/pkg/connector"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/objectstorage/store"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
ctx := context.Background()
log := logger.New()
cfg := config.New()
objStore, err := store.NewStore(&cfg.ObjectsConfig)
if err != nil {
log.Fatalf("can't init object storage: %s", err)
log.Fatal(ctx, "can't init object storage: %s", err)
}
var db saver.Database
switch cfg.ConnectorType {
case "redshift":
if db, err = saver.NewRedshift(cfg, objStore); err != nil {
log.Fatalf("can't init redshift connection: %s", err)
if db, err = saver.NewRedshift(log, cfg, objStore); err != nil {
log.Fatal(ctx, "can't init redshift connection: %s", err)
}
case "clickhouse":
if db, err = saver.NewClickHouse(cfg); err != nil {
log.Fatalf("can't init clickhouse connection: %s", err)
if db, err = saver.NewClickHouse(log, cfg); err != nil {
log.Fatal(ctx, "can't init clickhouse connection: %s", err)
}
default:
log.Fatalf("unknown connector type: %s", cfg.ConnectorType)
log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType)
}
defer db.Close()
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
log.Fatal(ctx, "can't init postgres connection: %s", err)
}
defer pgConn.Close()
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Printf("can't init redis connection: %s", err)
log.Warn(ctx, "can't init redis connection: %s", err)
}
defer redisClient.Close()
@ -61,7 +61,7 @@ func main() {
sessManager := sessions.New(pgConn, projManager, redisClient)
// Saves messages to Redshift
dataSaver := saver.New(cfg, db, sessManager, projManager)
dataSaver := saver.New(log, cfg, db, sessManager, projManager)
// Message filter
msgFilter := []int{messages.MsgConsoleLog, messages.MsgCustomEvent, messages.MsgJSException,
@ -69,7 +69,13 @@ func main() {
messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgConnectionInformation,
messages.MsgMetadata, messages.MsgPageEvent, messages.MsgPerformanceTrackAggr, messages.MsgUserID,
messages.MsgUserAnonymousID, messages.MsgJSException, messages.MsgJSExceptionDeprecated,
messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated}
messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated,
// Mobile messages
messages.MsgIOSSessionStart, messages.MsgIOSSessionEnd, messages.MsgIOSUserID, messages.MsgIOSUserAnonymousID,
messages.MsgIOSMetadata, messages.MsgIOSEvent, messages.MsgIOSNetworkCall,
messages.MsgIOSClickEvent, messages.MsgIOSSwipeEvent, messages.MsgIOSInputEvent,
messages.MsgIOSCrash, messages.MsgIOSIssueEvent,
}
// Init consumer
consumer := queue.NewConsumer(
@ -86,12 +92,11 @@ func main() {
// Init memory manager
memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage)
if err != nil {
log.Printf("can't init memory manager: %s", err)
return
log.Fatal(ctx, "can't init memory manager: %s", err)
}
// Run service and wait for TERM signal
service := connector.New(cfg, consumer, dataSaver, memoryManager)
log.Printf("Connector service started\n")
log.Info(ctx, "Connector service started")
terminator.Wait(service)
}

View file

@ -2,22 +2,23 @@ package connector
import (
"context"
"log"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/logger"
)
type ClickHouse struct {
log logger.Logger
cfg *connector.Config
conn driver.Conn
}
func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) {
func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error) {
url := cfg.Clickhouse.URL
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
@ -43,6 +44,7 @@ func NewClickHouse(cfg *connector.Config) (*ClickHouse, error) {
return nil, err
}
c := &ClickHouse{
log: log,
cfg: cfg,
conn: conn,
}
@ -57,8 +59,10 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
return err
}
for _, event := range batch {
ctx := context.Background()
ctx = context.WithValue(ctx, "sessionID", event["sessionid"])
if err := bulk.Append(
Uint64(event["sessionid"]),
c.Uint64(ctx, event["sessionid"]),
nullableString(event["consolelog_level"]),
nullableString(event["consolelog_value"]),
nullableString(event["customevent_name"]),
@ -72,11 +76,11 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
nullableString(event["networkrequest_url"]),
nullableString(event["networkrequest_request"]),
nullableString(event["networkrequest_response"]),
nullableUint64(event["networkrequest_status"]),
nullableUint64(event["networkrequest_timestamp"]),
nullableUint64(event["networkrequest_duration"]),
c.nullableUint64(ctx, event["networkrequest_status"]),
c.nullableUint64(ctx, event["networkrequest_timestamp"]),
c.nullableUint64(ctx, event["networkrequest_duration"]),
nullableString(event["issueevent_message_id"]),
nullableUint64(event["issueevent_timestamp"]),
c.nullableUint64(ctx, event["issueevent_timestamp"]),
nullableString(event["issueevent_type"]),
nullableString(event["issueevent_context_string"]),
nullableString(event["issueevent_context"]),
@ -84,10 +88,10 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
nullableString(event["issueevent_url"]),
nullableString(event["customissue_name"]),
nullableString(event["customissue_payload"]),
nullableUint64(event["received_at"]),
nullableUint64(event["batch_order_number"]),
c.nullableUint64(ctx, event["received_at"]),
c.nullableUint64(ctx, event["batch_order_number"]),
); err != nil {
log.Printf("can't append value set to batch, err: %s", err)
c.log.Error(ctx, "can't append value set to batch, err: %s", err)
}
}
return bulk.Send()
@ -101,42 +105,44 @@ func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
return err
}
for _, sess := range batch {
ctx := context.Background()
ctx = context.WithValue(ctx, "sessionID", sess["sessionid"])
if err := bulk.Append(
Uint64(sess["sessionid"]),
c.Uint64(ctx, sess["sessionid"]),
nullableString(sess["user_agent"]),
nullableString(sess["user_browser"]),
nullableString(sess["user_browser_version"]),
nullableString(sess["user_country"]),
nullableString(sess["user_device"]),
nullableUint64(sess["user_device_heap_size"]),
nullableUint64(sess["user_device_memory_size"]),
c.nullableUint64(ctx, sess["user_device_heap_size"]),
c.nullableUint64(ctx, sess["user_device_memory_size"]),
nullableString(sess["user_device_type"]),
nullableString(sess["user_os"]),
nullableString(sess["user_os_version"]),
nullableString(sess["user_uuid"]),
nullableUint64(sess["connection_effective_bandwidth"]),
c.nullableUint64(ctx, sess["connection_effective_bandwidth"]),
nullableString(sess["connection_type"]),
nullableString(sess["referrer"]),
nullableString(sess["user_anonymous_id"]),
nullableString(sess["user_id"]),
nullableUint64(sess["session_start_timestamp"]),
nullableUint64(sess["session_end_timestamp"]),
nullableUint64(sess["session_duration"]),
nullableUint64(sess["first_contentful_paint"]),
nullableUint64(sess["speed_index"]),
nullableUint64(sess["visually_complete"]),
nullableUint64(sess["timing_time_to_interactive"]),
nullableUint64(sess["avg_cpu"]),
nullableUint64(sess["avg_fps"]),
nullableUint64(sess["max_cpu"]),
nullableUint64(sess["max_fps"]),
nullableUint64(sess["max_total_js_heap_size"]),
nullableUint64(sess["max_used_js_heap_size"]),
nullableUint64(sess["js_exceptions_count"]),
nullableUint64(sess["inputs_count"]),
nullableUint64(sess["clicks_count"]),
nullableUint64(sess["issues_count"]),
nullableUint64(sess["pages_count"]),
c.nullableUint64(ctx, sess["session_start_timestamp"]),
c.nullableUint64(ctx, sess["session_end_timestamp"]),
c.nullableUint64(ctx, sess["session_duration"]),
c.nullableUint64(ctx, sess["first_contentful_paint"]),
c.nullableUint64(ctx, sess["speed_index"]),
c.nullableUint64(ctx, sess["visually_complete"]),
c.nullableUint64(ctx, sess["timing_time_to_interactive"]),
c.nullableUint64(ctx, sess["avg_cpu"]),
c.nullableUint64(ctx, sess["avg_fps"]),
c.nullableUint64(ctx, sess["max_cpu"]),
c.nullableUint64(ctx, sess["max_fps"]),
c.nullableUint64(ctx, sess["max_total_js_heap_size"]),
c.nullableUint64(ctx, sess["max_used_js_heap_size"]),
c.nullableUint64(ctx, sess["js_exceptions_count"]),
c.nullableUint64(ctx, sess["inputs_count"]),
c.nullableUint64(ctx, sess["clicks_count"]),
c.nullableUint64(ctx, sess["issues_count"]),
c.nullableUint64(ctx, sess["pages_count"]),
nullableString(sess["metadata_1"]),
nullableString(sess["metadata_2"]),
nullableString(sess["metadata_3"]),
@ -148,7 +154,7 @@ func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
nullableString(sess["metadata_9"]),
nullableString(sess["metadata_10"]),
); err != nil {
log.Printf("can't append value set to batch, err: %s", err)
c.log.Error(ctx, "can't append value set to batch, err: %s", err)
}
}
return bulk.Send()
@ -158,13 +164,13 @@ func (c *ClickHouse) Close() error {
return c.conn.Close()
}
func Uint64(v string) uint64 {
func (c *ClickHouse) Uint64(ctx context.Context, v string) uint64 {
if v == "" {
return 0
}
res, err := strconv.Atoi(v)
if err != nil {
log.Printf("can't convert string to uint64, err: %s", err)
c.log.Error(ctx, "can't convert string to uint64, err: %s", err)
return 0
}
return uint64(res)
@ -178,12 +184,12 @@ func nullableString(v string) *string {
return p
}
func nullableUint64(v string) *uint64 {
func (c *ClickHouse) nullableUint64(ctx context.Context, v string) *uint64 {
var p *uint64 = nil
if v != "" {
res, err := strconv.Atoi(v)
if err != nil {
log.Printf("can't convert string to uint64, err: %s", err)
c.log.Error(ctx, "can't convert string to uint64, err: %s", err)
return nil
}
a := uint64(res)

View file

@ -4,10 +4,11 @@ import "strconv"
var sessionColumns = []string{
"sessionid",
"user_agent",
"user_browser",
"user_browser_version",
"user_country",
"user_city",
"user_state",
"user_device",
"user_device_heap_size",
"user_device_memory_size",
@ -20,6 +21,8 @@ var sessionColumns = []string{
"referrer",
"user_anonymous_id",
"user_id",
"tracker_version",
"rev_id",
"session_start_timestamp",
"session_end_timestamp",
"session_duration",
@ -98,8 +101,52 @@ var eventColumns = []string{
"issueevent_url",
"customissue_name",
"customissue_payload",
"received_at",
"batch_order_number",
"mobile_event_name",
"mobile_event_payload",
"mobile_networkcall_type",
"mobile_networkcall_method",
"mobile_networkcall_url",
"mobile_networkcall_request",
"mobile_networkcall_response",
"mobile_networkcall_status",
"mobile_networkcall_timestamp",
"mobile_networkcall_duration",
"mobile_clickevent_x",
"mobile_clickevent_y",
"mobile_clickevent_timestamp",
"mobile_clickevent_label",
"mobile_swipeevent_x",
"mobile_swipeevent_y",
"mobile_swipeevent_timestamp",
"mobile_swipeevent_label",
"mobile_inputevent_label",
"mobile_inputevent_value",
"mobile_crash_name",
"mobile_crash_reason",
"mobile_crash_stacktrace",
"mobile_issueevent_timestamp",
"mobile_issueevent_type",
"mobile_issueevent_context_string",
"mobile_issueevent_context",
"mobile_issueevent_payload",
}
// eventInts lists the event columns that carry numeric values
// (statuses, timestamps, durations, screen coordinates). Presumably
// consumed elsewhere to decide which columns need integer handling
// rather than quoted-string handling — TODO confirm against the
// consuming code (not visible in this chunk). NOTE(review):
// "issueevent_message_id" is included here; verify it is actually
// numeric in the destination schema.
var eventInts = []string{
	"networkrequest_status",
	"networkrequest_timestamp",
	"networkrequest_duration",
	"issueevent_message_id",
	"issueevent_timestamp",
	"mobile_networkcall_status",
	"mobile_networkcall_timestamp",
	"mobile_networkcall_duration",
	"mobile_clickevent_x",
	"mobile_clickevent_y",
	"mobile_clickevent_timestamp",
	"mobile_swipeevent_x",
	"mobile_swipeevent_y",
	"mobile_swipeevent_timestamp",
	"mobile_issueevent_timestamp",
}
func QUOTES(s string) string {

View file

@ -5,23 +5,24 @@ import (
"context"
"database/sql"
"fmt"
"github.com/google/uuid"
"log"
"openreplay/backend/pkg/objectstorage"
_ "github.com/lib/pq"
"openreplay/backend/internal/config/connector"
_ "github.com/lib/pq"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
)
type Redshift struct {
log logger.Logger
cfg *connector.Config
ctx context.Context
db *sql.DB
objStorage objectstorage.ObjectStorage
}
func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
var source string
if cfg.ConnectionString != "" {
source = cfg.ConnectionString
@ -29,7 +30,7 @@ func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage)
source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
cfg.Redshift.User, cfg.Redshift.Password, cfg.Redshift.Host, cfg.Redshift.Port, cfg.Redshift.Database)
}
log.Println("Connecting to Redshift Source: ", source)
log.Info(context.Background(), "Connecting to Redshift Source: %s", source)
sqldb, err := sql.Open("postgres", source)
if err != nil {
return nil, err
@ -38,6 +39,7 @@ func NewRedshift(cfg *connector.Config, objStorage objectstorage.ObjectStorage)
return nil, err
}
return &Redshift{
log: log,
cfg: cfg,
ctx: context.Background(),
db: sqldb,
@ -73,15 +75,13 @@ func (r *Redshift) InsertEvents(batch []map[string]string) error {
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return err
return fmt.Errorf("can't upload file to s3: %s", err)
}
// Copy data from s3 bucket to redshift
if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return err
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
}
log.Printf("events batch of %d events is successfully saved", len(batch))
r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
return nil
}
@ -113,15 +113,13 @@ func (r *Redshift) InsertSessions(batch []map[string]string) error {
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
log.Printf("can't upload file to s3: %s", err)
return err
return fmt.Errorf("can't upload file to s3: %s", err)
}
// Copy data from s3 bucket to redshift
if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
log.Printf("can't copy data from s3 to redshift: %s", err)
return err
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
}
log.Printf("sessions batch of %d sessions is successfully saved", len(batch))
r.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch))
return nil
}
@ -131,19 +129,23 @@ func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool)
gzipSQL string
)
if creds {
credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey)
if r.cfg.AWSAccessKeyID != "" && r.cfg.AWSSecretAccessKey != "" {
credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey)
} else if r.cfg.AWSIAMRole != "" {
credentials = fmt.Sprintf(`IAM_ROLE '%s'`, r.cfg.AWSIAMRole)
} else {
credentials = "IAM_ROLE default"
}
}
if gzip {
gzipSQL = "GZIP"
}
bucketName := "rdshftbucket"
filePath := fmt.Sprintf("s3://%s/%s", bucketName, fileName)
filePath := fmt.Sprintf("s3://%s/%s", r.cfg.Redshift.Bucket, fileName)
copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS
STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`,
copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`,
tableName, filePath, gzipSQL, credentials, delimiter)
log.Printf("Running command: %s", copySQL)
r.log.Debug(context.Background(), "Executing COPY SQL: %s", copySQL)
_, err := r.db.ExecContext(r.ctx, copySQL)
return err

View file

@ -1,20 +1,22 @@
package connector
import (
"context"
"fmt"
"log"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessions"
"strconv"
"time"
config "openreplay/backend/internal/config/connector"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessions"
)
// Saver collect sessions and events and saves them to Redshift
type Saver struct {
log logger.Logger
cfg *config.Config
db Database
sessModule sessions.Sessions
@ -26,19 +28,21 @@ type Saver struct {
events []map[string]string
}
func New(cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
func New(log logger.Logger, cfg *config.Config, db Database, sessions sessions.Sessions, projects projects.Projects) *Saver {
ctx := context.Background()
if cfg == nil {
log.Fatal("connector config is empty")
log.Fatal(ctx, "connector config is empty")
}
// Validate column names in sessions table
if err := validateColumnNames(sessionColumns); err != nil {
log.Printf("can't validate column names: %s", err)
log.Error(ctx, "can't validate sessions column names: %s", err)
}
// Validate column names in events table
if err := validateColumnNames(eventColumns); err != nil {
log.Printf("can't validate column names: %s", err)
log.Error(ctx, "can't validate events column names: %s", err)
}
return &Saver{
log: log,
cfg: cfg,
db: db,
sessModule: sessions,
@ -83,14 +87,48 @@ func handleEvent(msg messages.Message) map[string]string {
case *messages.CustomIssue:
event["customissue_name"] = QUOTES(m.Name)
event["customissue_payload"] = QUOTES(m.Payload)
// Mobile events
case *messages.IOSEvent:
event["mobile_event_name"] = QUOTES(m.Name)
event["mobile_event_payload"] = QUOTES(m.Payload)
case *messages.IOSNetworkCall:
event["mobile_networkcall_type"] = QUOTES(m.Type)
event["mobile_networkcall_method"] = QUOTES(m.Method)
event["mobile_networkcall_url"] = QUOTES(m.URL)
event["mobile_networkcall_request"] = QUOTES(m.Request)
event["mobile_networkcall_response"] = QUOTES(m.Response)
event["mobile_networkcall_status"] = fmt.Sprintf("%d", m.Status)
event["mobile_networkcall_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_networkcall_duration"] = fmt.Sprintf("%d", m.Duration)
case *messages.IOSClickEvent:
event["mobile_clickevent_x"] = fmt.Sprintf("%d", m.X)
event["mobile_clickevent_y"] = fmt.Sprintf("%d", m.Y)
event["mobile_clickevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_clickevent_label"] = QUOTES(m.Label)
case *messages.IOSSwipeEvent:
event["mobile_swipeevent_x"] = fmt.Sprintf("%d", m.X)
event["mobile_swipeevent_y"] = fmt.Sprintf("%d", m.Y)
event["mobile_swipeevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_swipeevent_label"] = QUOTES(m.Label)
case *messages.IOSInputEvent:
event["mobile_inputevent_label"] = QUOTES(m.Label)
event["mobile_inputevent_value"] = QUOTES(m.Value)
case *messages.IOSCrash:
event["mobile_crash_name"] = QUOTES(m.Name)
event["mobile_crash_reason"] = QUOTES(m.Reason)
event["mobile_crash_stacktrace"] = QUOTES(m.Stacktrace)
case *messages.IOSIssueEvent:
event["mobile_issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_issueevent_type"] = QUOTES(m.Type)
event["mobile_issueevent_context_string"] = QUOTES(m.ContextString)
event["mobile_issueevent_context"] = QUOTES(m.Context)
event["mobile_issueevent_payload"] = QUOTES(m.Payload)
}
if len(event) == 0 {
return nil
}
event["sessionid"] = fmt.Sprintf("%d", msg.SessionID())
event["received_at"] = fmt.Sprintf("%d", uint64(time.Now().UnixMilli()))
event["batch_order_number"] = fmt.Sprintf("%d", 0)
return event
}
@ -110,21 +148,19 @@ func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string
sess["session_end_timestamp"] = fmt.Sprintf("%d", info.Timestamp+*info.Duration)
}
if sess["session_duration"] == "" && sess["session_start_timestamp"] != "" && sess["session_end_timestamp"] != "" {
ctx := context.WithValue(context.Background(), "sessionID", sessID)
start, err := strconv.Atoi(sess["session_start_timestamp"])
if err != nil {
log.Printf("Error parsing session_start_timestamp: %v", err)
s.log.Error(ctx, "error parsing session_start_timestamp: %s", err)
}
end, err := strconv.Atoi(sess["session_end_timestamp"])
if err != nil {
log.Printf("Error parsing session_end_timestamp: %v", err)
s.log.Error(ctx, "error parsing session_end_timestamp: %s", err)
}
if start != 0 && end != 0 {
sess["session_duration"] = fmt.Sprintf("%d", end-start)
}
}
if sess["user_agent"] == "" && info.UserAgent != "" {
sess["user_agent"] = QUOTES(info.UserAgent)
}
if sess["user_browser"] == "" && info.UserBrowser != "" {
sess["user_browser"] = QUOTES(info.UserBrowser)
}
@ -200,19 +236,23 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.SessionStart, *messages.SessionEnd, *messages.ConnectionInformation, *messages.Metadata,
*messages.PageEvent, *messages.PerformanceTrackAggr, *messages.UserID, *messages.UserAnonymousID,
*messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick,
*messages.IssueEvent, *messages.IssueEventDeprecated:
*messages.IssueEvent, *messages.IssueEventDeprecated,
// Mobile messages
*messages.IOSSessionStart, *messages.IOSSessionEnd, *messages.IOSUserID, *messages.IOSUserAnonymousID,
*messages.IOSMetadata:
default:
return
}
if s.sessions == nil {
s.sessions = make(map[uint64]map[string]string)
}
ctx := context.WithValue(context.Background(), "sessionID", msg.SessionID())
sess, ok := s.sessions[msg.SessionID()]
if !ok {
// Try to load session from cache
cached, err := s.sessModule.GetCached(msg.SessionID())
if err != nil && err != sessions.ErrSessionNotFound {
log.Printf("Failed to get cached session: %v", err)
s.log.Warn(ctx, "failed to get cached session: %s", err)
}
if cached != nil {
sess = cached
@ -233,7 +273,6 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.SessionStart:
sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
sess["user_uuid"] = QUOTES(m.UserUUID)
sess["user_agent"] = QUOTES(m.UserAgent)
sess["user_os"] = QUOTES(m.UserOS)
sess["user_os_version"] = QUOTES(m.UserOSVersion)
sess["user_browser"] = QUOTES(m.UserBrowser)
@ -251,7 +290,7 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.SessionEnd:
sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
log.Printf("Error updating session info from cache: %v", err)
s.log.Warn(ctx, "failed to update session info from cache: %s", err)
}
case *messages.ConnectionInformation:
sess["connection_effective_bandwidth"] = fmt.Sprintf("%d", m.Downlink)
@ -259,12 +298,12 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.Metadata:
session, err := s.sessModule.Get(msg.SessionID())
if err != nil {
log.Printf("Error getting session info: %v", err)
s.log.Error(ctx, "error getting session info: %s", err)
break
}
project, err := s.projModule.GetProject(session.ProjectID)
if err != nil {
log.Printf("Error getting project info: %v", err)
s.log.Error(ctx, "error getting project info: %s", err)
break
}
keyNo := project.GetMetadataNo(m.Key)
@ -320,6 +359,43 @@ func (s *Saver) handleSession(msg messages.Message) {
currIssuesCount = 0
}
sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1)
// Mobile messages
case *messages.IOSSessionStart:
sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
sess["user_uuid"] = QUOTES(m.UserUUID)
sess["user_os"] = QUOTES(m.UserOS)
sess["user_os_version"] = QUOTES(m.UserOSVersion)
sess["user_device"] = QUOTES(m.UserDevice)
sess["user_device_type"] = QUOTES(m.UserDeviceType)
sess["tracker_version"] = QUOTES(m.TrackerVersion)
sess["rev_id"] = QUOTES(m.RevID)
case *messages.IOSSessionEnd:
sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
s.log.Warn(ctx, "failed to update session info from cache: %s", err)
}
case *messages.IOSMetadata:
session, err := s.sessModule.Get(msg.SessionID())
if err != nil {
s.log.Error(ctx, "error getting session info: %s", err)
break
}
project, err := s.projModule.GetProject(session.ProjectID)
if err != nil {
s.log.Error(ctx, "error getting project info: %s", err)
break
}
keyNo := project.GetMetadataNo(m.Key)
if keyNo == 0 {
break
}
sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value)
case *messages.IOSUserID:
if m.ID != "" {
sess["user_id"] = QUOTES(m.ID)
}
case *messages.IOSUserAnonymousID:
sess["user_anonymous_id"] = QUOTES(m.ID)
default:
updated = false
}
@ -342,7 +418,7 @@ func (s *Saver) Handle(msg messages.Message) {
s.events = append(s.events, newEvent)
}
s.handleSession(msg)
if msg.TypeID() == messages.MsgSessionEnd {
if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgIOSSessionEnd {
if s.finishedSessions == nil {
s.finishedSessions = make([]uint64, 0)
}
@ -353,18 +429,18 @@ func (s *Saver) Handle(msg messages.Message) {
func (s *Saver) commitEvents() {
if len(s.events) == 0 {
log.Printf("empty events batch")
s.log.Info(context.Background(), "empty events batch")
return
}
if err := s.db.InsertEvents(s.events); err != nil {
log.Printf("can't insert events: %s", err)
s.log.Error(context.Background(), "can't insert events: %s", err)
}
s.events = nil
}
func (s *Saver) commitSessions() {
if len(s.finishedSessions) == 0 {
log.Printf("empty sessions batch")
s.log.Info(context.Background(), "empty sessions batch")
return
}
l := len(s.finishedSessions)
@ -380,10 +456,14 @@ func (s *Saver) commitSessions() {
toSend = append(toSend, sessionID)
}
}
if err := s.db.InsertSessions(sessions); err != nil {
log.Printf("can't insert sessions: %s", err)
if len(sessions) == 0 {
s.log.Info(context.Background(), "empty sessions batch to send")
return
}
log.Printf("finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend))
if err := s.db.InsertSessions(sessions); err != nil {
s.log.Error(context.Background(), "can't insert sessions: %s", err)
}
s.log.Info(context.Background(), "finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend))
// Clear current list of finished sessions
for _, sessionID := range toSend {
delete(s.sessions, sessionID) // delete session info
@ -398,10 +478,11 @@ func (s *Saver) Commit() {
start := time.Now()
for sessionID, _ := range s.updatedSessions {
if err := s.sessModule.AddCached(sessionID, s.sessions[sessionID]); err != nil {
log.Printf("Error adding session to cache: %v", err)
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
s.log.Error(ctx, "can't add session to cache: %s", err)
}
}
log.Printf("Cached %d sessions in %s", len(s.updatedSessions), time.Since(start))
s.log.Info(context.Background(), "cached %d sessions in %s", len(s.updatedSessions), time.Since(start))
s.updatedSessions = nil
// Commit events and sessions (send to Redshift)
s.commitEvents()
@ -428,12 +509,13 @@ func (s *Saver) checkZombieSessions() {
// Do that several times (save attempts number) after last attempt delete session from memory to avoid sessions with not filled fields
zombieSession := s.sessions[sessionID]
if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
ctx := context.WithValue(context.Background(), "sessionID", sessionID)
// Let's try to load session from cache
if err := s.updateSessionInfoFromCache(sessionID, zombieSession); err != nil {
log.Printf("Error updating zombie session info from cache: %v", err)
s.log.Warn(ctx, "failed to update zombie session info from cache: %s", err)
} else {
s.sessions[sessionID] = zombieSession
log.Printf("Updated zombie session info from cache: %v", zombieSession)
s.log.Debug(ctx, "updated zombie session info from cache: %v", zombieSession)
}
}
if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {
@ -445,7 +527,7 @@ func (s *Saver) checkZombieSessions() {
}
}
if zombieSessionsCount > 0 {
log.Printf("Found %d zombie sessions", zombieSessionsCount)
s.log.Info(context.Background(), "found %d zombie sessions", zombieSessionsCount)
}
}