New S3 connector (#2199)
* feat(connector): added s3 connector + small improvements * feat(connector): added missing import
This commit is contained in:
parent
0507a00acf
commit
7a7fd44c3b
6 changed files with 180 additions and 112 deletions
|
|
@ -27,17 +27,29 @@ func main() {
|
|||
if err != nil {
|
||||
log.Fatal(ctx, "can't init object storage: %s", err)
|
||||
}
|
||||
batches, err := saver.NewBatches(cfg, objStore)
|
||||
if err != nil {
|
||||
log.Fatal(ctx, "can't init s3 buckets: %s", err)
|
||||
}
|
||||
|
||||
var db saver.Database
|
||||
switch cfg.ConnectorType {
|
||||
case "redshift":
|
||||
if db, err = saver.NewRedshift(log, cfg, objStore); err != nil {
|
||||
if db, err = saver.NewRedshift(log, cfg, batches); err != nil {
|
||||
log.Fatal(ctx, "can't init redshift connection: %s", err)
|
||||
}
|
||||
case "clickhouse":
|
||||
if db, err = saver.NewClickHouse(log, cfg); err != nil {
|
||||
if db, err = saver.NewClickHouse(log, cfg, batches); err != nil {
|
||||
log.Fatal(ctx, "can't init clickhouse connection: %s", err)
|
||||
}
|
||||
case "elasticsearch":
|
||||
if db, err = saver.NewElasticSearch(log, cfg); err != nil {
|
||||
log.Fatal(ctx, "can't init elasticsearch connection: %s", err)
|
||||
}
|
||||
case "s3":
|
||||
if db, err = saver.NewS3Storage(log, cfg, batches); err != nil {
|
||||
log.Fatal(ctx, "can't init s3 connection: %s", err)
|
||||
}
|
||||
default:
|
||||
log.Fatal(ctx, "unknown connector type: %s", cfg.ConnectorType)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,12 +14,13 @@ import (
|
|||
)
|
||||
|
||||
type ClickHouse struct {
|
||||
log logger.Logger
|
||||
cfg *connector.Config
|
||||
conn driver.Conn
|
||||
log logger.Logger
|
||||
cfg *connector.Config
|
||||
conn driver.Conn
|
||||
batches *Batches
|
||||
}
|
||||
|
||||
func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error) {
|
||||
func NewClickHouse(log logger.Logger, cfg *connector.Config, batches *Batches) (*ClickHouse, error) {
|
||||
url := cfg.Clickhouse.URL
|
||||
url = strings.TrimPrefix(url, "tcp://")
|
||||
url = strings.TrimSuffix(url, "/default")
|
||||
|
|
@ -45,16 +46,21 @@ func NewClickHouse(log logger.Logger, cfg *connector.Config) (*ClickHouse, error
|
|||
return nil, err
|
||||
}
|
||||
c := &ClickHouse{
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
conn: conn,
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
conn: conn,
|
||||
batches: batches,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
|
||||
return c.insertEventsUsingBuffer(batch)
|
||||
}
|
||||
|
||||
const eventsSQL = "INSERT INTO connector_events_buffer (sessionid, consolelog_level, consolelog_value, customevent_name, customevent_payload, jsexception_message, jsexception_name, jsexception_payload, jsexception_metadata, networkrequest_type, networkrequest_method, networkrequest_url, networkrequest_request, networkrequest_response, networkrequest_status, networkrequest_timestamp, networkrequest_duration, issueevent_message_id, issueevent_timestamp, issueevent_type, issueevent_context_string, issueevent_context, issueevent_payload, issueevent_url, customissue_name, customissue_payload, received_at, batch_order_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
|
||||
func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
|
||||
func (c *ClickHouse) insertEventsUsingBuffer(batch []map[string]string) error {
|
||||
bulk, err := c.conn.PrepareBatch(context.Background(), eventsSQL)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -98,9 +104,13 @@ func (c *ClickHouse) InsertEvents(batch []map[string]string) error {
|
|||
return bulk.Send()
|
||||
}
|
||||
|
||||
func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
|
||||
return c.insertSessionsUsingBuffer(batch)
|
||||
}
|
||||
|
||||
const sessionsSQL = "INSERT INTO connector_user_sessions_buffer (sessionid, user_agent, user_browser, user_browser_version, user_country, user_device, user_device_heap_size, user_device_memory_size, user_device_type, user_os, user_os_version, user_uuid, connection_effective_bandwidth, connection_type, referrer, user_anonymous_id, user_id, session_start_timestamp, session_end_timestamp, session_duration, first_contentful_paint, speed_index, visually_complete, timing_time_to_interactive, avg_cpu, avg_fps, max_cpu, max_fps, max_total_js_heap_size, max_used_js_heap_size, js_exceptions_count, inputs_count, clicks_count, issues_count, pages_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
|
||||
func (c *ClickHouse) InsertSessions(batch []map[string]string) error {
|
||||
func (c *ClickHouse) insertSessionsUsingBuffer(batch []map[string]string) error {
|
||||
bulk, err := c.conn.PrepareBatch(context.Background(), sessionsSQL)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -1,28 +1,25 @@
|
|||
package connector
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
"openreplay/backend/internal/config/connector"
|
||||
"openreplay/backend/pkg/logger"
|
||||
"openreplay/backend/pkg/objectstorage"
|
||||
)
|
||||
|
||||
type Redshift struct {
|
||||
log logger.Logger
|
||||
cfg *connector.Config
|
||||
ctx context.Context
|
||||
db *sql.DB
|
||||
objStorage objectstorage.ObjectStorage
|
||||
log logger.Logger
|
||||
cfg *connector.Config
|
||||
ctx context.Context
|
||||
db *sql.DB
|
||||
batches *Batches
|
||||
}
|
||||
|
||||
func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Redshift, error) {
|
||||
func NewRedshift(log logger.Logger, cfg *connector.Config, batches *Batches) (*Redshift, error) {
|
||||
var source string
|
||||
if cfg.ConnectionString != "" {
|
||||
source = cfg.ConnectionString
|
||||
|
|
@ -39,91 +36,41 @@ func NewRedshift(log logger.Logger, cfg *connector.Config, objStorage objectstor
|
|||
return nil, err
|
||||
}
|
||||
return &Redshift{
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
ctx: context.Background(),
|
||||
db: sqldb,
|
||||
objStorage: objStorage,
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
ctx: context.Background(),
|
||||
db: sqldb,
|
||||
batches: batches,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func eventsToBuffer(batch []map[string]string) *bytes.Buffer {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
// Write header
|
||||
for _, column := range eventColumns {
|
||||
buf.WriteString(column + "|")
|
||||
}
|
||||
buf.Truncate(buf.Len() - 1)
|
||||
|
||||
// Write data
|
||||
for _, event := range batch {
|
||||
buf.WriteString("\n")
|
||||
for _, column := range eventColumns {
|
||||
buf.WriteString(event[column] + "|")
|
||||
}
|
||||
buf.Truncate(buf.Len() - 1)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func (r *Redshift) InsertEvents(batch []map[string]string) error {
|
||||
// Send data to S3
|
||||
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.EventsTableName, uuid.New().String())
|
||||
// Create csv file
|
||||
buf := eventsToBuffer(batch)
|
||||
|
||||
reader := bytes.NewReader(buf.Bytes())
|
||||
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
|
||||
return fmt.Errorf("can't upload file to s3: %s", err)
|
||||
}
|
||||
// Copy data from s3 bucket to redshift
|
||||
if err := r.Copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
|
||||
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
|
||||
}
|
||||
r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
|
||||
return nil
|
||||
}
|
||||
|
||||
func sessionsToBuffer(batch []map[string]string) *bytes.Buffer {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
// Write header
|
||||
for _, column := range sessionColumns {
|
||||
buf.WriteString(column + "|")
|
||||
}
|
||||
buf.Truncate(buf.Len() - 1)
|
||||
|
||||
// Write data
|
||||
for _, sess := range batch {
|
||||
buf.WriteString("\n")
|
||||
for _, column := range sessionColumns {
|
||||
buf.WriteString(sess[column] + "|")
|
||||
}
|
||||
buf.Truncate(buf.Len() - 1)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func (r *Redshift) InsertSessions(batch []map[string]string) error {
|
||||
// Send data to S3
|
||||
fileName := fmt.Sprintf("connector_data/%s-%s.csv", r.cfg.SessionsTableName, uuid.New().String())
|
||||
// Create csv file
|
||||
buf := sessionsToBuffer(batch)
|
||||
|
||||
reader := bytes.NewReader(buf.Bytes())
|
||||
if err := r.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil {
|
||||
return fmt.Errorf("can't upload file to s3: %s", err)
|
||||
fileName := generateName(r.cfg.SessionsTableName)
|
||||
if err := r.batches.Insert(batch, fileName, sessionColumns); err != nil {
|
||||
return fmt.Errorf("can't insert sessions batch: %s", err)
|
||||
}
|
||||
// Copy data from s3 bucket to redshift
|
||||
if err := r.Copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
|
||||
if err := r.copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
|
||||
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
|
||||
}
|
||||
r.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error {
|
||||
func (r *Redshift) InsertEvents(batch []map[string]string) error {
|
||||
fileName := generateName(r.cfg.EventsTableName)
|
||||
if err := r.batches.Insert(batch, fileName, eventColumns); err != nil {
|
||||
return fmt.Errorf("can't insert events batch: %s", err)
|
||||
}
|
||||
// Copy data from s3 bucket to redshift
|
||||
if err := r.copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
|
||||
return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
|
||||
}
|
||||
r.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Redshift) copy(tableName, fileName, delimiter string, creds, gzip bool) error {
|
||||
var (
|
||||
credentials string
|
||||
gzipSQL string
|
||||
|
|
@ -151,10 +98,6 @@ func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool)
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *Redshift) ExecutionDuration(fileName string) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (r *Redshift) Close() error {
|
||||
return r.db.Close()
|
||||
}
|
||||
|
|
|
|||
47
ee/backend/pkg/connector/s3.go
Normal file
47
ee/backend/pkg/connector/s3.go
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
package connector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"openreplay/backend/internal/config/connector"
|
||||
"openreplay/backend/pkg/logger"
|
||||
)
|
||||
|
||||
// S3Storage is the "s3" connector sink: instead of loading batches into a
// database, it only uploads them to object storage via Batches.
type S3Storage struct {
	log     logger.Logger     // structured logger
	cfg     *connector.Config // connector configuration (table names used for file naming)
	ctx     context.Context   // background context, kept for the lifetime of the storage
	batches *Batches          // CSV batch uploader backed by object storage
}
|
||||
|
||||
func NewS3Storage(log logger.Logger, cfg *connector.Config, buckets *Batches) (*S3Storage, error) {
|
||||
return &S3Storage{
|
||||
log: log,
|
||||
cfg: cfg,
|
||||
ctx: context.Background(),
|
||||
batches: buckets,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ds *S3Storage) InsertSessions(batch []map[string]string) error {
|
||||
fileName := generateName(ds.cfg.SessionsTableName)
|
||||
if err := ds.batches.Insert(batch, fileName, sessionColumns); err != nil {
|
||||
return fmt.Errorf("can't insert sessions batch: %s", err)
|
||||
}
|
||||
ds.log.Info(context.Background(), "sessions batch of %d sessions is successfully saved", len(batch))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *S3Storage) InsertEvents(batch []map[string]string) error {
|
||||
fileName := generateName(ds.cfg.EventsTableName)
|
||||
if err := ds.batches.Insert(batch, fileName, eventColumns); err != nil {
|
||||
return fmt.Errorf("can't insert events batch: %s", err)
|
||||
}
|
||||
ds.log.Info(context.Background(), "events batch of %d events is successfully saved", len(batch))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close satisfies the connector database interface; the S3 sink holds no
// connection of its own, so there is nothing to release.
func (ds *S3Storage) Close() error {
	return nil
}
|
||||
56
ee/backend/pkg/connector/s3batches.go
Normal file
56
ee/backend/pkg/connector/s3batches.go
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
package connector
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"openreplay/backend/internal/config/connector"
|
||||
"openreplay/backend/pkg/objectstorage"
|
||||
)
|
||||
|
||||
// Batches serializes connector row batches to pipe-delimited CSV and
// uploads them to object storage (S3). It is shared by the Redshift,
// ClickHouse and S3 sinks.
type Batches struct {
	cfg        *connector.Config           // connector configuration
	objStorage objectstorage.ObjectStorage // destination object storage client
}
|
||||
|
||||
func NewBatches(cfg *connector.Config, objStorage objectstorage.ObjectStorage) (*Batches, error) {
|
||||
return &Batches{
|
||||
cfg: cfg,
|
||||
objStorage: objStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ds *Batches) Insert(batch []map[string]string, fileName string, columns []string) error {
|
||||
buf := dataToCSV(batch, columns)
|
||||
reader := bytes.NewReader(buf.Bytes())
|
||||
if err := ds.objStorage.Upload(reader, fileName, "text/csv", "", objectstorage.NoCompression); err != nil {
|
||||
return fmt.Errorf("can't upload file to s3: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func generateName(table string) string {
|
||||
return fmt.Sprintf("connector_data/%s-%s.csv", table, uuid.New().String())
|
||||
}
|
||||
|
||||
// dataToCSV renders batch rows as pipe-delimited CSV: a header line with
// the column names, then one line per row with the values for those
// columns (missing keys render as empty fields). Rows are emitted in batch
// order; columns in the given order.
//
// Fix: the previous implementation called buf.Truncate(buf.Len()-1) to
// drop a trailing separator, which panics (Truncate(-1)) when columns is
// empty. Separators are now written *between* fields instead, so an empty
// column set simply yields an empty buffer.
func dataToCSV(batch []map[string]string, columns []string) *bytes.Buffer {
	buf := bytes.NewBuffer(nil)
	if len(columns) == 0 {
		return buf
	}

	// Header (column names).
	for i, column := range columns {
		if i > 0 {
			buf.WriteByte('|')
		}
		buf.WriteString(column)
	}

	// Data rows.
	for _, row := range batch {
		buf.WriteByte('\n')
		for i, column := range columns {
			if i > 0 {
				buf.WriteByte('|')
			}
			buf.WriteString(row[column])
		}
	}
	return buf
}
|
||||
|
|
@ -100,10 +100,10 @@ func handleEvent(msg messages.Message) map[string]string {
|
|||
event["customissue_name"] = QUOTES(m.Name)
|
||||
event["customissue_payload"] = QUOTES(m.Payload)
|
||||
// Mobile events
|
||||
case *messages.IOSEvent:
|
||||
case *messages.MobileEvent:
|
||||
event["mobile_event_name"] = QUOTES(m.Name)
|
||||
event["mobile_event_payload"] = QUOTES(m.Payload)
|
||||
case *messages.IOSNetworkCall:
|
||||
case *messages.MobileNetworkCall:
|
||||
event["mobile_networkcall_type"] = QUOTES(m.Type)
|
||||
event["mobile_networkcall_method"] = QUOTES(m.Method)
|
||||
event["mobile_networkcall_url"] = QUOTES(m.URL)
|
||||
|
|
@ -112,24 +112,24 @@ func handleEvent(msg messages.Message) map[string]string {
|
|||
event["mobile_networkcall_status"] = fmt.Sprintf("%d", m.Status)
|
||||
event["mobile_networkcall_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
event["mobile_networkcall_duration"] = fmt.Sprintf("%d", m.Duration)
|
||||
case *messages.IOSClickEvent:
|
||||
case *messages.MobileClickEvent:
|
||||
event["mobile_clickevent_x"] = fmt.Sprintf("%d", m.X)
|
||||
event["mobile_clickevent_y"] = fmt.Sprintf("%d", m.Y)
|
||||
event["mobile_clickevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
event["mobile_clickevent_label"] = QUOTES(m.Label)
|
||||
case *messages.IOSSwipeEvent:
|
||||
case *messages.MobileSwipeEvent:
|
||||
event["mobile_swipeevent_x"] = fmt.Sprintf("%d", m.X)
|
||||
event["mobile_swipeevent_y"] = fmt.Sprintf("%d", m.Y)
|
||||
event["mobile_swipeevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
event["mobile_swipeevent_label"] = QUOTES(m.Label)
|
||||
case *messages.IOSInputEvent:
|
||||
case *messages.MobileInputEvent:
|
||||
event["mobile_inputevent_label"] = QUOTES(m.Label)
|
||||
event["mobile_inputevent_value"] = QUOTES(m.Value)
|
||||
case *messages.IOSCrash:
|
||||
case *messages.MobileCrash:
|
||||
event["mobile_crash_name"] = QUOTES(m.Name)
|
||||
event["mobile_crash_reason"] = QUOTES(m.Reason)
|
||||
event["mobile_crash_stacktrace"] = QUOTES(m.Stacktrace)
|
||||
case *messages.IOSIssueEvent:
|
||||
case *messages.MobileIssueEvent:
|
||||
event["mobile_issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
event["mobile_issueevent_type"] = QUOTES(m.Type)
|
||||
event["mobile_issueevent_context_string"] = QUOTES(m.ContextString)
|
||||
|
|
@ -250,8 +250,8 @@ func (s *Saver) handleSession(msg messages.Message) {
|
|||
*messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick,
|
||||
*messages.IssueEvent, *messages.IssueEventDeprecated,
|
||||
// Mobile messages
|
||||
*messages.IOSSessionStart, *messages.IOSSessionEnd, *messages.IOSUserID, *messages.IOSUserAnonymousID,
|
||||
*messages.IOSMetadata:
|
||||
*messages.MobileSessionStart, *messages.MobileSessionEnd, *messages.MobileUserID, *messages.MobileUserAnonymousID,
|
||||
*messages.MobileMetadata:
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
|
@ -372,7 +372,7 @@ func (s *Saver) handleSession(msg messages.Message) {
|
|||
}
|
||||
sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1)
|
||||
// Mobile messages
|
||||
case *messages.IOSSessionStart:
|
||||
case *messages.MobileSessionStart:
|
||||
sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
sess["user_uuid"] = QUOTES(m.UserUUID)
|
||||
sess["user_os"] = QUOTES(m.UserOS)
|
||||
|
|
@ -381,12 +381,12 @@ func (s *Saver) handleSession(msg messages.Message) {
|
|||
sess["user_device_type"] = QUOTES(m.UserDeviceType)
|
||||
sess["tracker_version"] = QUOTES(m.TrackerVersion)
|
||||
sess["rev_id"] = QUOTES(m.RevID)
|
||||
case *messages.IOSSessionEnd:
|
||||
case *messages.MobileSessionEnd:
|
||||
sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp)
|
||||
if err := s.updateSessionInfoFromCache(msg.SessionID(), sess); err != nil {
|
||||
s.log.Warn(ctx, "failed to update session info from cache: %s", err)
|
||||
}
|
||||
case *messages.IOSMetadata:
|
||||
case *messages.MobileMetadata:
|
||||
session, err := s.sessModule.Get(msg.SessionID())
|
||||
if err != nil {
|
||||
s.log.Error(ctx, "error getting session info: %s", err)
|
||||
|
|
@ -402,11 +402,11 @@ func (s *Saver) handleSession(msg messages.Message) {
|
|||
break
|
||||
}
|
||||
sess[fmt.Sprintf("metadata_%d", keyNo)] = QUOTES(m.Value)
|
||||
case *messages.IOSUserID:
|
||||
case *messages.MobileUserID:
|
||||
if m.ID != "" {
|
||||
sess["user_id"] = QUOTES(m.ID)
|
||||
}
|
||||
case *messages.IOSUserAnonymousID:
|
||||
case *messages.MobileUserAnonymousID:
|
||||
sess["user_anonymous_id"] = QUOTES(m.ID)
|
||||
default:
|
||||
updated = false
|
||||
|
|
@ -443,7 +443,7 @@ func (s *Saver) Handle(msg messages.Message) {
|
|||
s.events = append(s.events, newEvent)
|
||||
}
|
||||
s.handleSession(msg)
|
||||
if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgIOSSessionEnd {
|
||||
if msg.TypeID() == messages.MsgSessionEnd || msg.TypeID() == messages.MsgMobileSessionEnd {
|
||||
if s.finishedSessions == nil {
|
||||
s.finishedSessions = make([]uint64, 0)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue