openreplay/ee/backend/pkg/connector/redshift.go

package connector

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // Postgres driver; Redshift is wire-compatible with PostgreSQL

	"openreplay/backend/internal/config/connector"
	"openreplay/backend/pkg/logger"
)
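// Redshift writes batched session and event records to Amazon Redshift.
// Batches are first staged as delimited files in an S3 bucket and then
// loaded into the target tables with COPY statements.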
type Redshift struct {
	log     logger.Logger
	cfg     *connector.Config
	ctx     context.Context
	db      *sql.DB
	batches *Batches
}
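// NewRedshift opens a connection to Redshift via the lib/pq Postgres driver,
// verifies it with a ping and returns a connector ready to insert batches.
//
// A minimal usage sketch (assuming the caller builds the logger, config and
// Batches elsewhere; the variable names below are illustrative only):
//
//	rs, err := NewRedshift(log, cfg, batches)
//	if err != nil {
//		return err
//	}
//	defer rs.Close()
//	if err := rs.InsertSessions(sessionsBatch); err != nil {
//		// handle the error
//	}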
func NewRedshift(log logger.Logger, cfg *connector.Config, batches *Batches) (*Redshift, error) {
	// Prefer an explicit connection string; otherwise build a Postgres-style DSN from the Redshift config.
	var source string
	if cfg.ConnectionString != "" {
		source = cfg.ConnectionString
	} else {
		source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
			cfg.Redshift.User, cfg.Redshift.Password, cfg.Redshift.Host, cfg.Redshift.Port, cfg.Redshift.Database)
	}
	log.Info(context.Background(), "Connecting to Redshift Source: %s", source)
	sqldb, err := sql.Open("postgres", source)
	if err != nil {
		return nil, err
	}
	// Verify the connection before handing the connector out.
	if err := sqldb.Ping(); err != nil {
		sqldb.Close()
		return nil, err
	}
	return &Redshift{
		log:     log,
		cfg:     cfg,
		ctx:     context.Background(),
		db:      sqldb,
		batches: batches,
	}, nil
}
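// InsertSessions uploads a batch of session rows to S3 and copies it into the
// configured sessions table.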
func (r *Redshift) InsertSessions(batch []map[string]string) error {
	fileName := generateName(r.cfg.SessionsTableName)
	// Stage the batch as a delimited file in the S3 bucket.
	if err := r.batches.Insert(batch, fileName, sessionColumns); err != nil {
		return fmt.Errorf("can't insert sessions batch: %s", err)
	}
	// Copy the staged data from the S3 bucket into the Redshift sessions table.
	if err := r.copy(r.cfg.SessionsTableName, fileName, "|", true, false); err != nil {
		return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
	}
	r.log.Info(context.Background(), "sessions batch of %d sessions successfully saved", len(batch))
	return nil
}
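// InsertEvents uploads a batch of event rows to S3 and copies it into the
// configured events table.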
func (r *Redshift) InsertEvents(batch []map[string]string) error {
	fileName := generateName(r.cfg.EventsTableName)
	// Stage the batch as a delimited file in the S3 bucket.
	if err := r.batches.Insert(batch, fileName, eventColumns); err != nil {
		return fmt.Errorf("can't insert events batch: %s", err)
	}
	// Copy the staged data from the S3 bucket into the Redshift events table.
	if err := r.copy(r.cfg.EventsTableName, fileName, "|", true, false); err != nil {
		return fmt.Errorf("can't copy data from s3 to redshift: %s", err)
	}
	r.log.Info(context.Background(), "events batch of %d events successfully saved", len(batch))
	return nil
}
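// copy issues a Redshift COPY statement that loads the staged S3 file into
// tableName. creds controls whether AWS credentials (access keys or an IAM
// role) are appended to the statement, and gzip marks the source file as
// gzip-compressed.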
func (r *Redshift) copy(tableName, fileName, delimiter string, creds, gzip bool) error {
	var (
		credentials string
		gzipSQL     string
	)
	if creds {
		// Prefer explicit access keys, then a configured IAM role, then the cluster's default IAM role.
		if r.cfg.AWSAccessKeyID != "" && r.cfg.AWSSecretAccessKey != "" {
			credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey)
		} else if r.cfg.AWSIAMRole != "" {
			credentials = fmt.Sprintf(`IAM_ROLE '%s'`, r.cfg.AWSIAMRole)
		} else {
			credentials = "IAM_ROLE default"
		}
	}
	if gzip {
		gzipSQL = "GZIP"
	}
	filePath := fmt.Sprintf("s3://%s/%s", r.cfg.Redshift.Bucket, fileName)
	copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`,
		tableName, filePath, gzipSQL, credentials, delimiter)
	r.log.Debug(context.Background(), "Executing COPY SQL: %s", copySQL)
	_, err := r.db.ExecContext(r.ctx, copySQL)
	return err
}
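// Close releases the underlying database connection pool.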
func (r *Redshift) Close() error {
	return r.db.Close()
}