feat(connector): add S3 connector + small improvements

This commit is contained in:
Alexander 2024-05-22 15:23:45 +02:00
parent bdcde733a9
commit de9ffeb6a9
9 changed files with 268 additions and 112 deletions

View file

@ -48,6 +48,7 @@ type Redshift struct {
User string `env:"REDSHIFT_USER"`
Password string `env:"REDSHIFT_PASSWORD"`
Database string `env:"REDSHIFT_DATABASE"`
Bucket string `env:"REDSHIFT_BUCKET,default=rdshftbucket"`
}
// Clickhouse config

View file

@ -14,6 +14,7 @@ type ObjectsConfig struct {
AzureAccountName string `env:"AZURE_ACCOUNT_NAME"`
AzureAccountKey string `env:"AZURE_ACCOUNT_KEY"`
UseS3Tags bool `env:"USE_S3_TAGS,default=true"`
AWSIAMRole string `env:"AWS_IAM_ROLE"`
}
func (c *ObjectsConfig) UseFileTags() bool {

View file

@ -9,6 +9,7 @@ import (
)
type Logger interface {
Debug(ctx context.Context, message string, args ...interface{})
Info(ctx context.Context, message string, args ...interface{})
Warn(ctx context.Context, message string, args ...interface{})
Error(ctx context.Context, message string, args ...interface{})
@ -51,6 +52,10 @@ func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logge
return logger
}
// Debug emits a formatted message at debug level, tagging the entry with a
// "level" field and whatever context-derived fields prepare attaches.
func (l *loggerImpl) Debug(ctx context.Context, message string, args ...interface{}) {
	entry := l.prepare(ctx, l.l.With(zap.String("level", "debug")))
	entry.Debug(fmt.Sprintf(message, args...))
}
// Info emits a formatted message at info level, tagging the entry with a
// "level" field and whatever context-derived fields prepare attaches.
func (l *loggerImpl) Info(ctx context.Context, message string, args ...interface{}) {
	entry := l.prepare(ctx, l.l.With(zap.String("level", "info")))
	entry.Info(fmt.Sprintf(message, args...))
}

View file

@ -27,21 +27,29 @@ func main() {
if err != nil {
log.Fatal(ctx, "can't init object storage: %s", err)
}
batches, err := saver.NewBatches(cfg, objStore)
if err != nil {
log.Fatal(ctx, "can't init s3 buckets: %s", err)
}
var db saver.Database
switch cfg.ConnectorType {
case "redshift":
if db, err = saver.NewRedshift(log, cfg, objStore); err != nil {
if db, err = saver.NewRedshift(log, cfg, batches); err != nil {
log.Fatal(ctx, "can't init redshift connection: %s", err)
}
case "clickhouse":
if db, err = saver.NewClickHouse(log, cfg); err != nil {
if db, err = saver.NewClickHouse(log, cfg, batches); err != nil {
log.Fatal(ctx, "can't init clickhouse connection: %s", err)
}
case "elasticsearch":
if db, err = saver.NewElasticSearch(log, cfg); err != nil {
log.Fatal(ctx, "can't init elasticsearch connection: %s", err)
}
case "s3":
if db, err = saver.NewS3Storage(log, cfg, batches); err != nil {
log.Fatal(ctx, "can't init s3 connection: %s", err)
}
default:
log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType)
}
@ -73,7 +81,13 @@ func main() {
messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgConnectionInformation,
messages.MsgMetadata, messages.MsgPageEvent, messages.MsgPerformanceTrackAggr, messages.MsgUserID,
messages.MsgUserAnonymousID, messages.MsgJSException, messages.MsgJSExceptionDeprecated,
messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated}
messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated,
// Mobile messages
messages.MsgMobileSessionStart, messages.MsgMobileSessionEnd, messages.MsgMobileUserID, messages.MsgMobileUserAnonymousID,
messages.MsgMobileMetadata, messages.MsgMobileEvent, messages.MsgMobileNetworkCall,
messages.MsgMobileClickEvent, messages.MsgMobileSwipeEvent, messages.MsgMobileInputEvent,
messages.MsgMobileCrash, messages.MsgMobileIssueEvent,
}
// Init consumer
consumer := queue.NewConsumer(

View file

@ -13,12 +13,13 @@ import (
)
type ClickHouse struct {
log logger.Logger
cfg *connector.Config
conn driver.Conn
log logger.Logger
cfg *connector.Config
conn driver.Conn
batches *Batches
}
func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error) {
func NewClickHouse(log logger.Logger, cfg *connector.Config, batches *Batches) (*ClickHouse, error) {
url := cfg.Clickhouse.URL
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
@ -44,16 +45,21 @@ func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error
return nil, err
}
c := &ClickHouse{
log: log,
cfg: cfg,
conn: conn,
log: log,
cfg: cfg,
conn: conn,
batches: batches,
}
return c, nil
}
// InsertEvents stores an events batch in ClickHouse; delegates to the
// buffer-table insert path (connector_events_buffer).
func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
return c.insertEventsUsingBuffer(batch)
}
const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
func (c *ClickHouse) insertEventsUsingBuffer(batch []map[string]string) error {
bulk, err := c.conn.PrepareBatch(context.Background(), eventsSQL)
if err != nil {
return err
@ -97,9 +103,13 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
return bulk.Send()
}
// InsertSessions stores a sessions batch in ClickHouse; delegates to the
// buffer-table insert path (connector_user_sessions_buffer).
func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
return c.insertSessionsUsingBuffer(batch)
}
const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
func (c *ClickHouse) insertSessionsUsingBuffer(batch []map[string]string) error {
bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL)
if err != nil {
return err

View file

@ -1,28 +1,25 @@
package connector
import (
"bytes"
"context"
"database/sql"
"fmt"
"github.com/google/uuid"
_ "github.com/lib/pq"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
)
type Redshift struct {
log logger.Logger
cfg *connector.Config
ctx context.Context
db *sql.DB
objStorage objectstorage.ObjectStorage
log logger.Logger
cfg *connector.Config
ctx context.Context
db *sql.DB
batches *Batches
}
func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
func NewRedshift(log logger.Logger, cfg *connector.Config, batches *Batches) (*Redshift, error) {
var source string
if cfg.ConnectionString != "" {
source = cfg.ConnectionString
@ -39,117 +36,68 @@ func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstor
return nil, err
}
return &Redshift{
log: log,
cfg: cfg,
ctx: context.Background(),
db: sqldb,
objStorage: objStorage,
log: log,
cfg: cfg,
ctx: context.Background(),
db: sqldb,
batches: batches,
}, nil
}
// eventsToBuffer renders an events batch as pipe-delimited CSV: one header
// line of eventColumns, then one '\n'-prefixed line per event. Absent
// columns in an event map render as empty fields.
func eventsToBuffer(batch []map[string]string) *bytes.Buffer {
	var buf bytes.Buffer
	// Header: column names separated by '|'.
	for _, column := range eventColumns {
		buf.WriteString(column)
		buf.WriteString("|")
	}
	buf.Truncate(buf.Len() - 1) // drop the trailing '|'
	// Data rows, one per event.
	for _, event := range batch {
		buf.WriteByte('\n')
		for _, column := range eventColumns {
			buf.WriteString(event[column])
			buf.WriteString("|")
		}
		buf.Truncate(buf.Len() - 1)
	}
	return &buf
}
// InsertEvents uploads the events batch to S3 as a pipe-delimited CSV file
// and then COPYs that file into the Redshift events table.
func (r *Redshift) InsertEvents(batch []map[string]string) error {
// Send data to S3 under a unique per-batch object key.
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.EventsTableName, uuid.New().String())
// Create csv file (header line + one line per event, '|'-separated).
buf := eventsToBuffer(batch)
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
return fmt.Errorf("can't upload file to s3: %s", err)
}
// Copy data from s3 bucket to redshift ('|' delimiter, with credentials, no gzip).
if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
}
r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
return nil
}
// sessionsToBuffer renders a sessions batch as pipe-delimited CSV: one
// header line of sessionColumns, then one '\n'-prefixed line per session.
// Absent columns in a session map render as empty fields.
func sessionsToBuffer(batch []map[string]string) *bytes.Buffer {
	var buf bytes.Buffer
	// Header: column names separated by '|'.
	for _, column := range sessionColumns {
		buf.WriteString(column)
		buf.WriteString("|")
	}
	buf.Truncate(buf.Len() - 1) // drop the trailing '|'
	// Data rows, one per session.
	for _, row := range batch {
		buf.WriteByte('\n')
		for _, column := range sessionColumns {
			buf.WriteString(row[column])
			buf.WriteString("|")
		}
		buf.Truncate(buf.Len() - 1)
	}
	return &buf
}
func (r *Redshift) InsertSessions(batch []map[string]string) error {
// Send data to S3
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.SessionsTableName, uuid.New().String())
// Create csv file
buf := sessionsToBuffer(batch)
reader := bytes.NewReader(buf.Bytes())
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
return fmt.Errorf("can't upload file to s3: %s", err)
fileName := generateName(r.cfg.SessionsTableName)
if err := r.batches.Insert(batch, fileName, sessionColumns); err != nil {
return fmt.Errorf("can't insert sessions batch: %s", err)
}
// Copy data from s3 bucket to redshift
if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
if err := r.copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
}
r.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch))
return nil
}
func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error {
func (r *Redshift) InsertEvents(batch []map[string]string) error {
fileName := generateName(r.cfg.EventsTableName)
if err := r.batches.Insert(batch, fileName, eventColumns); err != nil {
return fmt.Errorf("can't insert events batch: %s", err)
}
// Copy data from s3 bucket to redshift
if err := r.copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
}
r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
return nil
}
func (r *Redshift) copy(tableName, fileName, delimiter string, creds, gzip bool) error {
var (
credentials string
gzipSQL string
)
if creds {
credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey)
if r.cfg.AWSAccessKeyID != "" && r.cfg.AWSSecretAccessKey != "" {
credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey)
} else if r.cfg.AWSIAMRole != "" {
credentials = fmt.Sprintf(`IAM_ROLE '%s'`, r.cfg.AWSIAMRole)
} else {
credentials = "IAM_ROLE default"
}
}
if gzip {
gzipSQL = "GZIP"
}
bucketName := "rdshftbucket"
filePath := fmt.Sprintf("s3://%s/%s", bucketName, fileName)
filePath := fmt.Sprintf("s3://%s/%s", r.cfg.Redshift.Bucket, fileName)
copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS
STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`,
copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`,
tableName, filePath, gzipSQL, credentials, delimiter)
r.log.Debug(context.Background(), "Executing COPY SQL: %s", copySQL)
_, err := r.db.ExecContext(r.ctx, copySQL)
return err
}
// ExecutionDuration is a stub: the Redshift backend does not report a
// per-file load duration here and always returns 0 with no error.
// NOTE(review): presumably part of the saver.Database contract — confirm.
func (r *Redshift) ExecutionDuration(fileName string) (int, error) {
return 0, nil
}
// Close releases the underlying Redshift SQL connection pool.
func (r *Redshift) Close() error {
return r.db.Close()
}

View file

@ -0,0 +1,47 @@
package connector
import (
"context"
"fmt"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/logger"
)
// S3Storage is a connector backend that persists batches directly to
// object storage (S3) instead of a SQL database.
type S3Storage struct {
log logger.Logger
cfg *connector.Config
ctx context.Context // NOTE(review): stored but not read by the visible methods — confirm it is needed
batches *Batches
}
// NewS3Storage builds the S3 connector backend. Unlike the Redshift and
// ClickHouse connectors it holds no database connection: batches are
// written to object storage only.
//
// The parameter is named batches (was: buckets) for consistency with
// NewRedshift and NewClickHouse, which take the same *Batches dependency.
func NewS3Storage(log logger.Logger, cfg *connector.Config, batches *Batches) (*S3Storage, error) {
	return &S3Storage{
		log:     log,
		cfg:     cfg,
		ctx:     context.Background(),
		batches: batches,
	}, nil
}
// InsertSessions writes one sessions batch to object storage as a CSV file
// named after the configured sessions table.
func (ds *S3Storage) InsertSessions(batch []map[string]string) error {
	name := generateName(ds.cfg.SessionsTableName)
	err := ds.batches.Insert(batch, name, sessionColumns)
	if err != nil {
		return fmt.Errorf("can't insert sessions batch: %s", err)
	}
	ds.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch))
	return nil
}
// InsertEvents writes one events batch to object storage as a CSV file
// named after the configured events table.
func (ds *S3Storage) InsertEvents(batch []map[string]string) error {
	name := generateName(ds.cfg.EventsTableName)
	err := ds.batches.Insert(batch, name, eventColumns)
	if err != nil {
		return fmt.Errorf("can't insert events batch: %s", err)
	}
	ds.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
	return nil
}
// Close satisfies the connector backend interface; the S3 backend keeps no
// connections, so there is nothing to release.
func (ds *S3Storage) Close() error {
return nil
}

