openreplay/backend/pkg/sessions/updates.go
Alexander 993b6b58ab
feat(backend): added utm data to PG (#2152)
* feat(backend): added utm data to PG

* feat(backend): added utm data to CH

* feat(db): removed nullableString() calls
2024-05-24 08:33:28 +02:00

263 lines
6.8 KiB
Go

package sessions
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v4"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics/database"
)
type Updates interface {
AddUserID(sessionID uint64, userID string)
AddAnonID(sessionID uint64, userID string)
SetReferrer(sessionID uint64, referrer, baseReferrer string)
SetMetadata(sessionID uint64, keyNo uint, value string)
SetUTM(sessionID uint64, utmSource, utmMedium, utmCampaign string)
AddEvents(sessionID uint64, events, pages int)
AddIssues(sessionID uint64, errors, issues int)
Commit()
}
type updatesImpl struct {
log logger.Logger
db pool.Pool
updates map[uint64]*sessionUpdate
}
func NewSessionUpdates(log logger.Logger, db pool.Pool) Updates {
return &updatesImpl{
log: log,
db: db,
updates: make(map[uint64]*sessionUpdate),
}
}
func (u *updatesImpl) AddUserID(sessionID uint64, userID string) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].setUserID(userID)
}
func (u *updatesImpl) AddAnonID(sessionID uint64, userID string) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].setUserID(userID)
}
func (u *updatesImpl) SetReferrer(sessionID uint64, referrer, baseReferrer string) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].setReferrer(referrer, baseReferrer)
}
func (u *updatesImpl) SetMetadata(sessionID uint64, keyNo uint, value string) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].setMetadata(keyNo, value)
}
func (u *updatesImpl) SetUTM(sessionID uint64, utmSource, utmMedium, utmCampaign string) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].setUTM(utmSource, utmMedium, utmCampaign)
}
func (u *updatesImpl) AddEvents(sessionID uint64, events, pages int) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].addEvents(events, pages)
}
func (u *updatesImpl) AddIssues(sessionID uint64, errors, issues int) {
if u.updates[sessionID] == nil {
u.updates[sessionID] = NewSessionUpdate(sessionID)
}
u.updates[sessionID].addIssues(errors, issues)
}
func (u *updatesImpl) Commit() {
b := &pgx.Batch{}
for _, upd := range u.updates {
if str, args := upd.request(); str != "" {
b.Queue(str, args...)
}
}
// Record batch size
database.RecordBatchElements(float64(b.Len()))
start := time.Now()
// Send batch to db and execute
br := u.db.SendBatch(b)
l := b.Len()
failed := false
for i := 0; i < l; i++ {
if _, err := br.Exec(); err != nil {
u.log.Error(context.Background(), "error in PG batch.Exec(): %s", err)
failed = true
break
}
}
if err := br.Close(); err != nil {
u.log.Error(context.Background(), "error in PG batch.Close(): %s", err)
}
if failed {
for _, upd := range u.updates {
if str, args := upd.request(); str != "" {
if err := u.db.Exec(str, args...); err != nil {
u.log.Error(context.Background(), "error in PG Exec(): %s", err)
}
}
}
}
database.RecordBatchInsertDuration(float64(time.Now().Sub(start).Milliseconds()))
u.updates = make(map[uint64]*sessionUpdate)
}
type sessionUpdate struct {
sessionID uint64
userID *string
anonID *string
referrer *string
baseReferrer *string
metadata map[uint]string
pages int
events int
errors int
issues int
utmSource *string
utmMedium *string
utmCampaign *string
}
func NewSessionUpdate(sessionID uint64) *sessionUpdate {
return &sessionUpdate{
sessionID: sessionID,
pages: 0,
events: 0,
errors: 0,
issues: 0,
metadata: make(map[uint]string),
}
}
func (su *sessionUpdate) setUserID(userID string) {
su.userID = &userID
}
func (su *sessionUpdate) setAnonID(anonID string) {
su.anonID = &anonID
}
func (su *sessionUpdate) setReferrer(referrer, baseReferrer string) {
su.referrer = &referrer
su.baseReferrer = &baseReferrer
}
func (su *sessionUpdate) setMetadata(keyNo uint, value string) {
su.metadata[keyNo] = value
}
func (su *sessionUpdate) setUTM(utmSource, utmMedium, utmCampaign string) {
if utmSource != "" {
su.utmSource = &utmSource
}
if utmMedium != "" {
su.utmMedium = &utmMedium
}
if utmCampaign != "" {
su.utmCampaign = &utmCampaign
}
}
func (su *sessionUpdate) addEvents(events, pages int) {
su.events += events
su.pages += pages
}
func (su *sessionUpdate) addIssues(errors, issues int) {
su.errors += errors
su.issues += issues
}
func (su *sessionUpdate) request() (string, []interface{}) {
sqlReq := "UPDATE sessions SET"
sqlArgs := make([]interface{}, 0)
varsCounter := 0
if su.userID != nil {
varsCounter++
sqlReq += fmt.Sprintf(" user_id = LEFT($%d, 8000),", varsCounter)
sqlArgs = append(sqlArgs, *su.userID)
}
if su.anonID != nil {
varsCounter++
sqlReq += fmt.Sprintf(" user_anonymous_id = LEFT($%d, 8000),", varsCounter)
sqlArgs = append(sqlArgs, *su.anonID)
}
if su.referrer != nil {
varsCounter += 2
sqlReq += fmt.Sprintf(" referrer = LEFT($%d, 8000), base_referrer = LEFT($%d, 8000),", varsCounter-1, varsCounter)
sqlArgs = append(sqlArgs, *su.referrer, *su.baseReferrer)
}
for keyNo, value := range su.metadata {
varsCounter++
sqlReq += fmt.Sprintf(" metadata_%d = LEFT($%d, 8000),", keyNo, varsCounter)
sqlArgs = append(sqlArgs, value)
}
if su.pages > 0 {
varsCounter++
sqlReq += fmt.Sprintf(" pages_count = pages_count + $%d,", varsCounter)
sqlArgs = append(sqlArgs, su.pages)
}
if su.events > 0 {
varsCounter++
sqlReq += fmt.Sprintf(" events_count = events_count + $%d,", varsCounter)
sqlArgs = append(sqlArgs, su.events)
}
if su.errors > 0 {
varsCounter++
sqlReq += fmt.Sprintf(" errors_count = errors_count + $%d,", varsCounter)
sqlArgs = append(sqlArgs, su.errors)
}
if su.issues > 0 {
varsCounter++
sqlReq += fmt.Sprintf(" issue_score = issue_score + $%d,", varsCounter)
sqlArgs = append(sqlArgs, su.issues)
}
if su.utmSource != nil {
varsCounter++
sqlReq += fmt.Sprintf(" utm_source = LEFT($%d, 8000),", varsCounter)
sqlArgs = append(sqlArgs, *su.utmSource)
}
if su.utmMedium != nil {
varsCounter++
sqlReq += fmt.Sprintf(" utm_medium = LEFT($%d, 8000),", varsCounter)
sqlArgs = append(sqlArgs, *su.utmMedium)
}
if su.utmCampaign != nil {
varsCounter++
sqlReq += fmt.Sprintf(" utm_campaign = LEFT($%d, 8000),", varsCounter)
sqlArgs = append(sqlArgs, *su.utmCampaign)
}
if varsCounter == 0 {
return "", nil
}
varsCounter++
sqlReq = sqlReq[:len(sqlReq)-1] + fmt.Sprintf(" WHERE session_id = $%d", varsCounter)
sqlArgs = append(sqlArgs, su.sessionID)
return sqlReq, sqlArgs
}