View file

@ -0,0 +1,56 @@
package connector
import (
"bytes"
"fmt"
"github.com/google/uuid"
"openreplay/backend/internal/config/connector"
"openreplay/backend/pkg/objectstorage"
)
// Batches serializes event/session batches to pipe-delimited CSV and
// uploads them to the configured object storage.
type Batches struct {
cfg *connector.Config
objStorage objectstorage.ObjectStorage
}
// NewBatches wires CSV batch uploads to the given object storage backend.
// It returns an error when no storage is provided, so a misconfiguration
// fails fast at startup instead of panicking later inside Insert.
func NewBatches(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Batches, error) {
	if objStorage == nil {
		return nil, fmt.Errorf("object storage is empty")
	}
	return &Batches{
		cfg:        cfg,
		objStorage: objStorage,
	}, nil
}
// Insert serializes the batch to pipe-delimited CSV using the given column
// order and uploads the result to object storage under fileName.
func (ds *Batches) Insert(batch []map[string]string, fileName string, columns []string) error {
	data := dataToCSV(batch, columns)
	err := ds.objStorage.Upload(bytes.NewReader(data.Bytes()), fileName, "text/csv", objectstorage.NoCompression)
	if err != nil {
		return fmt.Errorf("can't upload file to s3: %s", err)
	}
	return nil
}
// generateName builds a unique object key for one table's CSV batch file.
func generateName(table string) string {
	id := uuid.New().String()
	return fmt.Sprintf("connector_data/%s-%s.csv", table, id)
}
// dataToCSV serializes a batch of rows into a pipe-delimited CSV buffer.
// The first line is the header (column names); each subsequent line is one
// row in the same column order, with missing keys rendered as empty fields.
// An empty columns slice yields an empty buffer — the previous
// buf.Truncate(buf.Len()-1) pattern would panic with Truncate(-1) there.
func dataToCSV(batch []map[string]string, columns []string) *bytes.Buffer {
	buf := bytes.NewBuffer(nil)
	if len(columns) == 0 {
		// No schema, nothing to serialize; avoid a negative Truncate panic.
		return buf
	}
	// Write header (column names), separator before every field but the first.
	for i, column := range columns {
		if i > 0 {
			buf.WriteString("|")
		}
		buf.WriteString(column)
	}
	// Write data rows, one '\n'-prefixed line per map.
	for _, row := range batch {
		buf.WriteString("\n")
		for i, column := range columns {
			if i > 0 {
				buf.WriteString("|")
			}
			buf.WriteString(row[column])
		}
	}
	return buf
}

View file

@ -87,14 +87,48 @@ func handleEvent(msg messages.Message) map[string]string {
case *messages.CustomIssue:
event["customissue_name"] = QUOTES(m.Name)
event["customissue_payload"] = QUOTES(m.Payload)
// Mobile events
case *messages.MobileEvent:
event["mobile_event_name"] = QUOTES(m.Name)
event["mobile_event_payload"] = QUOTES(m.Payload)
case *messages.MobileNetworkCall:
event["mobile_networkcall_type"] = QUOTES(m.Type)
event["mobile_networkcall_method"] = QUOTES(m.Method)
event["mobile_networkcall_url"] = QUOTES(m.URL)
event["mobile_networkcall_request"] = QUOTES(m.Request)
event["mobile_networkcall_response"] = QUOTES(m.Response)
event["mobile_networkcall_status"] = fmt.Sprintf("%d", m.Status)
event["mobile_networkcall_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_networkcall_duration"] = fmt.Sprintf("%d", m.Duration)
case *messages.MobileClickEvent:
event["mobile_clickevent_x"] = fmt.Sprintf("%d", m.X)
event["mobile_clickevent_y"] = fmt.Sprintf("%d", m.Y)
event["mobile_clickevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_clickevent_label"] = QUOTES(m.Label)
case *messages.MobileSwipeEvent:
event["mobile_swipeevent_x"] = fmt.Sprintf("%d", m.X)
event["mobile_swipeevent_y"] = fmt.Sprintf("%d", m.Y)
event["mobile_swipeevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_swipeevent_label"] = QUOTES(m.Label)
case *messages.MobileInputEvent:
event["mobile_inputevent_label"] = QUOTES(m.Label)
event["mobile_inputevent_value"] = QUOTES(m.Value)
case *messages.MobileCrash:
event["mobile_crash_name"] = QUOTES(m.Name)
event["mobile_crash_reason"] = QUOTES(m.Reason)
event["mobile_crash_stacktrace"] = QUOTES(m.Stacktrace)
case *messages.MobileIssueEvent:
event["mobile_issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
event["mobile_issueevent_type"] = QUOTES(m.Type)
event["mobile_issueevent_context_string"] = QUOTES(m.ContextString)
event["mobile_issueevent_context"] = QUOTES(m.Context)
event["mobile_issueevent_payload"] = QUOTES(m.Payload)
}
if len(event) == 0 {
return nil
}
event["sessionid"] = fmt.Sprintf("%d", msg.SessionID())
event["received_at"] = fmt.Sprintf("%d", uint64(time.Now().UnixMilli()))
event["batch_order_number"] = fmt.Sprintf("%d", 0)
return event
}
@ -127,9 +161,6 @@ func (s *Saver) updateSessionInfoFromCache(sessID uint64, sess map[string]string
sess["session_duration"] = fmt.Sprintf("%d", end-start)
}
}
if sess["user_agent"] == "" && info.UserAgent != "" {
sess["user_agent"] = QUOTES(info.UserAgent)
}
if sess["user_browser"] == "" && info.UserBrowser != "" {
sess["user_browser"] = QUOTES(info.UserBrowser)
}
@ -205,7 +236,10 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.SessionStart, *messages.SessionEnd, *messages.ConnectionInformation, *messages.Metadata,
*messages.PageEvent, *messages.PerformanceTrackAggr, *messages.UserID, *messages.UserAnonymousID,
*messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick,
*messages.IssueEvent, *messages.IssueEventDeprecated:
*messages.IssueEvent, *messages.IssueEventDeprecated,
// Mobile messages
*messages.MobileSessionStart, *messages.MobileSessionEnd, *messages.MobileUserID, *messages.MobileUserAnonymousID,
*messages.MobileMetadata:
default:
return
}
@ -239,7 +273,6 @@ func (s *Saver) handleSession(msg messages.Message) {
case *messages.SessionStart:
sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
sess["user_uuid"] = QUOTES(m.UserUUID)
sess["user_agent"] = QUOTES(m.UserAgent)
sess["user_os"] = QUOTES(m.UserOS)
sess["user_os_version"] = QUOTES(m.UserOSVersion)
sess["user_browser"] = QUOTES(m.UserBrowser)
@ -326,6 +359,43 @@ func (s *Saver) handleSession(msg messages.Message) {
currIssuesCount = 0
}
sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1)
// Mobile messages
case *messages.MobileSessionStart:
sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
sess["user_uuid"] = QUOTES(m.UserUUID)
sess["user_os"] = QUOTES(m.UserOS)
sess["user_os_version"] = QUOTES(m.UserOSVersion)
sess["user_device"] = QUOTES(m.UserDevice)
sess["user_device_type"] = QUOTES(m.UserDeviceType)
sess["tracker_version"] = QUOTES(m.TrackerVersion)
sess["rev_id"] = QUOTES(m.RevID)
case *messages.MobileSessionEnd:
sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
s.log.Warn(ctx, "failed to update session info from cache: %s", err)
}
case *messages.MobileMetadata:
session, err := s.sessModule.Get(msg.SessionID())
if err != nil {
s.log.Error(ctx, "error getting session info: %s", err)
break
}
project, err := s.projModule.GetProject(session.ProjectID)
if err != nil {
s.log.Error(ctx, "error getting project info: %s", err)
break
}
keyNo := project.GetMetadataNo(m.Key)
if keyNo == 0 {
break
}
sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value)
case *messages.MobileUserID:
if m.ID != "" {
sess["user_id"] = QUOTES(m.ID)
}
case *messages.MobileUserAnonymousID:
sess["user_anonymous_id"] = QUOTES(m.ID)
default:
updated = false
}
@ -348,7 +418,7 @@ func (s *Saver) Handle(msg messages.Message) {
s.events = append(s.events, newEvent)
}
s.handleSession(msg)
if msg.TypeID() == messages.MsgSessionEnd {
if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgMobileSessionEnd {
if s.finishedSessions == nil {
s.finishedSessions = make([]uint64, 0)
}
@ -386,6 +456,10 @@ func (s *Saver) commitSessions() {
toSend = append(toSend, sessionID)
}
}
if len(sessions) == 0 {
s.log.Info(context.Background(), "empty sessions batch to send")
return
}
if err := s.db.InsertSessions(sessions); err != nil {
s.log.Error(context.Background(), "can't insert sessions: %s", err)
}
@ -441,7 +515,7 @@ func (s *Saver) checkZombieSessions() {
s.log.Warn(ctx, "failed to update zombie session info from cache: %s", err)
} else {
s.sessions[sessionID] = zombieSession
s.log.Info(ctx, "updated zombie session info from cache: %v", zombieSession)
s.log.Debug(ctx, "updated zombie session info from cache: %v", zombieSession)
}
}
if zombieSession["session_start_timestamp"] == "" || zombieSession["session_end_timestamp"] == "" {