Sessions refactoring (#1371)

* feat(backend): moved sql requests related to sessions table to one place

* feat(backend): refactoring in db.Saver handler

* feat(backend): huge refactoring in db/postgres module

* fix(backend): workable feature flags

* fix(backend): workable integrations

* fix(backend): workable sessions and projects modules

* fix(backend): added missed projects module to sessions

* feat(backend): renaming

* feat(backend): moved session struct to sessions module and split methods into interface, cache and storage levels

* feat(backend): moved project struct to projects module

* feat(backend): added projects model

* feat(backend): implemented new in memory cache for sessions and projects

* feat(backend): implemented new cache in projects

* feat(backend): there are 2 methods in cache module now: Get() and GetAndRefresh()

* feat(backend): added cache update operations

* fix(backend): fixed import cycle

* fix(backend): fixed panic in db message handler

* fix(backend): fixed panic in projects module

* fix(backend): fixed panic in sessions.GetDuration

* feat(backend): added direct call to get session duration if session is already in cache

* feat(backend): used pg pool everywhere except db service

* fix(backend): added missing part after rebase

* fix(backend): removed old sessions file

* feat(backend): added refactored redis client with produce/consume options

* feat(backend): added cache layer for projects

* fix(backend): added missing redis config

* fix(backend): added missing method for producer

* feat(backend): cache integration for sessions

* feat(backend): temporary method to get session directly from db

* feat(backend): adapt EE version of message handler

* fix(backend): fixed issue in fts realisation

* fix(backend): added redis cache to sessions module

* fix(backend): set 0 duration or hesitation time for inputs without focus event

* feat(backend): added cache for session updates and failover mechanism for batch.Insert() operation

* feat(backend): debug log

* feat(backend): more debug log

* feat(backend): removed debug log

* fix(backend): fixed an issue of tracking input events with empty label

* fix(backend): disabled debug log in projects cache

* fix(backend): renamed session updater

* fix(backend): fixed closed pool issue in DB service

* fix(backend): fixed dead lock in db Stop() method

* fix(backend): fixed panic in heuristics service

* feat(backend): enabled redis cache in projects

* feat(backend): clear cache on each update operation

* feat(backend): fully integrated cache layer with auto switch

* feat(backend): small refactoring in session updates

* fix(backend): fixed wrong events counter issue

* feat(backend): enabled full cache support in ender and http services

* fix(backend/ee): added missed import

* feat(backend): added second cache layer for db to speed up the service

* feat(backend): disable redis cache

* feat(backend): moved redis cache to ee
This commit is contained in:
Alexander 2023-07-06 10:55:43 +02:00 committed by GitHub
parent 391c11c7ab
commit 4b8f3bee25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 2402 additions and 1676 deletions

View file

@ -2,17 +2,19 @@ package main
import (
"log"
"openreplay/backend/pkg/memory"
config "openreplay/backend/internal/config/db"
"openreplay/backend/internal/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
@ -24,13 +26,30 @@ func main() {
cfg := config.New()
// Init database
pg := cache.NewPGCache(
postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpiration)
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
}
defer pgConn.Close()
// Init events module
pg := postgres.NewConn(pgConn)
defer pg.Close()
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Printf("can't init redis connection: %s", err)
}
defer redisClient.Close()
projManager := projects.New(pgConn, redisClient)
sessManager := sessions.New(pgConn, projManager, redisClient)
// Init data saver
saver := datasaver.New(cfg, pg)
saver := datasaver.New(cfg, pg, sessManager)
// Message filter
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
@ -62,7 +81,8 @@ func main() {
}
// Run service and wait for TERM signal
service := db.New(cfg, consumer, saver, memoryManager)
service := db.New(cfg, consumer, saver, memoryManager, sessManager)
log.Printf("Db service started\n")
terminator.Wait(service)
log.Printf("Db service stopped\n")
}

View file

@ -2,7 +2,11 @@ package main
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/sessions"
"os"
"os/signal"
"strings"
@ -12,8 +16,6 @@ import (
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
@ -31,10 +33,25 @@ func main() {
cfg := ender.New()
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpiration)
defer pg.Close()
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
}
defer pgConn.Close()
sessions, err := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Printf("can't init redis connection: %s", err)
}
defer redisClient.Close()
projManager := projects.New(pgConn, redisClient)
sessManager := sessions.New(pgConn, projManager, redisClient)
sessionEndGenerator, err := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
if err != nil {
log.Printf("can't init ender service: %s", err)
return
@ -45,7 +62,7 @@ func main() {
cfg.GroupEnder,
[]string{cfg.TopicRawWeb},
messages.NewEnderMessageIterator(
func(msg messages.Message) { sessions.UpdateSession(msg) },
func(msg messages.Message) { sessionEndGenerator.UpdateSession(msg) },
[]int{messages.MsgTimestamp},
false),
false,
@ -79,13 +96,13 @@ func main() {
duplicatedSessionEnds := make(map[uint64]uint64)
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
sessionEndGenerator.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
msg := &messages.SessionEnd{Timestamp: timestamp}
currDuration, err := pg.GetSessionDuration(sessionID)
currDuration, err := sessManager.GetDuration(sessionID)
if err != nil {
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
}
newDuration, err := pg.InsertSessionEnd(sessionID, msg.Timestamp)
newDuration, err := sessManager.UpdateDuration(sessionID, msg.Timestamp)
if err != nil {
if strings.Contains(err.Error(), "integer out of range") {
// Skip session with broken duration
@ -102,7 +119,7 @@ func main() {
}
if cfg.UseEncryption {
if key := storage.GenerateEncryptionKey(); key != nil {
if err := pg.InsertSessionEncryptionKey(sessionID, key); err != nil {
if err := sessManager.UpdateEncryptionKey(sessionID, key); err != nil {
log.Printf("can't save session encryption key: %s, session will not be encrypted", err)
} else {
msg.EncryptionKey = string(key)

View file

@ -4,6 +4,7 @@ import (
"log"
config "openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/heuristics"
"openreplay/backend/pkg/builders"
"openreplay/backend/pkg/handlers"
"openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/handlers/web"
@ -12,7 +13,6 @@ import (
"openreplay/backend/pkg/metrics"
heuristicsMetrics "openreplay/backend/pkg/metrics/heuristics"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
@ -37,7 +37,7 @@ func main() {
}
}
eventBuilder := sessions.NewBuilderMap(handlersFabric)
eventBuilder := builders.NewBuilderMap(handlersFabric)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
consumer := queue.NewConsumer(
cfg.GroupHeuristics,
@ -60,4 +60,5 @@ func main() {
service := heuristics.New(cfg, producer, consumer, eventBuilder, memoryManager)
log.Printf("Heuristics service started\n")
terminator.Wait(service)
log.Printf("Heuristics service stopped\n")
}

View file

@ -2,6 +2,8 @@ package main
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"os"
"os/signal"
"syscall"
@ -10,8 +12,6 @@ import (
"openreplay/backend/internal/http/router"
"openreplay/backend/internal/http/server"
"openreplay/backend/internal/http/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
httpMetrics "openreplay/backend/pkg/metrics/http"
@ -31,12 +31,23 @@ func main() {
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
// Connect to database
dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpiration)
defer dbConn.Close()
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
}
defer pgConn.Close()
// Init redis connection
redisClient, err := redis.New(&cfg.Redis)
if err != nil {
log.Printf("can't init redis connection: %s", err)
}
defer redisClient.Close()
// Build all services
services, err := services.New(cfg, producer, dbConn)
services, err := services.New(cfg, producer, pgConn, redisClient)
if err != nil {
log.Fatalf("failed while creating services: %s", err)
}

View file

@ -2,6 +2,8 @@ package main
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/integrations"
"os"
"os/signal"
"syscall"
@ -9,7 +11,6 @@ import (
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/integrations/clientManager"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
@ -25,14 +26,30 @@ func main() {
cfg := config.New()
pg := postgres.NewConn(cfg.Postgres.String(), 0, 0)
defer pg.Close()
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
}
defer pgConn.Close()
tokenizer := token.NewTokenizer(cfg.TokenSecret)
manager := clientManager.NewManager()
pg.IterateIntegrationsOrdered(func(i *postgres.Integration, err error) {
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
// TODO: rework with integration manager
listener, err := integrations.New(pgConn, cfg.Postgres.String())
if err != nil {
log.Printf("Postgres listener error: %v\n", err)
log.Fatalf("Postgres listener error")
}
defer listener.Close()
listener.IterateIntegrationsOrdered(func(i *integrations.Integration, err error) {
if err != nil {
log.Printf("Postgres error: %v\n", err)
return
@ -45,16 +62,6 @@ func main() {
}
})
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
listener, err := postgres.NewIntegrationsListener(cfg.Postgres.String())
if err != nil {
log.Printf("Postgres listener error: %v\n", err)
log.Fatalf("Postgres listener error")
}
defer listener.Close()
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
@ -67,7 +74,7 @@ func main() {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
listener.Close()
pg.Close()
pgConn.Close()
os.Exit(0)
case <-tick:
log.Printf("Requesting all...\n")
@ -88,13 +95,13 @@ func main() {
log.Printf("Integration error: %v\n", err)
case i := <-manager.RequestDataUpdates:
// log.Printf("Last request integration update: %v || %v\n", i, string(i.RequestData))
if err := pg.UpdateIntegrationRequestData(&i); err != nil {
if err := listener.UpdateIntegrationRequestData(&i); err != nil {
log.Printf("Postgres Update request_data error: %v\n", err)
}
case err := <-listener.Errors:
log.Printf("Postgres listen error: %v\n", err)
listener.Close()
pg.Close()
pgConn.Close()
os.Exit(0)
case iPointer := <-listener.Integrations:
log.Printf("Integration update: %v\n", *iPointer)

View file

@ -3,12 +3,14 @@ package db
import (
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/redis"
"time"
)
type Config struct {
common.Config
common.Postgres
redis.Redis
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"`
GroupDB string `env:"GROUP_DB,required"`

View file

@ -3,12 +3,14 @@ package ender
import (
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/redis"
"time"
)
type Config struct {
common.Config
common.Postgres
redis.Redis
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
GroupEnder string `env:"GROUP_ENDER,required"`
LoggerTimeout int `env:"LOG_QUEUE_STATS_INTERVAL_SEC,required"`

View file

@ -3,6 +3,7 @@ package http
import (
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/redis"
"openreplay/backend/pkg/env"
"time"
)
@ -10,6 +11,7 @@ import (
type Config struct {
common.Config
common.Postgres
redis.Redis
HTTPHost string `env:"HTTP_HOST,default="`
HTTPPort string `env:"HTTP_PORT,required"`
HTTPTimeout time.Duration `env:"HTTP_TIMEOUT,default=60s"`

View file

@ -0,0 +1,12 @@
package redis
import "time"
// Redis holds connection and stream settings for the Redis client,
// populated from environment variables.
type Redis struct {
	// ConnectionURL is the Redis connection string; presumably an empty
	// value means Redis is disabled — confirm against the client init code.
	ConnectionURL string `env:"REDIS_STRING"`
	// MaxLength bounds the length of Redis streams used for produce/consume.
	MaxLength int64 `env:"REDIS_STREAMS_MAX_LEN,default=100000"`
	// ReadCount is the number of entries requested per stream read.
	ReadCount int64 `env:"REDIS_STREAMS_READ_COUNT,default=1"`
	// ReadBlockDuration is how long a stream read blocks waiting for data.
	ReadBlockDuration time.Duration `env:"REDIS_STREAMS_READ_BLOCK_DURATION,default=200ms"`
	// CloseTimeout is the grace period allowed when closing the client.
	CloseTimeout time.Duration `env:"REDIS_STREAMS_CLOSE_TIMEOUT,default=5s"`
	// UseRedisCache toggles the Redis-backed cache layer (off by default).
	UseRedisCache bool `env:"REDIS_CACHE_ENABLED,default=false"`
}

View file

@ -1,15 +1,16 @@
package datasaver
import (
"errors"
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
queue "openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
type Saver interface {
@ -20,13 +21,18 @@ type Saver interface {
type saverImpl struct {
cfg *db.Config
pg *cache.PGCache
pg *postgres.Conn
sessions sessions.Sessions
ch clickhouse.Connector
producer queue.Producer
}
func New(cfg *db.Config, pg *cache.PGCache) Saver {
s := &saverImpl{cfg: cfg, pg: pg}
func New(cfg *db.Config, pg *postgres.Conn, session sessions.Sessions) Saver {
s := &saverImpl{
cfg: cfg,
pg: pg,
sessions: session,
}
s.init()
return s
}
@ -48,11 +54,22 @@ func (s *saverImpl) Handle(msg Message) {
}
func (s *saverImpl) handleMessage(msg Message) error {
session, err := s.sessions.Get(msg.SessionID())
if err != nil {
return err
}
switch m := msg.(type) {
case *SessionStart:
return s.pg.HandleStartEvent(m)
case *SessionEnd:
return s.pg.HandleEndEvent(m.SessionID())
case *Metadata:
return s.pg.InsertMetadata(m)
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
case *IssueEvent:
return s.pg.InsertIssueEvent(m)
if err = s.pg.InsertIssueEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(m))
case *CustomIssue:
ie := &IssueEvent{
Type: "custom",
@ -62,35 +79,63 @@ func (s *saverImpl) handleMessage(msg Message) error {
Payload: m.Payload,
}
ie.SetMeta(m.Meta())
return s.pg.InsertIssueEvent(ie)
case *SessionStart:
return s.pg.HandleWebSessionStart(m)
case *SessionEnd:
return s.pg.HandleWebSessionEnd(m)
if err = s.pg.InsertIssueEvent(session, ie); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, postgres.GetIssueScore(ie))
case *UserID:
return s.pg.InsertWebUserID(m)
if err = s.sessions.UpdateUserID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERID", m.ID)
return nil
case *UserAnonymousID:
return s.pg.InsertWebUserAnonymousID(m)
if err = s.sessions.UpdateAnonymousID(session.SessionID, m.ID); err != nil {
return err
}
s.pg.InsertAutocompleteValue(session.SessionID, session.ProjectID, "USERANONYMOUSID", m.ID)
return nil
case *CustomEvent:
return s.pg.InsertWebCustomEvent(m)
return s.pg.InsertWebCustomEvent(session, m)
case *MouseClick:
return s.pg.InsertWebClickEvent(m)
if err = s.pg.InsertWebClickEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *InputEvent:
return s.pg.InsertWebInputEvent(m)
if err = s.pg.InsertWebInputEvent(session, m); err != nil {
if errors.Is(err, postgres.EmptyLabel) {
return nil
}
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *PageEvent:
return s.pg.InsertWebPageEvent(m)
if err = s.pg.InsertWebPageEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 1)
case *NetworkRequest:
return s.pg.InsertWebNetworkRequest(m)
return s.pg.InsertWebNetworkRequest(session, m)
case *GraphQL:
return s.pg.InsertWebGraphQL(m)
return s.pg.InsertWebGraphQL(session, m)
case *JSException:
return s.pg.InsertWebJSException(m)
if err = s.pg.InsertWebErrorEvent(session, types.WrapJSException(m)); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, 1000)
case *IntegrationEvent:
return s.pg.InsertWebIntegrationEvent(m)
return s.pg.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *InputChange:
return s.pg.InsertWebInputDuration(m)
if err = s.pg.InsertInputChangeEvent(session, m); err != nil {
return err
}
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
case *MouseThrashing:
return s.pg.InsertMouseThrashing(m)
if err = s.pg.InsertMouseThrashing(session, m); err != nil {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, 50)
}
return nil
}

View file

@ -3,6 +3,7 @@ package db
import (
"log"
"openreplay/backend/pkg/memory"
"openreplay/backend/pkg/sessions"
"time"
"openreplay/backend/internal/config/db"
@ -16,27 +17,43 @@ type dbImpl struct {
consumer types.Consumer
saver datasaver.Saver
mm memory.Manager
sessions sessions.Sessions
done chan struct{}
finished chan struct{}
}
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver, mm memory.Manager) service.Interface {
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver, mm memory.Manager, sessions sessions.Sessions) service.Interface {
s := &dbImpl{
cfg: cfg,
consumer: consumer,
saver: saver,
mm: mm,
sessions: sessions,
done: make(chan struct{}),
finished: make(chan struct{}),
}
go s.run()
return s
}
func (d *dbImpl) run() {
sessionsCommitTick := time.Tick(time.Second * 3)
commitTick := time.Tick(d.cfg.CommitBatchTimeout)
for {
select {
case <-sessionsCommitTick:
d.sessions.Commit()
case <-commitTick:
d.commit()
case msg := <-d.consumer.Rebalanced():
log.Println(msg)
case <-d.done:
d.commit()
if err := d.saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
d.consumer.Close()
d.finished <- struct{}{}
default:
if !d.mm.HasFreeMemory() {
continue
@ -50,13 +67,11 @@ func (d *dbImpl) run() {
func (d *dbImpl) commit() {
d.saver.Commit()
d.sessions.Commit()
d.consumer.Commit()
}
func (d *dbImpl) Stop() {
d.commit()
if err := d.saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
d.consumer.Close()
d.done <- struct{}{}
<-d.finished
}

View file

@ -10,25 +10,29 @@ import (
"openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/service"
"openreplay/backend/pkg/builders"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
type heuristicsImpl struct {
cfg *heuristics.Config
producer types.Producer
consumer types.Consumer
events sessions.EventBuilder
events builders.EventBuilder
mm memory.Manager
done chan struct{}
finished chan struct{}
}
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder, mm memory.Manager) service.Interface {
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e builders.EventBuilder, mm memory.Manager) service.Interface {
s := &heuristicsImpl{
cfg: cfg,
producer: p,
consumer: c,
events: e,
mm: mm,
done: make(chan struct{}),
finished: make(chan struct{}),
}
go s.run()
return s
@ -49,6 +53,19 @@ func (h *heuristicsImpl) run() {
h.consumer.Commit()
case msg := <-h.consumer.Rebalanced():
log.Println(msg)
case <-h.done:
// Stop event builder and flush all events
log.Println("stopping heuristics service")
h.events.Stop()
for evt := range h.events.Events() {
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
}
}
h.producer.Close(h.cfg.ProducerTimeout)
h.consumer.Commit()
h.consumer.Close()
h.finished <- struct{}{}
default:
if !h.mm.HasFreeMemory() {
continue
@ -61,17 +78,8 @@ func (h *heuristicsImpl) run() {
}
func (h *heuristicsImpl) Stop() {
// Stop event builder and flush all events
log.Println("stopping heuristics service")
h.events.Stop()
for evt := range h.events.Events() {
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
}
}
h.producer.Close(h.cfg.ProducerTimeout)
h.consumer.Commit()
h.consumer.Close()
h.done <- struct{}{}
<-h.finished
}
func messageTypeName(msg messages.Message) string {

View file

@ -9,6 +9,7 @@ import (
"math/rand"
"net/http"
"openreplay/backend/pkg/featureflags"
"openreplay/backend/pkg/sessions"
"strconv"
"time"
@ -102,7 +103,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
return
}
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
p, err := e.services.Projects.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
ResponseWithError(w, http.StatusNotFound,
@ -165,7 +166,28 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
}
// Save sessionStart to db
if err := e.services.Database.InsertWebSessionStart(sessionID, sessionStart, geoInfo); err != nil {
if err := e.services.Sessions.Add(&sessions.Session{
SessionID: sessionID,
Platform: "web",
Timestamp: sessionStart.Timestamp,
ProjectID: uint32(sessionStart.ProjectID),
TrackerVersion: sessionStart.TrackerVersion,
RevID: sessionStart.RevID,
UserUUID: sessionStart.UserUUID,
UserOS: sessionStart.UserOS,
UserOSVersion: sessionStart.UserOSVersion,
UserDevice: sessionStart.UserDevice,
UserCountry: geoInfo.Country,
UserState: geoInfo.State,
UserCity: geoInfo.City,
UserAgent: sessionStart.UserAgent,
UserBrowser: sessionStart.UserBrowser,
UserBrowserVersion: sessionStart.UserBrowserVersion,
UserDeviceType: sessionStart.UserDeviceType,
UserDeviceMemorySize: sessionStart.UserDeviceMemorySize,
UserDeviceHeapSize: sessionStart.UserDeviceHeapSize,
UserID: &sessionStart.UserID,
}); err != nil {
log.Printf("can't insert session start: %s", err)
}
@ -268,7 +290,7 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
return
}
geoInfo := e.ExtractGeoData(r)
err = e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{
err = e.services.Sessions.AddUnStarted(&sessions.UnStartedSession{
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,
@ -324,23 +346,12 @@ func (e *Router) featureFlagsHandlerWeb(w http.ResponseWriter, r *http.Request)
return
}
// Grab flags and conditions for project
projectID, err := strconv.ParseUint(req.ProjectID, 10, 32)
if err != nil {
ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
flags, err := e.services.Database.GetFeatureFlags(uint32(projectID))
computedFlags, err := e.services.FeatureFlags.ComputeFlagsForSession(req)
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
computedFlags, err := featureflags.ComputeFeatureFlags(flags, req)
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
resp := &featureflags.FeatureFlagsResponse{
Flags: computedFlags,
}

View file

@ -4,28 +4,37 @@ import (
"openreplay/backend/internal/config/http"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/internal/http/uaparser"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/featureflags"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/token"
)
type ServicesBuilder struct {
Database *cache.PGCache
Producer types.Producer
Flaker *flakeid.Flaker
UaParser *uaparser.UAParser
GeoIP geoip.GeoParser
Tokenizer *token.Tokenizer
Projects projects.Projects
Sessions sessions.Sessions
FeatureFlags featureflags.FeatureFlags
Producer types.Producer
Flaker *flakeid.Flaker
UaParser *uaparser.UAParser
GeoIP geoip.GeoParser
Tokenizer *token.Tokenizer
}
func New(cfg *http.Config, producer types.Producer, pgconn *cache.PGCache) (*ServicesBuilder, error) {
func New(cfg *http.Config, producer types.Producer, pgconn pool.Pool, redis *redis.Client) (*ServicesBuilder, error) {
projs := projects.New(pgconn, redis)
return &ServicesBuilder{
Database: pgconn,
Producer: producer,
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
GeoIP: geoip.New(cfg.MaxMinDBFile),
Flaker: flakeid.NewFlaker(cfg.WorkerID),
Projects: projs,
Sessions: sessions.New(pgconn, projs, redis),
FeatureFlags: featureflags.New(pgconn),
Producer: producer,
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
GeoIP: geoip.New(cfg.MaxMinDBFile),
Flaker: flakeid.NewFlaker(cfg.WorkerID),
}, nil
}

View file

@ -2,29 +2,28 @@ package clientManager
import (
"openreplay/backend/internal/integrations/integration"
"openreplay/backend/pkg/integrations"
"strconv"
"openreplay/backend/pkg/db/postgres"
)
type manager struct {
clientMap integration.ClientMap
Events chan *integration.SessionErrorEvent
Errors chan error
RequestDataUpdates chan postgres.Integration // not pointer because it could change in other thread
RequestDataUpdates chan integrations.Integration // not pointer because it could change in other thread
}
func NewManager() *manager {
return &manager{
clientMap: make(integration.ClientMap),
RequestDataUpdates: make(chan postgres.Integration, 100),
RequestDataUpdates: make(chan integrations.Integration, 100),
Events: make(chan *integration.SessionErrorEvent, 100),
Errors: make(chan error, 100),
}
}
func (m *manager) Update(i *postgres.Integration) error {
func (m *manager) Update(i *integrations.Integration) error {
key := strconv.Itoa(int(i.ProjectID)) + i.Provider
if i.Options == nil {
delete(m.clientMap, key)

View file

@ -4,10 +4,10 @@ import (
"encoding/json"
"fmt"
"log"
"openreplay/backend/pkg/integrations"
"sync"
"time"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/messages"
)
@ -29,10 +29,10 @@ type requestData struct {
type client struct {
requestData
requester
integration *postgres.Integration
integration *integrations.Integration
// TODO: timeout ?
mux sync.Mutex
updateChan chan<- postgres.Integration
updateChan chan<- integrations.Integration
evChan chan<- *SessionErrorEvent
errChan chan<- error
}
@ -45,7 +45,7 @@ type SessionErrorEvent struct {
type ClientMap map[string]*client
func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
func NewClient(i *integrations.Integration, updateChan chan<- integrations.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
c := new(client)
if err := c.Update(i); err != nil {
return nil, err
@ -67,7 +67,7 @@ func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration,
}
// from outside
func (c *client) Update(i *postgres.Integration) error {
func (c *client) Update(i *integrations.Integration) error {
c.mux.Lock()
defer c.mux.Unlock()
var r requester

View file

@ -1,4 +1,4 @@
package sessions
package builders
import (
"log"

View file

@ -1,4 +1,4 @@
package sessions
package builders
import (
"log"

72
backend/pkg/cache/cache.go vendored Normal file
View file

@ -0,0 +1,72 @@
package cache
import (
"sync"
"time"
)
type Cache interface {
Set(key, value interface{})
Get(key interface{}) (interface{}, bool)
GetAndRefresh(key interface{}) (interface{}, bool)
}
type item struct {
data interface{}
lastUsage time.Time
}
type cacheImpl struct {
mutex sync.Mutex
items map[interface{}]item
}
func New(cleaningInterval, itemDuration time.Duration) Cache {
cache := &cacheImpl{items: make(map[interface{}]item)}
go func() {
cleanTick := time.Tick(cleaningInterval)
for {
select {
case <-cleanTick:
cache.mutex.Lock()
now := time.Now()
for k, v := range cache.items {
if now.Sub(v.lastUsage) > itemDuration {
delete(cache.items, k)
}
}
cache.mutex.Unlock()
}
}
}()
return cache
}
func (c *cacheImpl) Set(key, value interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.items[key] = item{
data: value,
lastUsage: time.Now(),
}
}
func (c *cacheImpl) Get(key interface{}) (interface{}, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
if v, ok := c.items[key]; ok {
return v.data, ok
}
return nil, false
}
func (c *cacheImpl) GetAndRefresh(key interface{}) (interface{}, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
v, ok := c.items[key]
if ok {
v.lastUsage = time.Now()
c.items[key] = v
}
return v.data, ok
}

View file

@ -1,73 +0,0 @@
package cache
import (
"log"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/types"
"sync"
"time"
)
type SessionMeta struct {
*types.Session
lastUse time.Time
}
type ProjectMeta struct {
*types.Project
expirationTime time.Time
}
type Cache interface {
SetSession(sess *types.Session)
HasSession(sessID uint64) bool
GetSession(sessID uint64) (*types.Session, error)
GetProject(projectID uint32) (*types.Project, error)
GetProjectByKey(projectKey string) (*types.Project, error)
}
type cacheImpl struct {
conn *postgres.Conn
mutex sync.RWMutex
sessions map[uint64]*SessionMeta
projects map[uint32]*ProjectMeta
projectsByKeys sync.Map
projectExpirationTimeout time.Duration
}
func NewCache(conn *postgres.Conn, projectExpiration time.Duration) Cache {
newCache := &cacheImpl{
conn: conn,
sessions: make(map[uint64]*SessionMeta),
projects: make(map[uint32]*ProjectMeta),
projectExpirationTimeout: projectExpiration,
}
go newCache.cleaner()
return newCache
}
func (c *cacheImpl) cleaner() {
cleanTick := time.Tick(time.Minute * 5)
for {
select {
case <-cleanTick:
c.clearCache()
}
}
}
func (c *cacheImpl) clearCache() {
c.mutex.Lock()
defer c.mutex.Unlock()
now := time.Now()
cacheSize := len(c.sessions)
deleted := 0
for id, sess := range c.sessions {
if now.Sub(sess.lastUse).Minutes() > 3 {
deleted++
delete(c.sessions, id)
}
}
log.Printf("cache cleaner: deleted %d/%d sessions", deleted, cacheSize)
}

View file

@ -1,61 +0,0 @@
package cache
import (
"log"
. "openreplay/backend/pkg/messages"
"time"
)
// InsertSessionEnd records the end timestamp of a session; the returned
// uint64 comes straight from the connection layer (presumably the computed
// session duration — confirm against Conn.InsertSessionEnd).
func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
	return c.Conn.InsertSessionEnd(sessionID, timestamp)
}

// InsertSessionEncryptionKey stores the per-session encryption key.
func (c *PGCache) InsertSessionEncryptionKey(sessionID uint64, key []byte) error {
	return c.Conn.InsertSessionEncryptionKey(sessionID, key)
}

// HandleSessionEnd finalizes the session in the database.
// NOTE(review): the underlying error is logged and swallowed — callers
// always receive nil. Confirm this best-effort contract is intentional.
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
	if err := c.Conn.HandleSessionEnd(sessionID); err != nil {
		log.Printf("can't handle session end: %s", err)
	}
	return nil
}
// InsertIssueEvent persists an issue event, resolving the project ID through
// the session cache.
func (c *PGCache) InsertIssueEvent(crash *IssueEvent) error {
	sessionID := crash.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertIssueEvent(sessionID, session.ProjectID, crash)
}

// InsertMetadata stores one metadata key/value pair for a session after
// mapping the key name to the project's metadata slot number.
func (c *PGCache) InsertMetadata(metadata *Metadata) error {
	sessionID := metadata.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	project, err := c.Cache.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	keyNo := project.GetMetadataNo(metadata.Key)
	if keyNo == 0 {
		// Key is not configured for this project: value is dropped silently.
		// TODO: insert project metadata
		return nil
	}
	if err := c.Conn.InsertMetadata(sessionID, keyNo, metadata.Value); err != nil {
		// Try to insert metadata after one minute
		// NOTE(review): the retry result is only logged; even if the retry
		// succeeds, the cached session is never updated and the original
		// error is still returned to the caller.
		time.AfterFunc(time.Minute, func() {
			if err := c.Conn.InsertMetadata(sessionID, keyNo, metadata.Value); err != nil {
				log.Printf("metadata retry err: %s", err)
			}
		})
		return err
	}
	// Keep the in-memory copy in sync with the DB write.
	session.SetMetadata(keyNo, metadata.Value)
	return nil
}

View file

@ -1,203 +0,0 @@
package cache
import (
"fmt"
"openreplay/backend/internal/http/geoip"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
// InsertWebSessionStart persists a new web session assembled from the
// SessionStart message plus the already-resolved geo record.
func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart, geo *geoip.GeoRecord) error {
	return c.Conn.InsertSessionStart(sessionID, &Session{
		SessionID:            sessionID,
		Platform:             "web",
		Timestamp:            s.Timestamp,
		ProjectID:            uint32(s.ProjectID),
		TrackerVersion:       s.TrackerVersion,
		RevID:                s.RevID,
		UserUUID:             s.UserUUID,
		UserOS:               s.UserOS,
		UserOSVersion:        s.UserOSVersion,
		UserDevice:           s.UserDevice,
		UserCountry:          geo.Country,
		UserState:            geo.State,
		UserCity:             geo.City,
		UserAgent:            s.UserAgent,
		UserBrowser:          s.UserBrowser,
		UserBrowserVersion:   s.UserBrowserVersion,
		UserDeviceType:       s.UserDeviceType,
		UserDeviceMemorySize: s.UserDeviceMemorySize,
		UserDeviceHeapSize:   s.UserDeviceHeapSize,
		UserID:               &s.UserID,
	})
}
// HandleWebSessionStart persists a freshly started web session and registers
// it in the in-memory cache. Returns an error when the session is already
// cached or the DB insert fails.
func (c *PGCache) HandleWebSessionStart(s *SessionStart) error {
	sessionID := s.SessionID()
	if c.Cache.HasSession(sessionID) {
		return fmt.Errorf("session %d already in cache", sessionID)
	}
	// UserCountry carries a packed "country|state|city" record.
	geoInfo := geoip.UnpackGeoRecord(s.UserCountry)
	newSess := &Session{
		SessionID:            sessionID,
		Platform:             "web",
		Timestamp:            s.Timestamp,
		ProjectID:            uint32(s.ProjectID),
		TrackerVersion:       s.TrackerVersion,
		RevID:                s.RevID,
		UserUUID:             s.UserUUID,
		UserOS:               s.UserOS,
		UserOSVersion:        s.UserOSVersion,
		UserDevice:           s.UserDevice,
		UserCountry:          geoInfo.Country,
		UserState:            geoInfo.State,
		UserCity:             geoInfo.City,
		UserAgent:            s.UserAgent,
		UserBrowser:          s.UserBrowser,
		UserBrowserVersion:   s.UserBrowserVersion,
		UserDeviceType:       s.UserDeviceType,
		UserDeviceMemorySize: s.UserDeviceMemorySize,
		UserDeviceHeapSize:   s.UserDeviceHeapSize,
		UserID:               &s.UserID,
	}
	// Persist first and cache only on success. The previous version cached
	// up front and then called c.Cache.SetSession(nil) on failure, which
	// panics: SetSession dereferences sess.SessionID unconditionally.
	if err := c.Conn.HandleSessionStart(sessionID, newSess); err != nil {
		return err
	}
	c.Cache.SetSession(newSess)
	return nil
}
// InsertWebSessionEnd records the end timestamp of a web session; the
// duration value returned by InsertSessionEnd is discarded here.
func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
	_, err := c.InsertSessionEnd(sessionID, e.Timestamp)
	return err
}

// HandleWebSessionEnd finalizes a web session (delegates to HandleSessionEnd).
func (c *PGCache) HandleWebSessionEnd(e *SessionEnd) error {
	sessionID := e.SessionID()
	return c.HandleSessionEnd(sessionID)
}

// InsertWebJSException stores a JS exception as a generic web error event.
func (c *PGCache) InsertWebJSException(e *JSException) error {
	return c.InsertWebErrorEvent(e.SessionID(), WrapJSException(e))
}

// InsertWebIntegrationEvent stores an integration-reported error as a
// generic web error event.
func (c *PGCache) InsertWebIntegrationEvent(e *IntegrationEvent) error {
	return c.InsertWebErrorEvent(e.SessionID(), WrapIntegrationEvent(e))
}

// InsertWebErrorEvent persists an error event and increments the cached
// session's error counter on success.
// NOTE(review): the increment mutates the shared cached Session in memory
// only — confirm the persisted counter is updated elsewhere.
func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	if err := c.Conn.InsertWebErrorEvent(sessionID, session.ProjectID, e); err != nil {
		return err
	}
	session.ErrorsCount += 1
	return nil
}

// InsertSessionReferrer stores the session referrer; the cache lookup only
// verifies that the session exists.
func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error {
	_, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertSessionReferrer(sessionID, referrer)
}
// InsertWebNetworkRequest stores a network request event; the project's
// SaveRequestPayloads flag decides whether payloads are persisted.
func (c *PGCache) InsertWebNetworkRequest(e *NetworkRequest) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	project, err := c.Cache.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebNetworkRequest(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}

// InsertWebGraphQL stores a GraphQL event; payload persistence follows the
// project's SaveRequestPayloads flag.
func (c *PGCache) InsertWebGraphQL(e *GraphQL) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	project, err := c.Cache.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebGraphQL(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}

// InsertWebCustomEvent stores a custom event for the session's project.
func (c *PGCache) InsertWebCustomEvent(e *CustomEvent) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e)
}

// InsertWebUserID stores the identified user ID for the session.
func (c *PGCache) InsertWebUserID(userID *UserID) error {
	sessionID := userID.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID)
}

// InsertWebUserAnonymousID stores the anonymous user ID for the session.
func (c *PGCache) InsertWebUserAnonymousID(userAnonymousID *UserAnonymousID) error {
	sessionID := userAnonymousID.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID)
}
// InsertWebPageEvent stores a page navigation event, resolving the project
// through the session cache.
func (c *PGCache) InsertWebPageEvent(e *PageEvent) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e)
}

// InsertWebClickEvent stores a mouse click event.
func (c *PGCache) InsertWebClickEvent(e *MouseClick) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e)
}

// InsertWebInputEvent stores an input event.
func (c *PGCache) InsertWebInputEvent(e *InputEvent) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebInputEvent(sessionID, session.ProjectID, e)
}

// InsertWebInputDuration stores an input change with hesitation/typing times.
func (c *PGCache) InsertWebInputDuration(e *InputChange) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertWebInputDuration(sessionID, session.ProjectID, e)
}

// InsertMouseThrashing stores a mouse-thrashing issue event.
func (c *PGCache) InsertMouseThrashing(e *MouseThrashing) error {
	sessionID := e.SessionID()
	session, err := c.Cache.GetSession(sessionID)
	if err != nil {
		return err
	}
	return c.Conn.InsertMouseThrashing(sessionID, session.ProjectID, e)
}

View file

@ -1,21 +0,0 @@
package cache
import (
"openreplay/backend/pkg/db/postgres"
"time"
)
// PGCache couples the raw Postgres connection with the in-memory
// session/project cache; the embedded Conn keeps all its methods directly
// reachable on PGCache.
type PGCache struct {
	*postgres.Conn
	Cache Cache
}

// NewPGCache wraps an existing connection with a fresh cache layer.
func NewPGCache(conn *postgres.Conn, projectExpiration time.Duration) *PGCache {
	// Create in-memory cache layer for sessions and projects
	c := NewCache(conn, projectExpiration)
	// Return PG wrapper with integrated cache layer
	return &PGCache{
		Conn:  conn,
		Cache: c,
	}
}

View file

@ -1,39 +0,0 @@
package cache
import (
. "openreplay/backend/pkg/db/types"
"time"
)
// GetProjectByKey returns the project for a tracker key, serving fresh
// entries from the local cache and falling back to the database.
func (c *cacheImpl) GetProjectByKey(projectKey string) (*Project, error) {
	if pmInterface, found := c.projectsByKeys.Load(projectKey); found {
		if pm, ok := pmInterface.(*ProjectMeta); ok {
			if time.Now().Before(pm.expirationTime) {
				return pm.Project, nil
			}
		}
	}
	p, err := c.conn.GetProjectByKey(projectKey)
	if err != nil {
		return nil, err
	}
	// Store a *ProjectMeta with an expiration stamp. The old code stored the
	// bare *Project, so the type assertion above never succeeded and every
	// single call fell through to the database.
	c.projectsByKeys.Store(projectKey, &ProjectMeta{p, time.Now().Add(c.projectExpirationTimeout)})
	return p, nil
}
// GetProject returns the project by ID, serving from the local cache while
// the entry is fresh and refreshing it from the database otherwise.
func (c *cacheImpl) GetProject(projectID uint32) (*Project, error) {
	// The session methods guard their map with c.mutex; the old version of
	// this method touched c.projects without any lock, which is a data race
	// under concurrent access. Lock for the whole lookup+refill, matching
	// GetSession's style (the DB call is held under the lock there too).
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if meta := c.projects[projectID]; meta != nil && time.Now().Before(meta.expirationTime) {
		return meta.Project, nil
	}
	p, err := c.conn.GetProject(projectID)
	if err != nil {
		return nil, err
	}
	c.projects[projectID] = &ProjectMeta{p, time.Now().Add(c.projectExpirationTimeout)}
	return p, nil
}

View file

@ -1,51 +0,0 @@
package cache
import (
"errors"
"github.com/jackc/pgx/v4"
. "openreplay/backend/pkg/db/types"
"time"
)
// NilSessionInCacheError marks a cached negative entry: the database has no
// such session, so callers should not retry the lookup.
// (Message fixed: it previously read "nil session in error".)
var NilSessionInCacheError = errors.New("nil session in cache")

// SetSession inserts or refreshes a session in the cache.
// A nil session is ignored: the cache key is read from sess.SessionID, so
// storing nil would panic (negative entries are created by GetSession only).
func (c *cacheImpl) SetSession(sess *Session) {
	if sess == nil {
		return
	}
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if meta, ok := c.sessions[sess.SessionID]; ok {
		meta.Session = sess
		meta.lastUse = time.Now()
	} else {
		c.sessions[sess.SessionID] = &SessionMeta{sess, time.Now()}
	}
}
// HasSession reports whether a non-nil session is currently cached
// (negative entries created by GetSession do not count).
func (c *cacheImpl) HasSession(sessID uint64) bool {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	meta, found := c.sessions[sessID]
	if !found {
		return false
	}
	return meta.Session != nil
}
// GetSession returns the session from cache, falling back to the database.
// A DB miss (pgx.ErrNoRows) is cached as a nil entry so later lookups return
// NilSessionInCacheError without hitting the DB again.
// NOTE(review): the exclusive lock is held across the DB query, so one slow
// lookup blocks every other cache operation — confirm this is acceptable.
func (c *cacheImpl) GetSession(sessionID uint64) (*Session, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if s, inCache := c.sessions[sessionID]; inCache {
		if s.Session == nil {
			// Cached negative entry: the DB already told us it's absent.
			return nil, NilSessionInCacheError
		}
		return s.Session, nil
	}
	s, err := c.conn.GetSession(sessionID)
	if err == pgx.ErrNoRows {
		// Negative caching: remember that this session does not exist.
		c.sessions[sessionID] = &SessionMeta{nil, time.Now()}
	}
	if err != nil {
		return nil, err
	}
	c.sessions[sessionID] = &SessionMeta{s, time.Now()}
	return s, nil
}

View file

@ -3,22 +3,23 @@ package clickhouse
import (
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
)
type Connector interface {
Prepare() error
Commit() error
Stop() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertRequest(session *types.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQL) error
InsertIssue(session *types.Session, msg *messages.IssueEvent) error
InsertWebSession(session *sessions.Session) error
InsertWebResourceEvent(session *sessions.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error
InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error
}

View file

@ -0,0 +1,137 @@
package batch
import (
"log"
"strings"
"time"
"github.com/jackc/pgx/v4"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/metrics/database"
)
// batchItem keeps the raw SQL and arguments of a queued statement so a
// failing statement can be logged verbatim later.
type batchItem struct {
	query     string
	arguments []interface{}
}

// SessionBatch wraps a pgx.Batch and mirrors every queued statement in items
// for error reporting.
type SessionBatch struct {
	Batch *pgx.Batch
	items []*batchItem
}

// NewSessionBatch returns an empty batch.
func NewSessionBatch() *SessionBatch {
	return &SessionBatch{
		Batch: &pgx.Batch{},
		items: make([]*batchItem, 0),
	}
}

// Queue appends a statement to the batch and records a copy of it.
func (b *SessionBatch) Queue(query string, arguments ...interface{}) {
	b.Batch.Queue(query, arguments...)
	b.items = append(b.items, &batchItem{
		query:     query,
		arguments: arguments,
	})
}

// Len returns the number of queued statements.
func (b *SessionBatch) Len() int {
	return b.Batch.Len()
}
// batchesTask is one unit of work handed to the background worker.
type batchesTask struct {
	batches []*SessionBatch
}

// NewBatchesTask pre-allocates a task with capacity for about size batches.
func NewBatchesTask(size int) *batchesTask {
	return &batchesTask{batches: make([]*SessionBatch, 0, size)}
}

// BatchSet shards queued SQL statements into per-bucket session batches and
// flushes them asynchronously through a single worker goroutine.
type BatchSet struct {
	c          pool.Pool
	batches    map[uint64]*SessionBatch
	workerTask chan *batchesTask // buffered (1): Commit blocks only if a flush is already queued
	done       chan struct{}     // Stop request for the worker
	finished   chan struct{}     // worker's confirmation that it drained and exited
}

// NewBatchSet builds the set and starts its worker goroutine; call Stop to
// flush remaining work and terminate it.
func NewBatchSet(c pool.Pool) *BatchSet {
	bs := &BatchSet{
		c:          c,
		batches:    make(map[uint64]*SessionBatch),
		workerTask: make(chan *batchesTask, 1),
		done:       make(chan struct{}),
		finished:   make(chan struct{}),
	}
	go bs.worker()
	return bs
}
// getBatch returns the batch shard for the given session, creating it on
// first use. Sessions are distributed over 10 shards by sessionID modulo.
func (conn *BatchSet) getBatch(sessionID uint64) *SessionBatch {
	shard := sessionID % 10
	b, ok := conn.batches[shard]
	if !ok {
		b = NewSessionBatch()
		conn.batches[shard] = b
	}
	return b
}

// BatchQueue appends an SQL statement to the session's batch shard.
func (conn *BatchSet) BatchQueue(sessionID uint64, sql string, args ...interface{}) {
	conn.getBatch(sessionID).Queue(sql, args...)
}
// Commit hands all accumulated batches to the background worker and starts
// a fresh, empty batch map for subsequent writes.
func (conn *BatchSet) Commit() {
	task := NewBatchesTask(len(conn.batches) + 2)
	for _, b := range conn.batches {
		task.batches = append(task.batches, b)
	}
	conn.batches = make(map[uint64]*SessionBatch)
	conn.workerTask <- task
}
// Stop signals the worker to finish and blocks until it has processed any
// pending task and exited.
func (conn *BatchSet) Stop() {
	conn.done <- struct{}{}
	<-conn.finished
}
// sendBatches executes every batch of the task against the database.
// Individual statement failures are logged (with the offending SQL and
// arguments) but do not abort the rest of the batch.
func (conn *BatchSet) sendBatches(t *batchesTask) {
	for _, batch := range t.batches {
		// Record batch size
		database.RecordBatchElements(float64(batch.Len()))
		start := time.Now()
		// Send batch to db and execute
		br := conn.c.SendBatch(batch.Batch)
		l := batch.Len()
		for i := 0; i < l; i++ {
			if _, err := br.Exec(); err != nil {
				log.Printf("Error in PG batch: %v \n", err)
				failedSql := batch.items[i]
				query := strings.ReplaceAll(failedSql.query, "\n", " ")
				log.Println("failed sql req:", query, failedSql.arguments)
			}
		}
		// Close releases the batch results; its error was silently dropped
		// before — log it so connection-level failures become visible.
		if err := br.Close(); err != nil {
			log.Printf("batch close err: %v", err)
		}
		database.RecordBatchInsertDuration(float64(time.Since(start).Milliseconds()))
	}
}
// worker executes queued batch tasks until Stop is requested, then drains
// any still-pending task and signals completion via finished.
func (conn *BatchSet) worker() {
	for {
		select {
		case t := <-conn.workerTask:
			conn.sendBatches(t)
		case <-conn.done:
			// Drain without blocking. The previous code used
			// `for t := range conn.workerTask`, which never terminates
			// because the channel is never closed — Stop() deadlocked
			// whenever a task was still queued at shutdown.
			for {
				select {
				case t := <-conn.workerTask:
					conn.sendBatches(t)
				default:
					conn.finished <- struct{}{}
					return
				}
			}
		}
	}
}

View file

@ -1,207 +0,0 @@
package postgres
import (
"log"
"strings"
"time"
"openreplay/backend/pkg/metrics/database"
"github.com/jackc/pgx/v4"
)
// batchItem keeps the raw SQL and arguments of a queued statement so a
// failing statement can be logged verbatim later.
type batchItem struct {
	query     string
	arguments []interface{}
}

// SessionBatch accumulates the statements of one session shard together with
// a byte-size counter and the pending session counter updates.
type SessionBatch struct {
	sessID  uint64
	batch   *pgx.Batch
	size    int
	items   []*batchItem
	updates *sessionUpdates
}

// NewSessionBatch returns an empty batch bound to the given (sharded) ID.
func NewSessionBatch(sessionID uint64) *SessionBatch {
	return &SessionBatch{
		sessID:  sessionID,
		batch:   &pgx.Batch{},
		size:    0,
		items:   make([]*batchItem, 0),
		updates: NewSessionUpdates(sessionID),
	}
}

// SessionID returns the shard ID this batch was created for.
func (b *SessionBatch) SessionID() uint64 {
	return b.sessID
}

// Queue appends a statement to the batch and records a copy for logging.
func (b *SessionBatch) Queue(query string, arguments ...interface{}) {
	b.batch.Queue(query, arguments...)
	b.items = append(b.items, &batchItem{
		query:     query,
		arguments: arguments,
	})
}

// Update accumulates page/event counter deltas for this batch's session.
func (b *SessionBatch) Update(pages, events int) {
	b.updates.addEvents(pages, events)
}

// AddToSize increases the tracked byte size of the batch.
func (b *SessionBatch) AddToSize(size int) {
	b.size += size
}

// Size returns the tracked byte size of the batch.
func (b *SessionBatch) Size() int {
	return b.size
}

// Len returns the number of queued statements.
func (b *SessionBatch) Len() int {
	return b.batch.Len()
}

// Prepare appends the accumulated session-update statement (if any) as the
// final statement of the batch.
func (b *SessionBatch) Prepare() {
	sql, args := b.updates.request()
	if sql != "" {
		b.Queue(sql, args...)
	}
}
// batchesTask is one unit of work handed to the background worker.
type batchesTask struct {
	batches []*SessionBatch
}

// NewBatchesTask pre-allocates a task with capacity for about size batches.
func NewBatchesTask(size int) *batchesTask {
	return &batchesTask{batches: make([]*SessionBatch, 0, size)}
}

// BatchSet shards statements into per-bucket session batches, keeps pending
// per-session counter updates, and flushes everything through one worker.
type BatchSet struct {
	c               Pool
	batches         map[uint64]*SessionBatch
	batchQueueLimit int // NOTE(review): stored but not used in this file — confirm callers
	batchSizeLimit  int // NOTE(review): stored but not used in this file — confirm callers
	updates         map[uint64]*sessionUpdates
	workerTask      chan *batchesTask // buffered (1)
	done            chan struct{}
	finished        chan struct{}
}

// NewBatchSet builds the set and starts its worker goroutine.
func NewBatchSet(c Pool, queueLimit, sizeLimit int) *BatchSet {
	bs := &BatchSet{
		c:               c,
		batches:         make(map[uint64]*SessionBatch),
		batchQueueLimit: queueLimit,
		batchSizeLimit:  sizeLimit,
		workerTask:      make(chan *batchesTask, 1),
		done:            make(chan struct{}),
		finished:        make(chan struct{}),
	}
	go bs.worker()
	return bs
}

// getBatch returns the batch shard for the session (sessionID modulo 10),
// creating it on first use.
func (conn *BatchSet) getBatch(sessionID uint64) *SessionBatch {
	sessionID = sessionID % 10
	if _, ok := conn.batches[sessionID]; !ok {
		conn.batches[sessionID] = NewSessionBatch(sessionID)
	}
	return conn.batches[sessionID]
}
// batchQueue appends an SQL statement to the session's batch shard.
func (conn *BatchSet) batchQueue(sessionID uint64, sql string, args ...interface{}) {
	conn.getBatch(sessionID).Queue(sql, args...)
}

// updateSessionEvents accumulates event/page counter deltas per session.
func (conn *BatchSet) updateSessionEvents(sessionID uint64, events, pages int) {
	upd, ok := conn.updates[sessionID]
	if !ok {
		upd = NewSessionUpdates(sessionID)
		conn.updates[sessionID] = upd
	}
	upd.addEvents(pages, events)
}

// updateSessionIssues accumulates error/issue-score deltas per session.
func (conn *BatchSet) updateSessionIssues(sessionID uint64, errors, issueScore int) {
	upd, ok := conn.updates[sessionID]
	if !ok {
		upd = NewSessionUpdates(sessionID)
		conn.updates[sessionID] = upd
	}
	upd.addIssues(errors, issueScore)
}

// updateBatchSize adds reqSize bytes to the session shard's size counter.
func (conn *BatchSet) updateBatchSize(sessionID uint64, reqSize int) {
	conn.getBatch(sessionID).AddToSize(reqSize)
}

// Commit snapshots all batches plus one extra batch holding the accumulated
// per-session update statements, hands them to the worker, and resets state.
func (conn *BatchSet) Commit() {
	newTask := NewBatchesTask(len(conn.batches) + 2)
	// Copy batches
	for _, b := range conn.batches {
		newTask.batches = append(newTask.batches, b)
	}
	// Reset current batches
	conn.batches = make(map[uint64]*SessionBatch)
	// common batch for user's updates
	batch := NewSessionBatch(0)
	for _, upd := range conn.updates {
		if str, args := upd.request(); str != "" {
			batch.Queue(str, args...)
		}
	}
	newTask.batches = append(newTask.batches, batch)
	conn.updates = make(map[uint64]*sessionUpdates)
	conn.workerTask <- newTask
}

// Stop signals the worker to finish and waits for it to exit.
func (conn *BatchSet) Stop() {
	conn.done <- struct{}{}
	<-conn.finished
}
// sendBatches finalizes and executes every batch of the task against the
// database; statement failures are logged with their SQL but not propagated.
func (conn *BatchSet) sendBatches(t *batchesTask) {
	for _, batch := range t.batches {
		// Append session update sql request to the end of batch
		batch.Prepare()
		// Record batch size in bytes and number of lines
		database.RecordBatchSize(float64(batch.Size()))
		database.RecordBatchElements(float64(batch.Len()))
		start := time.Now()
		// Send batch to db and execute
		br := conn.c.SendBatch(batch.batch)
		l := batch.Len()
		for i := 0; i < l; i++ {
			if _, err := br.Exec(); err != nil {
				log.Printf("Error in PG batch (session: %d): %v \n", batch.SessionID(), err)
				failedSql := batch.items[i]
				query := strings.ReplaceAll(failedSql.query, "\n", " ")
				log.Println("failed sql req:", query, failedSql.arguments)
			}
		}
		// NOTE(review): Close's error is deliberately dropped here.
		br.Close() // returns err
		database.RecordBatchInsertDuration(float64(time.Now().Sub(start).Milliseconds()))
	}
}
// worker executes queued batch tasks until Stop is requested, then drains
// any still-pending task and signals completion via finished.
func (conn *BatchSet) worker() {
	for {
		select {
		case t := <-conn.workerTask:
			conn.sendBatches(t)
		case <-conn.done:
			// Drain without blocking. The previous code ranged over
			// conn.workerTask, which never terminates because the channel
			// is never closed — Stop() deadlocked whenever a task was
			// still queued at shutdown.
			for {
				select {
				case t := <-conn.workerTask:
					conn.sendBatches(t)
				default:
					conn.finished <- struct{}{}
					return
				}
			}
		}
	}
}

View file

@ -4,8 +4,10 @@ import (
"bytes"
"errors"
"fmt"
"openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/db/postgres/pool"
"time"
"openreplay/backend/pkg/metrics/database"
)
const (
@ -21,7 +23,7 @@ type Bulk interface {
}
type bulkImpl struct {
conn Pool
conn pool.Pool
table string
columns string
template string
@ -78,7 +80,7 @@ func (b *bulkImpl) send() error {
return nil
}
func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
func NewBulk(conn pool.Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("db conn is empty")

View file

@ -2,6 +2,7 @@ package postgres
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
)
type bulksTask struct {
@ -13,7 +14,7 @@ func NewBulksTask() *bulksTask {
}
type BulkSet struct {
c Pool
c pool.Pool
autocompletes Bulk
requests Bulk
customEvents Bulk
@ -34,7 +35,7 @@ type BulkSet struct {
finished chan struct{}
}
func NewBulkSet(c Pool) *BulkSet {
func NewBulkSet(c pool.Pool) *BulkSet {
bs := &BulkSet{
c: c,
workerTask: make(chan *bulksTask, 1),

View file

@ -1,21 +1,20 @@
package postgres
import (
"context"
"log"
"github.com/jackc/pgx/v4/pgxpool"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/db/postgres/batch"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/sessions"
)
type CH interface {
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
}
// Conn contains batches, bulks and cache for all sessions
type Conn struct {
c Pool
batches *BatchSet
Pool pool.Pool
batches *batch.BatchSet
bulks *BulkSet
chConn CH // hack for autocomplete inserts, TODO: rewrite
}
@ -24,29 +23,24 @@ func (conn *Conn) SetClickHouse(ch CH) {
conn.chConn = ch
}
func NewConn(url string, queueLimit, sizeLimit int) *Conn {
c, err := pgxpool.Connect(context.Background(), url)
if err != nil {
log.Fatalf("pgxpool.Connect err: %s", err)
func NewConn(pool pool.Pool) *Conn {
if pool == nil {
log.Fatalf("pool is nil")
}
conn := &Conn{}
conn.c, err = NewPool(c)
if err != nil {
log.Fatalf("can't create new pool wrapper: %s", err)
return &Conn{
Pool: pool,
bulks: NewBulkSet(pool),
batches: batch.NewBatchSet(pool),
}
conn.bulks = NewBulkSet(conn.c)
conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit)
return conn
}
func (conn *Conn) Close() error {
conn.bulks.Stop()
conn.batches.Stop()
conn.c.Close()
return nil
}
func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
func (conn *Conn) InsertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
if len(value) == 0 {
return
}
@ -57,28 +51,16 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp
return
}
// Send autocomplete data to clickhouse
if err := conn.chConn.InsertAutocomplete(&types.Session{SessionID: sessionID, ProjectID: projectID}, tp, value); err != nil {
if err := conn.chConn.InsertAutocomplete(&sessions.Session{SessionID: sessionID, ProjectID: projectID}, tp, value); err != nil {
log.Printf("click house autocomplete err: %s", err)
}
}
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {
conn.batches.batchQueue(sessionID, sql, args...)
}
func (conn *Conn) updateSessionEvents(sessionID uint64, events, pages int) {
conn.batches.updateSessionEvents(sessionID, events, pages)
}
func (conn *Conn) updateSessionIssues(sessionID uint64, errors, issueScore int) {
conn.batches.updateSessionIssues(sessionID, errors, issueScore)
func (conn *Conn) BatchQueue(sessionID uint64, sql string, args ...interface{}) {
conn.batches.BatchQueue(sessionID, sql, args...)
}
func (conn *Conn) Commit() {
conn.bulks.Send()
conn.batches.Commit()
}
func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) {
conn.batches.updateBatchSize(sessionID, reqSize)
}

View file

@ -0,0 +1,281 @@
package postgres
import (
"errors"
"fmt"
"log"
"strings"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/url"
)
// EmptyLabel is returned when an event arrives without a label to store.
var EmptyLabel = errors.New("empty label")

// getAutocompleteType builds the autocomplete type tag for a base type and
// platform: web uses the bare base type, any other platform gets an
// upper-cased "_PLATFORM" suffix.
func getAutocompleteType(baseType string, platform string) string {
	if platform != "web" {
		return baseType + "_" + strings.ToUpper(platform)
	}
	return baseType
}
// HandleStartEvent seeds the autocomplete bulks with the searchable
// attributes of a freshly started session: OS, device, geo fields, revision
// ID and browser.
func (conn *Conn) HandleStartEvent(s *messages.SessionStart) error {
	sessionID := s.SessionID()
	projectID := uint32(s.ProjectID)
	// Hard-coded: this handler only serves web sessions.
	platform := "web"
	// UserCountry carries a packed "country|state|city" record.
	geoInfo := geoip.UnpackGeoRecord(s.UserCountry)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("USEROS", platform), s.UserOS)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("USERDEVICE", platform), s.UserDevice)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("USERCOUNTRY", platform), geoInfo.Country)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("USERSTATE", platform), geoInfo.State)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("USERCITY", platform), geoInfo.City)
	conn.InsertAutocompleteValue(sessionID, projectID, getAutocompleteType("REVID", platform), s.RevID)
	conn.InsertAutocompleteValue(sessionID, projectID, "USERBROWSER", s.UserBrowser)
	return nil
}
// HandleEndEvent recomputes the session's issue_types column from the issues
// recorded for it, appending 'js_exception' when the session has errors.
func (conn *Conn) HandleEndEvent(sessionID uint64) error {
	sqlRequest := `
	UPDATE sessions
		SET issue_types=(SELECT
			CASE WHEN errors_count > 0 THEN
				(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}') || 'js_exception'::issue_type)::issue_type[]
			ELSE
				(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}'))::issue_type[]
			END
		FROM events_common.issues
			INNER JOIN issues AS ps USING (issue_id)
		WHERE session_id = $1)
	WHERE session_id = $1
	`
	return conn.Pool.Exec(sqlRequest, sessionID)
}
// InsertRequest appends a network request row to the "requests" bulk.
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {
	if err := conn.bulks.Get("requests").Append(sessionID, timestamp, index, url, duration, success); err != nil {
		// %w (was %s) keeps the cause inspectable via errors.Is/As.
		return fmt.Errorf("insert request in bulk err: %w", err)
	}
	return nil
}

// InsertCustomEvent appends a custom event row to the "customEvents" bulk.
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint32, name string, payload string) error {
	if err := conn.bulks.Get("customEvents").Append(sessionID, timestamp, index, name, payload); err != nil {
		return fmt.Errorf("insert custom event in bulk err: %w", err)
	}
	return nil
}
// InsertIssueEvent stores an issue definition plus its event row; a "custom"
// issue is additionally mirrored as a custom event tagged "error".
// Bulk failures are logged but never propagated to the caller.
func (conn *Conn) InsertIssueEvent(sess *sessions.Session, e *messages.IssueEvent) error {
	issueID := hashid.IssueID(sess.ProjectID, e)
	// Empty or "{}" payloads are stored as NULL.
	payload := &e.Payload
	if *payload == "" || *payload == "{}" {
		payload = nil
	}
	if err := conn.bulks.Get("webIssues").Append(sess.ProjectID, issueID, e.Type, e.ContextString); err != nil {
		log.Printf("insert web issue err: %s", err)
	}
	if err := conn.bulks.Get("webIssueEvents").Append(sess.SessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload); err != nil {
		log.Printf("insert web issue event err: %s", err)
	}
	if e.Type == "custom" {
		if err := conn.bulks.Get("webCustomEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error"); err != nil {
			log.Printf("insert web custom event err: %s", err)
		}
	}
	return nil
}
// InsertWebCustomEvent stores a custom event and, on success, registers its
// name in the CUSTOM autocomplete bulk.
func (conn *Conn) InsertWebCustomEvent(sess *sessions.Session, e *messages.CustomEvent) error {
	meta := e.Meta()
	if err := conn.InsertCustomEvent(sess.SessionID, meta.Timestamp, truncSqIdx(meta.Index), e.Name, e.Payload); err != nil {
		return err
	}
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "CUSTOM", e.Name)
	return nil
}
// InsertWebPageEvent stores a page navigation with its timing metrics and
// feeds LOCATION/REFERRER values into the autocomplete bulks.
// Bulk failures are logged but not propagated.
func (conn *Conn) InsertWebPageEvent(sess *sessions.Session, e *messages.PageEvent) error {
	host, path, query, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	// base_path is deprecated
	if err = conn.bulks.Get("webPageEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer),
		host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint,
		e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil {
		log.Printf("insert web page event in bulk err: %s", err)
	}
	// Add new value set to autocomplete bulk
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "LOCATION", url.DiscardURLQuery(path))
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "REFERRER", url.DiscardURLQuery(e.Referrer))
	return nil
}
// InsertWebClickEvent stores a mouse click (events with an empty label are
// skipped) and registers the label for CLICK autocomplete. URL parse errors
// are ignored: host/path simply stay empty, mirroring the original behavior.
func (conn *Conn) InsertWebClickEvent(sess *sessions.Session, e *messages.MouseClick) error {
	if e.Label == "" {
		return nil
	}
	host, path, _, _ := url.GetURLParts(e.Url)
	err := conn.bulks.Get("webClickEvents").Append(sess.SessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path, e.HesitationTime)
	if err != nil {
		log.Printf("insert web click err: %s", err)
	}
	// Add new value set to autocomplete bulk
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "CLICK", e.Label)
	return nil
}
// InsertWebInputEvent stores an input event and registers its label for
// INPUT autocomplete. Unlike the click handler, an empty label is reported
// to the caller via EmptyLabel instead of being silently skipped.
func (conn *Conn) InsertWebInputEvent(sess *sessions.Session, e *messages.InputEvent) error {
	if e.Label == "" {
		return EmptyLabel
	}
	if err := conn.bulks.Get("webInputEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label); err != nil {
		log.Printf("insert web input event err: %s", err)
	}
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "INPUT", e.Label)
	return nil
}
// InsertInputChangeEvent stores an input change with its hesitation and
// typing durations and registers the label for INPUT autocomplete.
// Events without a label are silently skipped.
func (conn *Conn) InsertInputChangeEvent(sess *sessions.Session, e *messages.InputChange) error {
	if e.Label == "" {
		return nil
	}
	// Zero out durations that overflow the DB's signed 32-bit columns.
	if e.HesitationTime > 2147483647 {
		e.HesitationTime = 0
	}
	if e.InputDuration > 2147483647 {
		e.InputDuration = 0
	}
	if err := conn.bulks.Get("webInputDurations").Append(sess.SessionID, truncSqIdx(e.ID), e.Timestamp, e.Label, e.HesitationTime, e.InputDuration); err != nil {
		// Fixed copy-pasted message (previously said "web input event").
		log.Printf("insert input change event err: %s", err)
	}
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "INPUT", e.Label)
	return nil
}
// InsertWebErrorEvent stores an error definition, its event row, and one row
// per attached tag. Bulk failures are logged but not propagated.
func (conn *Conn) InsertWebErrorEvent(sess *sessions.Session, e *types.ErrorEvent) error {
	// errorID is derived from the project and the error's identity fields.
	errorID := e.ID(sess.ProjectID)
	if err := conn.bulks.Get("webErrors").Append(errorID, sess.ProjectID, e.Source, e.Name, e.Message, e.Payload); err != nil {
		log.Printf("insert web error err: %s", err)
	}
	if err := conn.bulks.Get("webErrorEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, errorID); err != nil {
		log.Printf("insert web error event err: %s", err)
	}
	for key, value := range e.Tags {
		if err := conn.bulks.Get("webErrorTags").Append(sess.SessionID, truncSqIdx(e.MessageID), errorID, key, value); err != nil {
			log.Printf("insert web error token err: %s", err)
		}
	}
	return nil
}
// InsertWebNetworkRequest stores a network request event; request/response
// payloads are persisted only when the session allows payload capture.
func (conn *Conn) InsertWebNetworkRequest(sess *sessions.Session, e *messages.NetworkRequest) error {
	var request, response *string
	if sess.SaveRequestPayload {
		request = &e.Request
		response = &e.Response
	}
	host, path, query, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	// Feed autocomplete only after the URL parsed successfully; the old code
	// registered the (possibly empty) path before the error check.
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "REQUEST", path)
	// Log bulk failures like every sibling insert does (previously ignored).
	if err := conn.bulks.Get("webNetworkRequest").Append(sess.SessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query,
		request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400); err != nil {
		log.Printf("insert web network request err: %s", err)
	}
	return nil
}
// InsertWebGraphQL stores a GraphQL operation event; variables/response are
// persisted only when the session allows payload capture. The operation
// name is also fed into GRAPHQL autocomplete.
func (conn *Conn) InsertWebGraphQL(sess *sessions.Session, e *messages.GraphQL) error {
	var request, response *string
	if sess.SaveRequestPayload {
		request, response = &e.Variables, &e.Response
	}
	meta := e.Meta()
	err := conn.bulks.Get("webGraphQL").Append(sess.SessionID, meta.Timestamp, truncSqIdx(meta.Index), e.OperationName, request, response)
	if err != nil {
		log.Printf("insert web graphQL event err: %s", err)
	}
	conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "GRAPHQL", e.OperationName)
	return nil
}
// InsertMouseThrashing stores a mouse-thrashing issue (definition plus event
// row). The issue ID is derived from project, session and timestamp.
// Bulk failures are logged but not propagated.
func (conn *Conn) InsertMouseThrashing(sess *sessions.Session, e *messages.MouseThrashing) error {
	issueID := hashid.MouseThrashingID(sess.ProjectID, sess.SessionID, e.Timestamp)
	if err := conn.bulks.Get("webIssues").Append(sess.ProjectID, issueID, "mouse_thrashing", e.Url); err != nil {
		log.Printf("insert web issue err: %s", err)
	}
	if err := conn.bulks.Get("webIssueEvents").Append(sess.SessionID, issueID, e.Timestamp, truncSqIdx(e.MsgID()), nil); err != nil {
		log.Printf("insert web issue event err: %s", err)
	}
	return nil
}
// InsertWebStatsPerformance queues an aggregated performance sample
// (FPS/CPU/heap min-avg-max) for the session's batch. The row timestamp is
// the midpoint of the aggregation window.
func (conn *Conn) InsertWebStatsPerformance(p *messages.PerformanceTrackAggr) error {
	sessionID := p.SessionID()
	timestamp := (p.TimestampEnd + p.TimestampStart) / 2
	sqlRequest := `
	INSERT INTO events.performance (
		session_id, timestamp, message_id,
		min_fps, avg_fps, max_fps,
		min_cpu, avg_cpu, max_cpu,
		min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size,
		min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size
	) VALUES (
		$1, $2, $3,
		$4, $5, $6,
		$7, $8, $9,
		$10, $11, $12,
		$13, $14, $15
	)`
	conn.BatchQueue(sessionID, sqlRequest,
		sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
		p.MinFPS, p.AvgFPS, p.MaxFPS,
		// Fixed: max_cpu ($9) previously received p.MinCPU a second time.
		p.MinCPU, p.AvgCPU, p.MaxCPU,
		p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
		p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
	)
	return nil
}
// InsertWebStatsResourceEvent queues an insert into events.resources for one
// resource-timing entry. Zero durations/sizes are stored as NULL via NULLIF,
// and success is inferred from a non-zero duration (status is always 0 here).
func (conn *Conn) InsertWebStatsResourceEvent(e *messages.ResourceTiming) error {
	sessionID := e.SessionID()
	host, _, _, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	msgType := url.GetResourceType(e.Initiator, e.URL)
	sqlRequest := `
		INSERT INTO events.resources (
			session_id, timestamp, message_id,
			type,
			url, url_host, url_hostpath,
			success, status,
			duration, ttfb, header_size, encoded_body_size, decoded_body_size
		) VALUES (
			$1, $2, $3,
			$4,
			LEFT($5, 8000), LEFT($6, 300), LEFT($7, 2000),
			$8, $9,
			NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0)
		)`
	// url_hostpath column actually receives the query-stripped URL.
	urlQuery := url.DiscardURLQuery(e.URL)
	conn.BatchQueue(sessionID, sqlRequest,
		sessionID, e.Timestamp, truncSqIdx(e.MsgID()),
		msgType,
		e.URL, host, urlQuery,
		e.Duration != 0, 0,
		e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize,
	)
	return nil
}

View file

@ -1,45 +0,0 @@
package postgres
import (
"openreplay/backend/pkg/featureflags"
)
// GetFeatureFlags loads all active feature flags of a project, aggregating
// each flag's conditions (rollout percentages + filters) and, for multi-value
// flags, its variants, into array columns that are parsed by ParseFeatureFlag.
func (conn *Conn) GetFeatureFlags(projectID uint32) ([]*featureflags.FeatureFlag, error) {
	rows, err := conn.c.Query(`
	SELECT ff.flag_id, ff.flag_key, ff.flag_type, ff.is_persist, ff.payload, ff.rollout_percentages, ff.filters,
		ARRAY_AGG(fv.value) as values,
		ARRAY_AGG(fv.payload) as payloads,
		ARRAY_AGG(fv.rollout_percentage) AS variants_percentages
	FROM (
		SELECT ff.feature_flag_id AS flag_id, ff.flag_key AS flag_key, ff.flag_type, ff.is_persist, ff.payload,
			ARRAY_AGG(fc.rollout_percentage) AS rollout_percentages,
			ARRAY_AGG(fc.filters) AS filters
		FROM public.feature_flags ff
			LEFT JOIN public.feature_flags_conditions fc ON ff.feature_flag_id = fc.feature_flag_id
		WHERE ff.project_id = $1 AND ff.is_active = TRUE
		GROUP BY ff.feature_flag_id
	) AS ff
		LEFT JOIN public.feature_flags_variants fv ON ff.flag_type = 'multi' AND ff.flag_id = fv.feature_flag_id
	GROUP BY ff.flag_id, ff.flag_key, ff.flag_type, ff.is_persist, ff.payload, ff.filters, ff.rollout_percentages;
	`, projectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var flags []*featureflags.FeatureFlag
	for rows.Next() {
		// Scan into the raw PG representation, then parse into the domain type.
		var flag featureflags.FeatureFlagPG
		if err := rows.Scan(&flag.FlagID, &flag.FlagKey, &flag.FlagType, &flag.IsPersist, &flag.Payload, &flag.RolloutPercentages,
			&flag.Filters, &flag.Values, &flag.Payloads, &flag.VariantRollout); err != nil {
			return nil, err
		}
		parsedFlag, err := featureflags.ParseFeatureFlag(&flag)
		if err != nil {
			return nil, err
		}
		flags = append(flags, parsedFlag)
	}
	return flags, nil
}

View file

@ -6,7 +6,7 @@ import (
"openreplay/backend/pkg/messages"
)
func getIssueScore(issueEvent *messages.IssueEvent) int {
func GetIssueScore(issueEvent *messages.IssueEvent) int {
switch issueEvent.Type {
case "crash", "dead_click", "memory", "cpu":
return 1000

View file

@ -1,49 +0,0 @@
package postgres
import (
"encoding/json"
)
//go:generate $GOPATH/bin/easytags $GOFILE json
// Integration is one third-party integration row of the integrations table,
// identified by (project, provider). RequestData and Options are stored as
// opaque JSON whose schema depends on the provider.
type Integration struct {
	ProjectID uint32 `json:"project_id"`
	Provider  string `json:"provider"`
	//DeletedAt *int64 `json:"deleted_at"`
	RequestData json.RawMessage `json:"request_data"`
	Options     json.RawMessage `json:"options"`
}
// IterateIntegrationsOrdered streams every integrations row through iter.
// Per-row scan errors are passed to iter (with a nil integration) and
// iteration continues; only query/cursor errors abort and are returned.
func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
	rows, err := conn.c.Query(`
		SELECT project_id, provider, options, request_data
		FROM integrations
	`)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		i := new(Integration)
		if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil {
			iter(nil, err)
			continue
		}
		iter(i, nil)
	}
	if err = rows.Err(); err != nil {
		return err
	}
	return nil
}
// UpdateIntegrationRequestData persists the integration's current request_data
// blob for its (project, provider) key.
func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error {
	return conn.c.Exec(`
		UPDATE integrations
		SET request_data = $1
		WHERE project_id=$2 AND provider=$3`,
		i.RequestData, i.ProjectID, i.Provider,
	)
}

View file

@ -1,184 +0,0 @@
package postgres
import (
"fmt"
"log"
"openreplay/backend/pkg/url"
"strings"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
)
// getAutocompleteType derives the autocomplete event type for a platform:
// "web" keeps the base type unchanged, any other platform appends an
// upper-cased "_PLATFORM" suffix.
func getAutocompleteType(baseType string, platform string) string {
	if platform != "web" {
		return baseType + "_" + strings.ToUpper(platform)
	}
	return baseType
}
// InsertSessionStart creates the sessions row for a freshly started session.
// Empty strings and zero sizes are normalized to NULL via NULLIF; user_id is
// truncated to 8000 chars; issue_score is seeded with start time in seconds.
func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
	return conn.c.Exec(`
		INSERT INTO sessions (
			session_id, project_id, start_ts,
			user_uuid, user_device, user_device_type, user_country,
			user_os, user_os_version,
			rev_id,
			tracker_version, issue_score,
			platform,
			user_agent, user_browser, user_browser_version, user_device_memory_size, user_device_heap_size,
			user_id, user_state, user_city
		) VALUES (
			$1, $2, $3,
			$4, $5, $6, $7,
			$8, NULLIF($9, ''),
			NULLIF($10, ''),
			$11, $12,
			$13,
			NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, ''), NULLIF($17, 0), NULLIF($18, 0::bigint),
			NULLIF(LEFT($19, 8000), ''), NULLIF($20, ''), NULLIF($21, '')
		)`,
		sessionID, s.ProjectID, s.Timestamp,
		s.UserUUID, s.UserDevice, s.UserDeviceType, s.UserCountry,
		s.UserOS, s.UserOSVersion,
		s.RevID,
		s.TrackerVersion, s.Timestamp/1000,
		s.Platform,
		s.UserAgent, s.UserBrowser, s.UserBrowserVersion, s.UserDeviceMemorySize, s.UserDeviceHeapSize,
		s.UserID, s.UserState, s.UserCity,
	)
}
// HandleSessionStart seeds autocomplete indexes with the session's device,
// geo, and version attributes, using platform-suffixed types for non-web
// platforms. The browser value is indexed unconditionally (web-only field).
func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error {
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USEROS", s.Platform), s.UserOS)
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice)
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry)
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERSTATE", s.Platform), s.UserState)
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERCITY", s.Platform), s.UserCity)
	conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("REVID", s.Platform), s.RevID)
	// s.Platform == "web"
	conn.insertAutocompleteValue(sessionID, s.ProjectID, "USERBROWSER", s.UserBrowser)
	return nil
}
// GetSessionDuration returns the stored duration of a session, or 0 when the
// duration column is still NULL (session not yet ended).
func (conn *Conn) GetSessionDuration(sessionID uint64) (uint64, error) {
	var dur uint64
	if err := conn.c.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&dur); err != nil {
		return 0, err
	}
	return dur, nil
}
// InsertSessionEnd finalizes a session by computing its duration as
// (timestamp - start_ts) and returns the resulting duration.
// Scanning the RETURNING value fails if the session row does not exist.
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
	var dur uint64
	if err := conn.c.QueryRow(`
		UPDATE sessions SET duration=$2 - start_ts
		WHERE session_id=$1
		RETURNING duration
	`,
		sessionID, timestamp,
	).Scan(&dur); err != nil {
		return 0, err
	}
	return dur, nil
}
// InsertSessionEncryptionKey stores the session's file encryption key
// (persisted as text in the file_key column).
func (conn *Conn) InsertSessionEncryptionKey(sessionID uint64, key []byte) error {
	return conn.c.Exec(`UPDATE sessions SET file_key = $2 WHERE session_id = $1`, sessionID, string(key))
}
// HandleSessionEnd recomputes the session's issue_types as the distinct set
// of issue types recorded for it, appending 'js_exception' when the session
// also has errors_count > 0.
func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
	sqlRequest := `
	UPDATE sessions
		SET issue_types=(SELECT
			CASE WHEN errors_count > 0 THEN
			(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}') || 'js_exception'::issue_type)::issue_type[]
			ELSE
				(COALESCE(ARRAY_AGG(DISTINCT ps.type), '{}'))::issue_type[]
			END
		FROM events_common.issues
			INNER JOIN issues AS ps USING (issue_id)
		WHERE session_id = $1)
	WHERE session_id = $1
	`
	return conn.c.Exec(sqlRequest, sessionID)
}
// InsertRequest appends one network request event to the "requests" bulk.
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error {
	if err := conn.bulks.Get("requests").Append(sessionID, timestamp, index, url, duration, success); err != nil {
		return fmt.Errorf("insert request in bulk err: %s", err)
	}
	return nil
}
// InsertCustomEvent appends one custom event to the "customEvents" bulk.
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint32, name string, payload string) error {
	if err := conn.bulks.Get("customEvents").Append(sessionID, timestamp, index, name, payload); err != nil {
		return fmt.Errorf("insert custom event in bulk err: %s", err)
	}
	return nil
}
// InsertUserID queues an update of the session's user_id (truncated to 8000
// chars) into the per-session batch and records an approximate payload size
// for batch accounting.
func (conn *Conn) InsertUserID(sessionID uint64, userID string) error {
	sqlRequest := `
		UPDATE sessions SET user_id = LEFT($1, 8000)
		WHERE session_id = $2`
	conn.batchQueue(sessionID, sqlRequest, userID, sessionID)
	// Record approximate message size
	conn.updateBatchSize(sessionID, len(sqlRequest)+len(userID)+8)
	return nil
}
// InsertUserAnonymousID queues an update of the session's user_anonymous_id
// into the per-session batch and records an approximate payload size.
func (conn *Conn) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
	sqlRequest := `
		UPDATE sessions SET user_anonymous_id = $1
		WHERE session_id = $2`
	conn.batchQueue(sessionID, sqlRequest, userAnonymousID, sessionID)
	// Record approximate message size
	conn.updateBatchSize(sessionID, len(sqlRequest)+len(userAnonymousID)+8)
	return nil
}
// InsertMetadata stores value (truncated to 8000 chars) into the session's
// metadata_<keyNo> column.
func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
	// Sessions only have metadata_1..metadata_10 columns; validate before
	// interpolating the number into the column identifier so an out-of-range
	// key fails fast instead of producing a malformed statement.
	if keyNo < 1 || keyNo > 10 {
		return fmt.Errorf("invalid metadata key number: %d", keyNo)
	}
	sqlRequest := `
		UPDATE sessions SET metadata_%v = LEFT($1, 8000)
		WHERE session_id = $2`
	return conn.c.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
}
// InsertIssueEvent records one issue plus its per-session event, bumps the
// session's issue score, and — for "custom" issues — mirrors the event into
// the custom-events bulk with level "error". Empty/"{}" payloads become NULL.
func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error {
	issueID := hashid.IssueID(projectID, e)
	payload := &e.Payload
	if *payload == "" || *payload == "{}" {
		payload = nil
	}
	if err := conn.bulks.Get("webIssues").Append(projectID, issueID, e.Type, e.ContextString); err != nil {
		log.Printf("insert web issue err: %s", err)
	}
	if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload); err != nil {
		log.Printf("insert web issue event err: %s", err)
	}
	// Issue score depends on the issue type (crash/dead_click/etc. weigh more).
	conn.updateSessionIssues(sessionID, 0, getIssueScore(e))
	if e.Type == "custom" {
		if err := conn.bulks.Get("webCustomEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error"); err != nil {
			log.Printf("insert web custom event err: %s", err)
		}
	}
	return nil
}
// InsertReferrer queues an update of the session's referrer and its
// query-stripped base_referrer (both truncated to 8000 chars) into the
// per-session batch, recording an approximate payload size.
func (conn *Conn) InsertReferrer(sessionID uint64, referrer string) error {
	baseReferrer := url.DiscardURLQuery(referrer)
	sqlRequest := `
		UPDATE sessions SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000)
		WHERE session_id = $3`
	conn.batchQueue(sessionID, sqlRequest, referrer, baseReferrer, sessionID)
	// Record approximate message size
	conn.updateBatchSize(sessionID, len(sqlRequest)+len(referrer)+len(baseReferrer)+8)
	return nil
}

View file

@ -1,72 +0,0 @@
package postgres
import (
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
// InsertWebStatsPerformance queues an insert into events.performance with the
// aggregated FPS/CPU/JS-heap statistics, stamped with the midpoint of the
// aggregation window, and records an approximate batch payload size.
func (conn *Conn) InsertWebStatsPerformance(p *PerformanceTrackAggr) error {
	sessionID := p.SessionID()
	timestamp := (p.TimestampEnd + p.TimestampStart) / 2
	sqlRequest := `
		INSERT INTO events.performance (
			session_id, timestamp, message_id,
			min_fps, avg_fps, max_fps,
			min_cpu, avg_cpu, max_cpu,
			min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size,
			min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size
		) VALUES (
			$1, $2, $3,
			$4, $5, $6,
			$7, $8, $9,
			$10, $11, $12,
			$13, $14, $15
		)`
	conn.batchQueue(sessionID, sqlRequest,
		sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
		p.MinFPS, p.AvgFPS, p.MaxFPS,
		// Fix: max_cpu ($9) was previously bound to p.MinCPU.
		p.MinCPU, p.AvgCPU, p.MaxCPU,
		p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
		p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
	)
	// Record approximate message size
	conn.updateBatchSize(sessionID, len(sqlRequest)+8*15)
	return nil
}
// InsertWebStatsResourceEvent queues an insert into events.resources for one
// resource-timing entry, recording an approximate batch payload size. Zero
// durations/sizes become NULL; success is inferred from a non-zero duration.
func (conn *Conn) InsertWebStatsResourceEvent(e *ResourceTiming) error {
	sessionID := e.SessionID()
	host, _, _, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	msgType := url.GetResourceType(e.Initiator, e.URL)
	sqlRequest := `
		INSERT INTO events.resources (
			session_id, timestamp, message_id,
			type,
			url, url_host, url_hostpath,
			success, status,
			duration, ttfb, header_size, encoded_body_size, decoded_body_size
		) VALUES (
			$1, $2, $3,
			$4,
			LEFT($5, 8000), LEFT($6, 300), LEFT($7, 2000),
			$8, $9,
			NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0)
		)`
	// url_hostpath column actually receives the query-stripped URL.
	urlQuery := url.DiscardURLQuery(e.URL)
	conn.batchQueue(sessionID, sqlRequest,
		sessionID, e.Timestamp, truncSqIdx(e.MsgID()),
		msgType,
		e.URL, host, urlQuery,
		e.Duration != 0, 0,
		e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize,
	)
	// Record approximate message size
	conn.updateBatchSize(sessionID, len(sqlRequest)+len(msgType)+len(e.URL)+len(host)+len(urlQuery)+8*9+1)
	return nil
}

View file

@ -1,170 +0,0 @@
package postgres
import (
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
// InsertWebCustomEvent stores a custom event and, on success, indexes its
// name for CUSTOM autocomplete.
func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *messages.CustomEvent) error {
	err := conn.InsertCustomEvent(
		sessionID,
		uint64(e.Meta().Timestamp),
		truncSqIdx(e.Meta().Index),
		e.Name,
		e.Payload,
	)
	if err == nil {
		conn.insertAutocompleteValue(sessionID, projectID, "CUSTOM", e.Name)
	}
	return err
}
// InsertWebUserID sets the session's user id and, on success, indexes it for
// USERID autocomplete.
func (conn *Conn) InsertWebUserID(sessionID uint64, projectID uint32, userID *messages.UserID) error {
	err := conn.InsertUserID(sessionID, userID.ID)
	if err == nil {
		conn.insertAutocompleteValue(sessionID, projectID, "USERID", userID.ID)
	}
	return err
}
// InsertWebUserAnonymousID sets the session's anonymous user id and, on
// success, indexes it for USERANONYMOUSID autocomplete.
func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, projectID uint32, userAnonymousID *messages.UserAnonymousID) error {
	err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.ID)
	if err == nil {
		conn.insertAutocompleteValue(sessionID, projectID, "USERANONYMOUSID", userAnonymousID.ID)
	}
	return err
}
// InsertWebPageEvent stores a page-view event with its timing metrics, sets
// the session referrer, bumps the session's page/event counters, and indexes
// the location and referrer for autocomplete. Bulk/referrer failures are
// logged, not returned.
func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *messages.PageEvent) error {
	host, path, query, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	// base_path is deprecated
	if err = conn.bulks.Get("webPageEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer),
		host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint,
		e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil {
		log.Printf("insert web page event in bulk err: %s", err)
	}
	if err = conn.InsertReferrer(sessionID, e.Referrer); err != nil {
		log.Printf("insert session referrer err: %s", err)
	}
	// Accumulate session updates and exec inside batch with another sql commands
	conn.updateSessionEvents(sessionID, 1, 1)
	// Add new value set to autocomplete bulk
	conn.insertAutocompleteValue(sessionID, projectID, "LOCATION", url.DiscardURLQuery(path))
	conn.insertAutocompleteValue(sessionID, projectID, "REFERRER", url.DiscardURLQuery(e.Referrer))
	return nil
}
// InsertWebClickEvent stores a labeled mouse click, bumps the session event
// counter, and indexes the label for CLICK autocomplete. Unlabeled clicks are
// silently ignored; URL-parse errors are ignored (host/path default to "").
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *messages.MouseClick) error {
	if e.Label == "" {
		return nil
	}
	var host, path string
	host, path, _, _ = url.GetURLParts(e.Url)
	if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path, e.HesitationTime); err != nil {
		log.Printf("insert web click err: %s", err)
	}
	// Accumulate session updates and exec inside batch with another sql commands
	conn.updateSessionEvents(sessionID, 1, 0)
	// Add new value set to autocomplete bulk
	conn.insertAutocompleteValue(sessionID, projectID, "CLICK", e.Label)
	return nil
}
// InsertWebInputEvent stores a labeled input event, bumps the session event
// counter, and indexes the label for INPUT autocomplete. Unlabeled inputs are
// silently ignored.
func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *messages.InputEvent) error {
	if e.Label == "" {
		return nil
	}
	if err := conn.bulks.Get("webInputEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label); err != nil {
		log.Printf("insert web input event err: %s", err)
	}
	conn.updateSessionEvents(sessionID, 1, 0)
	conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label)
	return nil
}
// InsertWebInputDuration stores a labeled input-change event with its
// hesitation and typing durations, bumps the session event counter, and
// indexes the label for INPUT autocomplete. Unlabeled inputs are ignored.
func (conn *Conn) InsertWebInputDuration(sessionID uint64, projectID uint32, e *messages.InputChange) error {
	if e.Label == "" {
		return nil
	}
	if err := conn.bulks.Get("webInputDurations").Append(sessionID, truncSqIdx(e.ID), e.Timestamp, e.Label, e.HesitationTime, e.InputDuration); err != nil {
		// Fix: log message previously said "input event" (copy-paste from
		// InsertWebInputEvent), which made the two failure modes
		// indistinguishable in logs.
		log.Printf("insert web input duration err: %s", err)
	}
	conn.updateSessionEvents(sessionID, 1, 0)
	conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label)
	return nil
}
// InsertWebErrorEvent stores an error definition, its per-session occurrence,
// and any attached tags, and bumps the session's error count and issue score.
// Bulk failures are logged, not returned.
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *types.ErrorEvent) error {
	errorID := e.ID(projectID)
	if err := conn.bulks.Get("webErrors").Append(errorID, projectID, e.Source, e.Name, e.Message, e.Payload); err != nil {
		log.Printf("insert web error err: %s", err)
	}
	if err := conn.bulks.Get("webErrorEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, errorID); err != nil {
		log.Printf("insert web error event err: %s", err)
	}
	conn.updateSessionIssues(sessionID, 1, 1000)
	for key, value := range e.Tags {
		if err := conn.bulks.Get("webErrorTags").Append(sessionID, truncSqIdx(e.MessageID), errorID, key, value); err != nil {
			// Fix: message said "token" although this bulk inserts error tags.
			log.Printf("insert web error tag err: %s", err)
		}
	}
	return nil
}
// InsertWebNetworkRequest stores a network request event and indexes its path
// for REQUEST autocomplete. Request/response payloads are persisted only when
// savePayload is set.
func (conn *Conn) InsertWebNetworkRequest(sessionID uint64, projectID uint32, savePayload bool, e *messages.NetworkRequest) error {
	var request, response *string
	if savePayload {
		request = &e.Request
		response = &e.Response
	}
	host, path, query, err := url.GetURLParts(e.URL)
	if err != nil {
		return err
	}
	// Index the path only after the URL parsed successfully (previously an
	// empty path could be indexed on parse failure).
	conn.insertAutocompleteValue(sessionID, projectID, "REQUEST", path)
	// Fix: the bulk Append error was silently dropped; log it like the
	// sibling insert helpers do.
	if err := conn.bulks.Get("webNetworkRequest").Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query,
		request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400); err != nil {
		log.Printf("insert web network request err: %s", err)
	}
	return nil
}
// InsertWebGraphQL stores a GraphQL event in the "webGraphQL" bulk and indexes
// the operation name for GRAPHQL autocomplete. Variables/response payloads are
// persisted only when savePayload is set; bulk failures are logged.
func (conn *Conn) InsertWebGraphQL(sessionID uint64, projectID uint32, savePayload bool, e *messages.GraphQL) error {
	var request, response *string
	if savePayload {
		request = &e.Variables
		response = &e.Response
	}
	if err := conn.bulks.Get("webGraphQL").Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.OperationName, request, response); err != nil {
		log.Printf("insert web graphQL event err: %s", err)
	}
	conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName)
	return nil
}
// InsertSessionReferrer sets the session's referrer and query-stripped
// base_referrer, but only once: rows whose referrer is already set are left
// untouched. Empty referrers are a no-op.
func (conn *Conn) InsertSessionReferrer(sessionID uint64, referrer string) error {
	if referrer == "" {
		return nil
	}
	return conn.c.Exec(`
		UPDATE sessions
		SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000)
		WHERE session_id = $3 AND referrer IS NULL`,
		referrer, url.DiscardURLQuery(referrer), sessionID)
}
// InsertMouseThrashing records a "mouse_thrashing" issue plus its per-session
// issue event (keyed by a deterministic hash of project/session/timestamp)
// and adds 50 to the session's issue score. Bulk failures are logged.
func (conn *Conn) InsertMouseThrashing(sessionID uint64, projectID uint32, e *messages.MouseThrashing) error {
	issueID := hashid.MouseThrashingID(projectID, sessionID, e.Timestamp)
	if err := conn.bulks.Get("webIssues").Append(projectID, issueID, "mouse_thrashing", e.Url); err != nil {
		log.Printf("insert web issue err: %s", err)
	}
	if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MsgID()), nil); err != nil {
		log.Printf("insert web issue event err: %s", err)
	}
	conn.updateSessionIssues(sessionID, 0, 50)
	return nil
}

View file

@ -1,8 +1,9 @@
package postgres
package pool
import (
"context"
"errors"
"fmt"
"strings"
"time"
@ -72,9 +73,13 @@ func (p *poolImpl) Close() {
p.conn.Close()
}
func NewPool(conn *pgxpool.Pool) (Pool, error) {
if conn == nil {
return nil, errors.New("conn is empty")
func New(url string) (Pool, error) {
if url == "" {
return nil, errors.New("pg connection url is empty")
}
conn, err := pgxpool.Connect(context.Background(), url)
if err != nil {
return nil, fmt.Errorf("pgxpool.Connect error: %v", err)
}
return &poolImpl{
conn: conn,

View file

@ -1,39 +0,0 @@
package postgres
// Mechanism of combination several session updates into one
// sessionUpdateReq folds every pending per-session counter delta into a
// single UPDATE of the sessions row.
const sessionUpdateReq = `UPDATE sessions SET pages_count = pages_count + $1, events_count = events_count + $2, errors_count = errors_count + $3, issue_score = issue_score + $4 WHERE session_id = $5`

// sessionUpdates accumulates pending counter deltas for one session so they
// can be flushed with a single statement.
type sessionUpdates struct {
	sessionID uint64
	pages     int
	events    int
	errors    int
	issues    int
}

// NewSessionUpdates returns an empty accumulator for the given session.
func NewSessionUpdates(sessionID uint64) *sessionUpdates {
	return &sessionUpdates{sessionID: sessionID}
}

// addEvents adds page-view and generic event deltas.
func (su *sessionUpdates) addEvents(pages, events int) {
	su.pages, su.events = su.pages+pages, su.events+events
}

// addIssues adds error-count and issue-score deltas.
func (su *sessionUpdates) addIssues(errors, issues int) {
	su.errors, su.issues = su.errors+errors, su.issues+issues
}

// request returns the combined UPDATE statement with its arguments, or
// ("", nil) when there are no page/event deltas.
// NOTE(review): error/issue deltas are also discarded when both pages and
// events are zero — confirm that is intended.
func (su *sessionUpdates) request() (string, []interface{}) {
	if su.pages != 0 || su.events != 0 {
		return sessionUpdateReq, []interface{}{su.pages, su.events, su.errors, su.issues, su.sessionID}
	}
	return "", nil
}

View file

@ -1,59 +0,0 @@
package postgres
import (
"github.com/jackc/pgtype"
"log"
. "openreplay/backend/pkg/db/types"
)
// GetSession loads one full sessions row into a types.Session, converting
// nullable text columns to "" when NULL and decoding the issue_types enum
// array. A decode failure of issue_types is logged but not fatal.
func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
	s := &Session{SessionID: sessionID}
	// Nullable columns are scanned into pointers, then flattened below.
	var revID, userOSVersion, userBrowserVersion, userState, userCity *string
	var issueTypes pgtype.EnumArray
	if err := conn.c.QueryRow(`
		SELECT platform,
			duration, project_id, start_ts,
			user_uuid, user_os, user_os_version,
			user_device, user_device_type, user_country, user_state, user_city,
			rev_id, tracker_version,
			user_id, user_anonymous_id, referrer,
			pages_count, events_count, errors_count, issue_types,
			user_browser, user_browser_version, issue_score,
			metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
			metadata_6, metadata_7, metadata_8, metadata_9, metadata_10
		FROM sessions
		WHERE session_id=$1
	`,
		sessionID,
	).Scan(&s.Platform,
		&s.Duration, &s.ProjectID, &s.Timestamp,
		&s.UserUUID, &s.UserOS, &userOSVersion,
		&s.UserDevice, &s.UserDeviceType, &s.UserCountry, &userState, &userCity,
		&revID, &s.TrackerVersion,
		&s.UserID, &s.UserAnonymousID, &s.Referrer,
		&s.PagesCount, &s.EventsCount, &s.ErrorsCount, &issueTypes,
		&s.UserBrowser, &userBrowserVersion, &s.IssueScore,
		&s.Metadata1, &s.Metadata2, &s.Metadata3, &s.Metadata4, &s.Metadata5,
		&s.Metadata6, &s.Metadata7, &s.Metadata8, &s.Metadata9, &s.Metadata10); err != nil {
		return nil, err
	}
	if userOSVersion != nil {
		s.UserOSVersion = *userOSVersion
	}
	if userBrowserVersion != nil {
		s.UserBrowserVersion = *userBrowserVersion
	}
	if revID != nil {
		s.RevID = *revID
	}
	if err := issueTypes.AssignTo(&s.IssueTypes); err != nil {
		log.Printf("can't scan IssueTypes, err: %s", err)
	}
	if userState != nil {
		s.UserState = *userState
	}
	if userCity != nil {
		s.UserCity = *userCity
	}
	return s, nil
}

View file

@ -1,47 +0,0 @@
package postgres
// UnstartedSession describes a session attempt that never produced a started
// session (e.g. rejected or do-not-track); it is keyed by the public project
// key rather than a numeric project id.
type UnstartedSession struct {
	ProjectKey         string
	TrackerVersion     string
	DoNotTrack         bool
	Platform           string
	UserAgent          string
	UserOS             string
	UserOSVersion      string
	UserBrowser        string
	UserBrowserVersion string
	UserDevice         string
	UserDeviceType     string
	UserCountry        string
	UserState          string
	UserCity           string
}
// InsertUnstartedSession records an unstarted session attempt, resolving the
// project id from the project key inside the statement; empty state/city
// become NULL.
func (conn *Conn) InsertUnstartedSession(s UnstartedSession) error {
	return conn.c.Exec(`
		INSERT INTO unstarted_sessions (
			project_id,
			tracker_version, do_not_track,
			platform, user_agent,
			user_os, user_os_version,
			user_browser, user_browser_version,
			user_device, user_device_type,
			user_country, user_state, user_city
		) VALUES (
			(SELECT project_id FROM projects WHERE project_key = $1),
			$2, $3,
			$4, $5,
			$6, $7,
			$8, $9,
			$10, $11,
			$12, NULLIF($13, ''), NULLIF($14, '')
		)`,
		s.ProjectKey,
		s.TrackerVersion, s.DoNotTrack,
		s.Platform, s.UserAgent,
		s.UserOS, s.UserOSVersion,
		s.UserBrowser, s.UserBrowserVersion,
		s.UserDevice, s.UserDeviceType,
		s.UserCountry, s.UserState, s.UserCity,
	)
}

View file

@ -0,0 +1,20 @@
package redis
import (
"errors"
"github.com/go-redis/redis"
config "openreplay/backend/internal/config/redis"
)
// Client wraps a go-redis connection together with its configuration.
type Client struct {
	Cfg   *config.Redis
	Redis *redis.Client
}

// New is expected to build a redis Client from cfg.
// NOTE(review): currently a stub — cfg is unused and the function always
// returns "not implemented", so callers must treat redis as unavailable.
func New(cfg *config.Redis) (*Client, error) {
	return nil, errors.New("not implemented")
}

// Close shuts down the underlying redis connection.
// NOTE(review): panics if Redis is nil, which is the only state New can
// currently produce — confirm callers never Close a failed client.
func (c *Client) Close() error {
	return c.Redis.Close()
}

View file

@ -1,71 +0,0 @@
package types
// Session is the in-memory representation of one recorded session row.
// Pointer fields correspond to nullable columns.
type Session struct {
	SessionID      uint64
	Timestamp      uint64
	ProjectID      uint32
	TrackerVersion string
	RevID          string
	UserUUID       string
	UserOS         string
	UserOSVersion  string
	UserDevice     string
	UserCountry    string
	UserState      string
	UserCity       string
	Referrer       *string
	Duration       *uint64
	PagesCount     int
	EventsCount    int
	ErrorsCount    int
	IssueTypes     []string
	IssueScore     int
	UserID         *string
	UserAnonymousID *string
	Metadata1      *string
	Metadata2      *string
	Metadata3      *string
	Metadata4      *string
	Metadata5      *string
	Metadata6      *string
	Metadata7      *string
	Metadata8      *string
	Metadata9      *string
	Metadata10     *string
	Platform       string
	// Only-web properties
	UserAgent            string
	UserBrowser          string
	UserBrowserVersion   string
	UserDeviceType       string
	UserDeviceMemorySize uint64
	UserDeviceHeapSize   uint64
}

// SetMetadata stores value into the Metadata<keyNo> slot (1..10).
// Out-of-range key numbers are silently ignored.
func (s *Session) SetMetadata(keyNo uint, value string) {
	slots := [...]**string{
		&s.Metadata1, &s.Metadata2, &s.Metadata3, &s.Metadata4, &s.Metadata5,
		&s.Metadata6, &s.Metadata7, &s.Metadata8, &s.Metadata9, &s.Metadata10,
	}
	if keyNo >= 1 && keyNo <= uint(len(slots)) {
		*slots[keyNo-1] = &value
	}
}

View file

@ -3,12 +3,15 @@ package featureflags
import (
"encoding/json"
"fmt"
"github.com/jackc/pgtype"
"log"
"math/rand"
"strconv"
"strings"
"time"
"openreplay/backend/pkg/db/postgres/pool"
"github.com/jackc/pgtype"
)
type FeatureFlagsRequest struct {
@ -355,3 +358,75 @@ func ComputeFeatureFlags(flags []*FeatureFlag, sessInfo *FeatureFlagsRequest) ([
}
return result, nil
}
//---------------------------------//
// GetFeatureFlags loads all active feature flags of a project, aggregating
// each flag's conditions (rollout percentages + filters) and, for multi-value
// flags, its variants, into array columns that ParseFeatureFlag decodes.
func (f *featureFlagsImpl) GetFeatureFlags(projectID uint32) ([]*FeatureFlag, error) {
	rows, err := f.db.Query(`
	SELECT ff.flag_id, ff.flag_key, ff.flag_type, ff.is_persist, ff.payload, ff.rollout_percentages, ff.filters,
		ARRAY_AGG(fv.value) as values,
		ARRAY_AGG(fv.payload) as payloads,
		ARRAY_AGG(fv.rollout_percentage) AS variants_percentages
	FROM (
		SELECT ff.feature_flag_id AS flag_id, ff.flag_key AS flag_key, ff.flag_type, ff.is_persist, ff.payload,
			ARRAY_AGG(fc.rollout_percentage) AS rollout_percentages,
			ARRAY_AGG(fc.filters) AS filters
		FROM public.feature_flags ff
			LEFT JOIN public.feature_flags_conditions fc ON ff.feature_flag_id = fc.feature_flag_id
		WHERE ff.project_id = $1 AND ff.is_active = TRUE
		GROUP BY ff.feature_flag_id
	) AS ff
		LEFT JOIN public.feature_flags_variants fv ON ff.flag_type = 'multi' AND ff.flag_id = fv.feature_flag_id
	GROUP BY ff.flag_id, ff.flag_key, ff.flag_type, ff.is_persist, ff.payload, ff.filters, ff.rollout_percentages;
	`, projectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var flags []*FeatureFlag
	for rows.Next() {
		// Scan into the raw PG representation, then parse into the domain type.
		var flag FeatureFlagPG
		if err := rows.Scan(&flag.FlagID, &flag.FlagKey, &flag.FlagType, &flag.IsPersist, &flag.Payload, &flag.RolloutPercentages,
			&flag.Filters, &flag.Values, &flag.Payloads, &flag.VariantRollout); err != nil {
			return nil, err
		}
		parsedFlag, err := ParseFeatureFlag(&flag)
		if err != nil {
			return nil, err
		}
		flags = append(flags, parsedFlag)
	}
	return flags, nil
}
//---------------------------------//
// FeatureFlags computes the set of feature flags applicable to a session.
type FeatureFlags interface {
	ComputeFlagsForSession(req *FeatureFlagsRequest) ([]interface{}, error)
}

// featureFlagsImpl is the postgres-backed FeatureFlags implementation.
type featureFlagsImpl struct {
	db pool.Pool
}

// New returns a FeatureFlags service backed by the given connection pool.
func New(db pool.Pool) FeatureFlags {
	return &featureFlagsImpl{
		db: db,
	}
}
// ComputeFlagsForSession loads the project's active flags and evaluates them
// against the session's attributes. req.ProjectID is a decimal string and
// must fit in uint32.
func (f *featureFlagsImpl) ComputeFlagsForSession(req *FeatureFlagsRequest) ([]interface{}, error) {
	// Grab flags and conditions for project
	projectID, err := strconv.ParseUint(req.ProjectID, 10, 32)
	if err != nil {
		return nil, err
	}
	flags, err := f.GetFeatureFlags(uint32(projectID))
	if err != nil {
		return nil, err
	}
	return ComputeFeatureFlags(flags, req)
}

View file

@ -1,26 +1,36 @@
package postgres
package integrations
import (
"context"
"encoding/json"
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
"github.com/jackc/pgx/v4"
)
type Listener struct {
conn *pgx.Conn
db pool.Pool
Integrations chan *Integration
Errors chan error
}
func NewIntegrationsListener(url string) (*Listener, error) {
// Integration is one third-party integration row, identified by
// (project, provider); RequestData and Options carry provider-specific JSON.
type Integration struct {
	ProjectID   uint32          `json:"project_id"`
	Provider    string          `json:"provider"`
	RequestData json.RawMessage `json:"request_data"`
	Options     json.RawMessage `json:"options"`
}
func New(db pool.Pool, url string) (*Listener, error) {
conn, err := pgx.Connect(context.Background(), url)
if err != nil {
return nil, err
}
listener := &Listener{
conn: conn,
db: db,
Errors: make(chan error),
}
listener.Integrations = make(chan *Integration, 50)
@ -53,3 +63,37 @@ func (listener *Listener) listen() {
// Close terminates the dedicated LISTEN/NOTIFY connection.
// Note: the shared db pool is owned by the caller and is not closed here.
func (listener *Listener) Close() error {
	return listener.conn.Close(context.Background())
}
// IterateIntegrationsOrdered streams every integrations row through iter.
// Per-row scan errors are passed to iter (with a nil integration) and
// iteration continues; only query/cursor errors abort and are returned.
func (listener *Listener) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
	rows, err := listener.db.Query(`
		SELECT project_id, provider, options, request_data
		FROM integrations
	`)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		i := new(Integration)
		if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil {
			iter(nil, err)
			continue
		}
		iter(i, nil)
	}
	if err = rows.Err(); err != nil {
		return err
	}
	return nil
}
// UpdateIntegrationRequestData persists the integration's current
// request_data blob for its (project, provider) key.
func (listener *Listener) UpdateIntegrationRequestData(i *Integration) error {
	return listener.db.Exec(`
		UPDATE integrations
		SET request_data = $1
		WHERE project_id=$2 AND provider=$3`,
		i.RequestData, i.ProjectID, i.Provider,
	)
}

View file

@ -5,19 +5,6 @@ import (
"openreplay/backend/pkg/metrics/common"
)
var dbBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "db",
Name: "batch_size_bytes",
Help: "A histogram displaying the batch size in bytes.",
Buckets: common.DefaultSizeBuckets,
},
)
func RecordBatchSize(size float64) {
dbBatchSize.Observe(size)
}
var dbBatchElements = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "db",
@ -115,7 +102,6 @@ func IncreaseTotalRequests(method, table string) {
func List() []prometheus.Collector {
return []prometheus.Collector{
dbBatchSize,
dbBatchElements,
dbBatchInsertDuration,
dbBulkSize,

View file

@ -0,0 +1,72 @@
package projects
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"openreplay/backend/pkg/db/redis"
"time"
)
// Cache is a redis-backed project cache addressable by id or project key.
type Cache interface {
	Set(project *Project) error
	GetByID(projectID uint32) (*Project, error)
	GetByKey(projectKey string) (*Project, error)
}

// cacheImpl implements Cache on top of an optional redis client; a nil
// client means caching is disabled.
type cacheImpl struct {
	db *redis.Client
}

// ErrDisabledCache is returned by every method when no redis client is
// configured, letting callers distinguish "disabled" from real failures.
var ErrDisabledCache = errors.New("cache is disabled")
// Set stores the project's JSON under both its id key and its project-key
// key with a 10 minute TTL, so either lookup path hits.
func (c *cacheImpl) Set(project *Project) error {
	if c.db == nil {
		return ErrDisabledCache
	}
	projectBytes, err := json.Marshal(project)
	if err != nil {
		return err
	}
	if _, err = c.db.Redis.Set(fmt.Sprintf("project:id:%d", project.ProjectID), projectBytes, time.Minute*10).Result(); err != nil {
		return err
	}
	if _, err = c.db.Redis.Set(fmt.Sprintf("project:key:%s", project.ProjectKey), projectBytes, time.Minute*10).Result(); err != nil {
		return err
	}
	return nil
}
// GetByID fetches and decodes a cached project by numeric id.
// A cache miss surfaces as the redis client's get error.
func (c *cacheImpl) GetByID(projectID uint32) (*Project, error) {
	if c.db == nil {
		return nil, ErrDisabledCache
	}
	result, err := c.db.Redis.Get(fmt.Sprintf("project:id:%d", projectID)).Result()
	if err != nil {
		return nil, err
	}
	project := &Project{}
	if err = json.Unmarshal([]byte(result), project); err != nil {
		return nil, err
	}
	return project, nil
}
// GetByKey fetches and decodes a cached project by its public project key.
// A cache miss surfaces as the redis client's get error.
func (c *cacheImpl) GetByKey(projectKey string) (*Project, error) {
	if c.db == nil {
		return nil, ErrDisabledCache
	}
	result, err := c.db.Redis.Get(fmt.Sprintf("project:key:%s", projectKey)).Result()
	if err != nil {
		return nil, err
	}
	project := &Project{}
	if err = json.Unmarshal([]byte(result), project); err != nil {
		return nil, err
	}
	return project, nil
}
// NewCache wraps the (possibly nil) redis client as a project Cache;
// a nil client yields a disabled cache that returns ErrDisabledCache.
func NewCache(db *redis.Client) Cache {
	return &cacheImpl{db: db}
}

View file

@ -1,4 +1,4 @@
package types
package projects
import "log"

View file

@ -0,0 +1,68 @@
package projects
import (
"errors"
"log"
"openreplay/backend/pkg/cache"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"time"
)
// Projects resolves project records by id or public key.
type Projects interface {
	GetProject(projectID uint32) (*Project, error)
	GetProjectByKey(projectKey string) (*Project, error)
}

// projectsImpl resolves projects through three layers: in-process caches,
// a redis Cache, and finally postgres.
type projectsImpl struct {
	db             pool.Pool
	cache          Cache
	projectsByID   cache.Cache
	projectsByKeys cache.Cache
}

// New builds the Projects service; redis may be nil (redis layer disabled).
// In-process entries live 5 minutes and are evicted after 10.
func New(db pool.Pool, redis *redis.Client) Projects {
	cl := NewCache(redis)
	return &projectsImpl{
		db:             db,
		cache:          cl,
		projectsByID:   cache.New(time.Minute*5, time.Minute*10),
		projectsByKeys: cache.New(time.Minute*5, time.Minute*10),
	}
}
// GetProject resolves a project by id: in-process cache, then redis, then
// postgres; a DB hit repopulates both caches.
// NOTE(review): a redis hit does not repopulate the in-process cache, and any
// redis error (not just a miss) silently falls through to the DB — confirm
// both are intended.
func (c *projectsImpl) GetProject(projectID uint32) (*Project, error) {
	if proj, ok := c.projectsByID.Get(projectID); ok {
		return proj.(*Project), nil
	}
	if proj, err := c.cache.GetByID(projectID); err == nil {
		return proj, nil
	}
	p, err := c.getProject(projectID)
	if err != nil {
		return nil, err
	}
	c.projectsByID.Set(projectID, p)
	if err = c.cache.Set(p); err != nil && !errors.Is(err, ErrDisabledCache) {
		log.Printf("Failed to cache project: %v", err)
	}
	return p, nil
}
// GetProjectByKey returns the project identified by its public key,
// consulting the in-memory cache, then redis, then postgres. Entries
// found in slower layers are promoted into the faster ones.
func (c *projectsImpl) GetProjectByKey(projectKey string) (*Project, error) {
	if proj, ok := c.projectsByKeys.Get(projectKey); ok {
		return proj.(*Project), nil
	}
	if proj, err := c.cache.GetByKey(projectKey); err == nil {
		// Promote the redis hit into the in-memory layer; previously it
		// was skipped, so every call kept paying a redis round-trip.
		c.projectsByKeys.Set(projectKey, proj)
		return proj, nil
	}
	p, err := c.getProjectByKey(projectKey)
	if err != nil {
		return nil, err
	}
	c.projectsByKeys.Set(projectKey, p)
	if err := c.cache.Set(p); err != nil && !errors.Is(err, ErrDisabledCache) {
		log.Printf("Failed to cache project: %v", err)
	}
	return p, nil
}

View file

@ -1,12 +1,8 @@
package postgres
package projects
import (
. "openreplay/backend/pkg/db/types"
)
func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) {
func (c *projectsImpl) getProjectByKey(projectKey string) (*Project, error) {
p := &Project{ProjectKey: projectKey}
if err := conn.c.QueryRow(`
if err := c.db.QueryRow(`
SELECT max_session_duration, sample_rate, project_id, beacon_size
FROM projects
WHERE project_key=$1 AND active = true
@ -18,10 +14,9 @@ func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) {
return p, nil
}
// TODO: logical separation of metadata
func (conn *Conn) GetProject(projectID uint32) (*Project, error) {
func (c *projectsImpl) getProject(projectID uint32) (*Project, error) {
p := &Project{ProjectID: projectID}
if err := conn.c.QueryRow(`
if err := c.db.QueryRow(`
SELECT project_key, max_session_duration, save_request_payloads,
metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
metadata_6, metadata_7, metadata_8, metadata_9, metadata_10

View file

@ -0,0 +1,49 @@
package sessions
import (
"errors"
"log"
"openreplay/backend/pkg/cache"
"time"
)
// Cache is a read/write session cache. Implementations: a redis-backed
// cache and an in-memory layer stacked on top of it.
type Cache interface {
	Set(session *Session) error
	Get(sessionID uint64) (*Session, error)
}

// ErrSessionNotFound is returned by Get when no cache layer holds the session.
var ErrSessionNotFound = errors.New("session not found")

// inMemoryCacheImpl fronts a shared (redis) cache with a local TTL cache.
type inMemoryCacheImpl struct {
	sessions cache.Cache // local in-memory cache keyed by session ID
	redis    Cache       // shared fallback cache (may be disabled)
}
// Set stores the session in the local cache and best-effort mirrors it
// to redis. Redis failures are logged, never returned; a disabled
// redis cache is silently ignored. Always returns nil.
func (i *inMemoryCacheImpl) Set(session *Session) error {
	i.sessions.Set(session.SessionID, session)
	if err := i.redis.Set(session); err != nil && !errors.Is(err, ErrDisabledCache) {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// Get returns the session from the local cache, falling back to redis.
// A redis hit is promoted into the local cache so subsequent reads are
// served locally. Returns ErrSessionNotFound when neither layer has it.
func (i *inMemoryCacheImpl) Get(sessionID uint64) (*Session, error) {
	if session, ok := i.sessions.Get(sessionID); ok {
		return session.(*Session), nil
	}
	session, err := i.redis.Get(sessionID)
	if err == nil {
		// Promote the redis hit; previously it was not stored locally,
		// so every read of a redis-only session paid a round-trip.
		i.sessions.Set(sessionID, session)
		return session, nil
	}
	// "redis: nil" is an ordinary miss; only log unexpected errors.
	if !errors.Is(err, ErrDisabledCache) && err.Error() != "redis: nil" {
		log.Printf("Failed to get session from cache: %v", err)
	}
	return nil, ErrSessionNotFound
}
// NewInMemoryCache stacks a TTL-bounded local cache over redisCache.
// assumes cache.New(ttl, cleanupInterval) — TODO confirm the meaning
// of the two durations against pkg/cache.
func NewInMemoryCache(redisCache Cache) Cache {
	return &inMemoryCacheImpl{
		sessions: cache.New(time.Minute*3, time.Minute*5),
		redis:    redisCache,
	}
}

View file

@ -0,0 +1,87 @@
package sessions
// Session is the in-process representation of a row in the sessions
// table. Pointer fields correspond to nullable columns.
type Session struct {
	SessionID      uint64
	Timestamp      uint64 // session start (start_ts column)
	ProjectID      uint32
	TrackerVersion string
	RevID          string
	UserUUID       string
	UserOS         string
	UserOSVersion  string
	UserDevice     string
	UserCountry    string
	UserState      string
	UserCity       string
	Referrer       *string
	ReferrerBase   *string // referrer with the query string stripped
	Duration       *uint64 // nil until the session has ended (column is NULL)
	PagesCount     int
	EventsCount    int
	ErrorsCount    int
	IssueTypes     []string
	IssueScore     int
	UserID         *string
	UserAnonymousID *string
	// metadata_1..metadata_10 columns; see SetMetadata.
	Metadata1  *string
	Metadata2  *string
	Metadata3  *string
	Metadata4  *string
	Metadata5  *string
	Metadata6  *string
	Metadata7  *string
	Metadata8  *string
	Metadata9  *string
	Metadata10 *string
	Platform   string
	UserAgent  string
	UserBrowser          string
	UserBrowserVersion   string
	UserDeviceType       string
	UserDeviceMemorySize uint64
	UserDeviceHeapSize   uint64
	// SaveRequestPayload is not a sessions column; it is copied from
	// the project record after the row is loaded.
	SaveRequestPayload bool
	// EncryptionKey maps to the file_key column; it is written via the
	// storage layer and not selected by the plain session query.
	EncryptionKey string
}
// SetMetadata stores value into the metadata slot selected by keyNo
// (1-10). Key numbers outside that range are silently ignored.
func (s *Session) SetMetadata(keyNo uint, value string) {
	slots := [...]**string{
		&s.Metadata1, &s.Metadata2, &s.Metadata3, &s.Metadata4, &s.Metadata5,
		&s.Metadata6, &s.Metadata7, &s.Metadata8, &s.Metadata9, &s.Metadata10,
	}
	if keyNo >= 1 && keyNo <= uint(len(slots)) {
		*slots[keyNo-1] = &value
	}
}
// UnStartedSession describes a visit that never produced a recording
// (e.g. the tracker reported do-not-track); it is persisted to the
// unstarted_sessions table for analytics.
type UnStartedSession struct {
	ProjectKey         string // public project key; resolved to project_id on insert
	TrackerVersion     string
	DoNotTrack         bool
	Platform           string
	UserAgent          string
	UserOS             string
	UserOSVersion      string
	UserBrowser        string
	UserBrowserVersion string
	UserDevice         string
	UserDeviceType     string
	UserCountry        string
	UserState          string
	UserCity           string
}

View file

@ -0,0 +1,22 @@
package sessions
import (
"errors"
"openreplay/backend/pkg/db/redis"
)
// cacheImpl is a no-op session cache: every operation reports
// ErrDisabledCache so callers fall through to slower layers.
type cacheImpl struct{}

// Set is a no-op; the cache is disabled.
func (c *cacheImpl) Set(session *Session) error {
	return ErrDisabledCache
}

// Get is a no-op; the cache is disabled.
func (c *cacheImpl) Get(sessionID uint64) (*Session, error) {
	return nil, ErrDisabledCache
}

// ErrDisabledCache signals that no redis cache backend is available.
var ErrDisabledCache = errors.New("cache is disabled")

// NewCache returns a session cache stub; the redis client is currently
// ignored. NOTE(review): presumably a redis-backed implementation is
// provided elsewhere (e.g. EE build) — confirm.
func NewCache(db *redis.Client) Cache {
	return &cacheImpl{}
}

View file

@ -0,0 +1,323 @@
package sessions
import (
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/projects"
"openreplay/backend/pkg/url"
)
// Sessions is the service facade for session lifecycle operations:
// creation, cached reads, and field updates (the Update* methods queue
// changes that are flushed to postgres by Commit).
type Sessions interface {
	Add(session *Session) error
	AddUnStarted(session *UnStartedSession) error
	Get(sessionID uint64) (*Session, error)
	GetUpdated(sessionID uint64) (*Session, error)
	GetDuration(sessionID uint64) (uint64, error)
	UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
	UpdateEncryptionKey(sessionID uint64, key []byte) error
	UpdateUserID(sessionID uint64, userID string) error
	UpdateAnonymousID(sessionID uint64, userAnonymousID string) error
	UpdateReferrer(sessionID uint64, referrer string) error
	UpdateMetadata(sessionID uint64, key, value string) error
	UpdateEventsStats(sessionID uint64, events, pages int) error
	UpdateIssuesStats(sessionID uint64, errors, issueScore int) error
	Commit()
}

// sessionsImpl composes postgres storage, a two-level cache and a
// batched update queue, plus the projects service for enrichment.
type sessionsImpl struct {
	cache    Cache
	storage  Storage
	updates  Updates
	projects projects.Projects
}
// New builds the Sessions service: postgres-backed storage, an
// in-memory cache stacked over a (possibly disabled) redis cache, and
// a batched session-update queue. redis may be nil.
func New(db pool.Pool, proj projects.Projects, redis *redis.Client) Sessions {
	return &sessionsImpl{
		cache:    NewInMemoryCache(NewCache(redis)),
		storage:  NewStorage(db),
		updates:  NewSessionUpdates(db),
		projects: proj,
	}
}
// Add inserts a freshly started session into postgres, stamps it with
// the project's SaveRequestPayloads flag, and places it in the cache.
// Used by the /start endpoint in the http service.
func (s *sessionsImpl) Add(session *Session) error {
	// A cache hit here means a duplicate /start for the same session ID;
	// log it for diagnostics but continue with the insert.
	if cachedSession, err := s.cache.Get(session.SessionID); err == nil {
		log.Printf("[!] Session %d already exists in cache, new: %+v, cached: %+v", session.SessionID, session, cachedSession)
	}
	err := s.storage.Add(session)
	if err != nil {
		return err
	}
	proj, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	session.SaveRequestPayload = proj.SaveRequestPayloads
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// AddUnStarted records a visit that never started recording.
// Used by the /not-started endpoint in the http service.
func (s *sessionsImpl) AddUnStarted(sess *UnStartedSession) error {
	return s.storage.AddUnStarted(sess)
}
// getFromDB loads the session row from postgres and enriches it with
// the project's SaveRequestPayloads flag, which is not stored on the
// session row itself.
func (s *sessionsImpl) getFromDB(sessionID uint64) (*Session, error) {
	session, err := s.storage.Get(sessionID)
	if err != nil {
		log.Printf("Failed to get session from postgres: %v", err)
		return nil, err
	}
	proj, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return nil, err
	}
	session.SaveRequestPayload = proj.SaveRequestPayloads
	return session, nil
}
// Get returns the session, serving from the cache when possible and
// falling back to postgres; DB hits are written back to the cache.
// Used by the db message processor (and connectors in the future).
func (s *sessionsImpl) Get(sessionID uint64) (*Session, error) {
	if sess, err := s.cache.Get(sessionID); err == nil {
		return sess, nil
	}
	// Get from postgres and update in-memory and redis caches.
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return nil, err
	}
	// Log cache write failures instead of silently dropping the error,
	// matching every other method of this type.
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return session, nil
}
// GetUpdated bypasses the cache and reloads the session from postgres
// (then re-caches it), so callers see up-to-date counters. Special
// method for the clickhouse connector, used on session end.
func (s *sessionsImpl) GetUpdated(sessionID uint64) (*Session, error) {
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return nil, err
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return session, nil
}
// GetDuration returns the session duration, or 0 when the session has
// not ended yet (Duration is nil). Served from the cache when
// possible. Used by the ender to compare current and new durations and
// avoid duplicate end processing.
func (s *sessionsImpl) GetDuration(sessionID uint64) (uint64, error) {
	if sess, err := s.cache.Get(sessionID); err == nil {
		if sess.Duration != nil {
			return *sess.Duration, nil
		}
		// Cached but still live: duration has not been set yet.
		return 0, nil
	}
	session, err := s.getFromDB(sessionID)
	if err != nil {
		return 0, err
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	if session.Duration != nil {
		return *session.Duration, nil
	}
	return 0, nil
}
// UpdateDuration recomputes the session duration in postgres as
// (timestamp - start_ts), refreshes the cached session, and returns
// the new duration. Used by the ender on session end.
func (s *sessionsImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
	newDuration, err := s.storage.UpdateDuration(sessionID, timestamp)
	if err != nil {
		return 0, err
	}
	session, err := s.cache.Get(sessionID)
	if err != nil {
		// Cache miss: load the full record so we cache a complete session.
		session, err = s.getFromDB(sessionID)
		if err != nil {
			return 0, err
		}
	}
	session.Duration = &newDuration
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return newDuration, nil
}
// UpdateEncryptionKey persists the session's encryption key and
// refreshes the cached copy. Used by the ender when encryption is
// enabled.
func (s *sessionsImpl) UpdateEncryptionKey(sessionID uint64, key []byte) error {
	if err := s.storage.InsertEncryptionKey(sessionID, key); err != nil {
		return err
	}
	// Fixed inverted condition: the original ran this branch only when
	// cache.Get FAILED, dereferencing a nil *Session (panic) and never
	// applying the key to a cached session.
	if session, err := s.cache.Get(sessionID); err == nil {
		session.EncryptionKey = string(key)
		if err := s.cache.Set(session); err != nil {
			log.Printf("Failed to cache session: %v", err)
		}
		return nil
	}
	// Cache miss: reload from postgres (which now has the key) and cache.
	session, err := s.getFromDB(sessionID)
	if err != nil {
		log.Printf("Failed to get session from postgres: %v", err)
		return nil
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// UpdateUserID queues a user-id change for the session; the write is
// flushed to postgres on the next Commit. Used by the db handler.
func (s *sessionsImpl) UpdateUserID(sessionID uint64, userID string) error {
	s.updates.AddUserID(sessionID, userID)
	return nil
}
// _updateUserID is the direct (non-batched) variant of UpdateUserID:
// it writes user_id immediately and refreshes the cached session.
// Currently unreferenced in this file.
func (s *sessionsImpl) _updateUserID(sessionID uint64, userID string) error {
	if err := s.storage.InsertUserID(sessionID, userID); err != nil {
		return err
	}
	// Fixed inverted condition: the original mutated the session only
	// when cache.Get FAILED, dereferencing a nil *Session (panic).
	if session, err := s.cache.Get(sessionID); err == nil {
		session.UserID = &userID
		if err := s.cache.Set(session); err != nil {
			log.Printf("Failed to cache session: %v", err)
		}
		return nil
	}
	session, err := s.getFromDB(sessionID)
	if err != nil {
		log.Printf("Failed to get session from postgres: %v", err)
		return nil
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// UpdateAnonymousID queues an anonymous-id change for the session; the
// write is flushed to postgres on the next Commit. Used by the db
// handler.
func (s *sessionsImpl) UpdateAnonymousID(sessionID uint64, userAnonymousID string) error {
	// Was AddUserID, which routed the anonymous id into the user_id
	// column update instead of user_anonymous_id.
	s.updates.AddAnonID(sessionID, userAnonymousID)
	return nil
}
// _updateAnonymousID is the direct (non-batched) variant of
// UpdateAnonymousID: it writes user_anonymous_id immediately and
// refreshes the cached session. Currently unreferenced in this file.
func (s *sessionsImpl) _updateAnonymousID(sessionID uint64, userAnonymousID string) error {
	if err := s.storage.InsertUserAnonymousID(sessionID, userAnonymousID); err != nil {
		return err
	}
	// Fixed inverted condition: the original mutated the session only
	// when cache.Get FAILED, dereferencing a nil *Session (panic).
	if session, err := s.cache.Get(sessionID); err == nil {
		session.UserAnonymousID = &userAnonymousID
		if err := s.cache.Set(session); err != nil {
			log.Printf("Failed to cache session: %v", err)
		}
		return nil
	}
	session, err := s.getFromDB(sessionID)
	if err != nil {
		log.Printf("Failed to get session from postgres: %v", err)
		return nil
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// UpdateReferrer queues referrer and base-referrer (query string
// stripped) updates for the session; empty referrers are ignored.
// Flushed on Commit. Used by the db handler on each page event.
func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error {
	if referrer == "" {
		return nil
	}
	baseReferrer := url.DiscardURLQuery(referrer)
	s.updates.SetReferrer(sessionID, referrer, baseReferrer)
	return nil
}
// _updateReferrer is the direct (non-batched) variant of
// UpdateReferrer: it writes the referrer columns immediately and
// refreshes the cached session. Currently unreferenced in this file.
func (s *sessionsImpl) _updateReferrer(sessionID uint64, referrer string) error {
	baseReferrer := url.DiscardURLQuery(referrer)
	if err := s.storage.InsertReferrer(sessionID, referrer, baseReferrer); err != nil {
		return err
	}
	// Fixed inverted condition: the original mutated the session only
	// when cache.Get FAILED, dereferencing a nil *Session (panic).
	if session, err := s.cache.Get(sessionID); err == nil {
		session.Referrer = &referrer
		session.ReferrerBase = &baseReferrer
		if err := s.cache.Set(session); err != nil {
			log.Printf("Failed to cache session: %v", err)
		}
		return nil
	}
	session, err := s.getFromDB(sessionID)
	if err != nil {
		log.Printf("Failed to get session from postgres: %v", err)
		return nil
	}
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// UpdateMetadata resolves the metadata key to the project's column
// number and queues the value change; flushed on Commit. Keys the
// project does not track (keyNo == 0) are silently dropped. Used by
// the db handler on each metadata event.
func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error {
	session, err := s.Get(sessionID)
	if err != nil {
		return err
	}
	project, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	keyNo := project.GetMetadataNo(key)
	if keyNo == 0 {
		return nil
	}
	s.updates.SetMetadata(sessionID, keyNo, value)
	return nil
}
// _updateMetadata is the direct (non-batched) variant of
// UpdateMetadata: it writes the metadata column immediately and
// refreshes the cached session. Currently unreferenced in this file.
func (s *sessionsImpl) _updateMetadata(sessionID uint64, key, value string) error {
	session, err := s.Get(sessionID)
	if err != nil {
		return err
	}
	project, err := s.projects.GetProject(session.ProjectID)
	if err != nil {
		return err
	}
	// 0 means the project does not track this metadata key.
	keyNo := project.GetMetadataNo(key)
	if keyNo == 0 {
		return nil
	}
	if err := s.storage.InsertMetadata(sessionID, keyNo, value); err != nil {
		return err
	}
	session.SetMetadata(keyNo, value)
	if err := s.cache.Set(session); err != nil {
		log.Printf("Failed to cache session: %v", err)
	}
	return nil
}
// UpdateEventsStats queues increments of the session's events and
// pages counters; flushed on Commit.
func (s *sessionsImpl) UpdateEventsStats(sessionID uint64, events, pages int) error {
	s.updates.AddEvents(sessionID, events, pages)
	return nil
}

// UpdateIssuesStats queues increments of the session's errors counter
// and issue score; flushed on Commit.
func (s *sessionsImpl) UpdateIssuesStats(sessionID uint64, errors, issueScore int) error {
	s.updates.AddIssues(sessionID, errors, issueScore)
	return nil
}

// Commit flushes all queued session updates to postgres.
func (s *sessionsImpl) Commit() {
	s.updates.Commit()
}

View file

@ -0,0 +1,206 @@
package sessions
import (
"fmt"
"github.com/jackc/pgtype"
"log"
"openreplay/backend/pkg/db/postgres/pool"
)
// Storage is the postgres persistence layer for sessions; it contains
// no caching and is safe to call directly.
type Storage interface {
	Add(sess *Session) error
	AddUnStarted(sess *UnStartedSession) error
	Get(sessionID uint64) (*Session, error)
	GetDuration(sessionID uint64) (uint64, error)
	UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error)
	InsertEncryptionKey(sessionID uint64, key []byte) error
	InsertUserID(sessionID uint64, userID string) error
	InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error
	InsertReferrer(sessionID uint64, referrer, baseReferrer string) error
	InsertMetadata(sessionID uint64, keyNo uint, value string) error
}

// storageImpl implements Storage over a pgx connection pool.
type storageImpl struct {
	db pool.Pool
}

// NewStorage wraps the given connection pool in a Storage.
func NewStorage(db pool.Pool) Storage {
	return &storageImpl{
		db: db,
	}
}
// Add inserts the initial row for a started session. NULLIF maps empty
// strings and zero sizes to NULL so optional columns stay NULL-clean;
// user_id is truncated to 8000 characters.
// NOTE(review): issue_score ($12) is seeded with start_ts/1000 —
// confirm this initial value is intentional.
func (s *storageImpl) Add(sess *Session) error {
	return s.db.Exec(`
		INSERT INTO sessions (
			session_id, project_id, start_ts,
			user_uuid, user_device, user_device_type, user_country,
			user_os, user_os_version,
			rev_id,
			tracker_version, issue_score,
			platform,
			user_agent, user_browser, user_browser_version, user_device_memory_size, user_device_heap_size,
			user_id, user_state, user_city
		) VALUES (
			$1, $2, $3,
			$4, $5, $6, $7,
			$8, NULLIF($9, ''),
			NULLIF($10, ''),
			$11, $12,
			$13,
			NULLIF($14, ''), NULLIF($15, ''), NULLIF($16, ''), NULLIF($17, 0), NULLIF($18, 0::bigint),
			NULLIF(LEFT($19, 8000), ''), NULLIF($20, ''), NULLIF($21, '')
		)`,
		sess.SessionID, sess.ProjectID, sess.Timestamp,
		sess.UserUUID, sess.UserDevice, sess.UserDeviceType, sess.UserCountry,
		sess.UserOS, sess.UserOSVersion,
		sess.RevID,
		sess.TrackerVersion, sess.Timestamp/1000,
		sess.Platform,
		sess.UserAgent, sess.UserBrowser, sess.UserBrowserVersion, sess.UserDeviceMemorySize, sess.UserDeviceHeapSize,
		sess.UserID, sess.UserState, sess.UserCity,
	)
}
// AddUnStarted inserts a row into unstarted_sessions, resolving the
// public project key to project_id via a subquery. Empty state/city
// are stored as NULL.
func (s *storageImpl) AddUnStarted(sess *UnStartedSession) error {
	return s.db.Exec(`
		INSERT INTO unstarted_sessions (
			project_id,
			tracker_version, do_not_track,
			platform, user_agent,
			user_os, user_os_version,
			user_browser, user_browser_version,
			user_device, user_device_type,
			user_country, user_state, user_city
		) VALUES (
			(SELECT project_id FROM projects WHERE project_key = $1),
			$2, $3,
			$4, $5,
			$6, $7,
			$8, $9,
			$10, $11,
			$12, NULLIF($13, ''), NULLIF($14, '')
		)`,
		sess.ProjectKey,
		sess.TrackerVersion, sess.DoNotTrack,
		sess.Platform, sess.UserAgent,
		sess.UserOS, sess.UserOSVersion,
		sess.UserBrowser, sess.UserBrowserVersion,
		sess.UserDevice, sess.UserDeviceType,
		sess.UserCountry, sess.UserState, sess.UserCity,
	)
}
// Get loads a full session row by ID. Nullable string columns that map
// to non-pointer Session fields are scanned through local *string
// temporaries and copied only when present; a failure to decode
// issue_types is logged but does not fail the load.
func (s *storageImpl) Get(sessionID uint64) (*Session, error) {
	sess := &Session{SessionID: sessionID}
	// Temporaries for nullable columns whose Session fields are plain strings.
	var revID, userOSVersion, userBrowserVersion, userState, userCity *string
	var issueTypes pgtype.EnumArray
	if err := s.db.QueryRow(`
		SELECT platform,
			duration, project_id, start_ts,
			user_uuid, user_os, user_os_version,
			user_device, user_device_type, user_country, user_state, user_city,
			rev_id, tracker_version,
			user_id, user_anonymous_id, referrer,
			pages_count, events_count, errors_count, issue_types,
			user_browser, user_browser_version, issue_score,
			metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
			metadata_6, metadata_7, metadata_8, metadata_9, metadata_10
		FROM sessions
		WHERE session_id=$1
	`,
		sessionID,
	).Scan(&sess.Platform,
		&sess.Duration, &sess.ProjectID, &sess.Timestamp,
		&sess.UserUUID, &sess.UserOS, &userOSVersion,
		&sess.UserDevice, &sess.UserDeviceType, &sess.UserCountry, &userState, &userCity,
		&revID, &sess.TrackerVersion,
		&sess.UserID, &sess.UserAnonymousID, &sess.Referrer,
		&sess.PagesCount, &sess.EventsCount, &sess.ErrorsCount, &issueTypes,
		&sess.UserBrowser, &userBrowserVersion, &sess.IssueScore,
		&sess.Metadata1, &sess.Metadata2, &sess.Metadata3, &sess.Metadata4, &sess.Metadata5,
		&sess.Metadata6, &sess.Metadata7, &sess.Metadata8, &sess.Metadata9, &sess.Metadata10); err != nil {
		return nil, err
	}
	// Copy nullable columns into their plain-string fields (NULL -> "").
	if userOSVersion != nil {
		sess.UserOSVersion = *userOSVersion
	}
	if userBrowserVersion != nil {
		sess.UserBrowserVersion = *userBrowserVersion
	}
	if revID != nil {
		sess.RevID = *revID
	}
	if err := issueTypes.AssignTo(&sess.IssueTypes); err != nil {
		log.Printf("can't scan IssueTypes, err: %s", err)
	}
	if userState != nil {
		sess.UserState = *userState
	}
	if userCity != nil {
		sess.UserCity = *userCity
	}
	return sess, nil
}
// GetDuration reads the session duration from postgres; a NULL
// duration (session still live) is returned as 0.
func (s *storageImpl) GetDuration(sessionID uint64) (uint64, error) {
	var duration uint64
	err := s.db.QueryRow("SELECT COALESCE( duration, 0 ) FROM sessions WHERE session_id=$1", sessionID).Scan(&duration)
	if err != nil {
		return 0, err
	}
	return duration, nil
}
// UpdateDuration sets duration = timestamp - start_ts for the session
// and returns the resulting value.
// NOTE(review): if timestamp < start_ts the SQL subtraction goes
// negative and the scan into uint64 will error — confirm callers
// guard against out-of-order timestamps.
func (s *storageImpl) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) {
	var dur uint64
	if err := s.db.QueryRow(`
		UPDATE sessions SET duration=$2 - start_ts
		WHERE session_id=$1
		RETURNING duration
	`,
		sessionID, timestamp,
	).Scan(&dur); err != nil {
		return 0, err
	}
	return dur, nil
}
// InsertEncryptionKey stores the session's encryption key in the
// file_key column.
func (s *storageImpl) InsertEncryptionKey(sessionID uint64, key []byte) error {
	sqlRequest := `
		UPDATE sessions
		SET file_key = $2
		WHERE session_id = $1`
	return s.db.Exec(sqlRequest, sessionID, string(key))
}

// InsertUserID sets the session's user_id, truncated to 8000 chars.
func (s *storageImpl) InsertUserID(sessionID uint64, userID string) error {
	sqlRequest := `
		UPDATE sessions
		SET user_id = LEFT($1, 8000)
		WHERE session_id = $2`
	return s.db.Exec(sqlRequest, userID, sessionID)
}

// InsertUserAnonymousID sets the session's user_anonymous_id,
// truncated to 8000 chars.
func (s *storageImpl) InsertUserAnonymousID(sessionID uint64, userAnonymousID string) error {
	sqlRequest := `
		UPDATE sessions
		SET user_anonymous_id = LEFT($1, 8000)
		WHERE session_id = $2`
	return s.db.Exec(sqlRequest, userAnonymousID, sessionID)
}

// InsertReferrer sets referrer and base_referrer, but only for
// sessions whose referrer is still NULL (first page event wins).
func (s *storageImpl) InsertReferrer(sessionID uint64, referrer, baseReferrer string) error {
	sqlRequest := `
		UPDATE sessions
		SET referrer = LEFT($1, 8000), base_referrer = LEFT($2, 8000)
		WHERE session_id = $3 AND referrer IS NULL`
	return s.db.Exec(sqlRequest, referrer, baseReferrer, sessionID)
}
// InsertMetadata writes value into the metadata_<keyNo> column,
// truncated to 8000 chars. keyNo must be 1..10 — it is interpolated
// into the column name, so it is validated here rather than relying on
// every caller to filter invalid key numbers.
func (s *storageImpl) InsertMetadata(sessionID uint64, keyNo uint, value string) error {
	if keyNo < 1 || keyNo > 10 {
		return fmt.Errorf("invalid metadata key number: %d", keyNo)
	}
	sqlRequest := `
		UPDATE sessions
		SET metadata_%v = LEFT($1, 8000)
		WHERE session_id = $2`
	return s.db.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
}

View file

@ -0,0 +1,222 @@
package sessions
import (
"fmt"
"log"
"openreplay/backend/pkg/db/postgres/pool"
"time"
"github.com/jackc/pgx/v4"
"openreplay/backend/pkg/metrics/database"
)
// Updates accumulates per-session field changes and flushes them to
// postgres in a single batch on Commit.
type Updates interface {
	AddUserID(sessionID uint64, userID string)
	AddAnonID(sessionID uint64, userID string)
	SetReferrer(sessionID uint64, referrer, baseReferrer string)
	SetMetadata(sessionID uint64, keyNo uint, value string)
	AddEvents(sessionID uint64, events, pages int)
	AddIssues(sessionID uint64, errors, issues int)
	Commit()
}

// updatesImpl buffers one sessionUpdate per session ID between commits.
// NOTE(review): the map is not mutex-guarded — safe only if all calls
// come from a single goroutine; confirm against the service wiring.
type updatesImpl struct {
	db      pool.Pool
	updates map[uint64]*sessionUpdate
}

// NewSessionUpdates creates an empty update queue over the given pool.
func NewSessionUpdates(db pool.Pool) Updates {
	return &updatesImpl{
		db:      db,
		updates: make(map[uint64]*sessionUpdate),
	}
}
// AddUserID queues a user_id update for the session, creating the
// per-session accumulator on first use.
func (u *updatesImpl) AddUserID(sessionID uint64, userID string) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	u.updates[sessionID].setUserID(userID)
}
// AddAnonID queues a user_anonymous_id update for the session,
// creating the per-session accumulator on first use.
func (u *updatesImpl) AddAnonID(sessionID uint64, userID string) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	// Was setUserID, which made AddAnonID overwrite the user_id column
	// and left setAnonID / user_anonymous_id unreachable.
	u.updates[sessionID].setAnonID(userID)
}
// SetReferrer queues referrer/base_referrer values for the session.
func (u *updatesImpl) SetReferrer(sessionID uint64, referrer, baseReferrer string) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	u.updates[sessionID].setReferrer(referrer, baseReferrer)
}

// SetMetadata queues a metadata_<keyNo> value for the session; the
// last value queued for a key wins.
func (u *updatesImpl) SetMetadata(sessionID uint64, keyNo uint, value string) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	u.updates[sessionID].setMetadata(keyNo, value)
}

// AddEvents queues increments for the session's events/pages counters.
func (u *updatesImpl) AddEvents(sessionID uint64, events, pages int) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	u.updates[sessionID].addEvents(events, pages)
}

// AddIssues queues increments for the session's errors counter and
// issue score.
func (u *updatesImpl) AddIssues(sessionID uint64, errors, issues int) {
	if u.updates[sessionID] == nil {
		u.updates[sessionID] = NewSessionUpdate(sessionID)
	}
	u.updates[sessionID].addIssues(errors, issues)
}
// Commit flushes all accumulated session updates to postgres as one
// pgx batch. If any statement in the batch fails, it falls back to
// replaying every queued update individually, then clears the queue
// unconditionally (updates are never retried on a later Commit).
func (u *updatesImpl) Commit() {
	b := &pgx.Batch{}
	for _, upd := range u.updates {
		// Sessions with no effective changes produce an empty request
		// and are skipped.
		if str, args := upd.request(); str != "" {
			b.Queue(str, args...)
		}
	}
	// Record batch size
	database.RecordBatchElements(float64(b.Len()))
	start := time.Now()
	// Send batch to db and execute
	br := u.db.SendBatch(b)
	l := b.Len()
	failed := false
	for i := 0; i < l; i++ {
		if _, err := br.Exec(); err != nil {
			log.Printf("Error in PG batch.Exec(): %v \n", err)
			failed = true
			break
		}
	}
	if err := br.Close(); err != nil {
		log.Printf("Error in PG batch.Close(): %v \n", err)
	}
	if failed {
		// Failover: replay ALL queued updates one by one.
		// NOTE(review): this re-applies increments (pages_count etc.)
		// that may already have been committed by the partial batch —
		// safe only if SendBatch runs as a single implicit transaction
		// that rolls back on error; confirm against the pool wrapper.
		for _, upd := range u.updates {
			if str, args := upd.request(); str != "" {
				if err := u.db.Exec(str, args...); err != nil {
					log.Printf("Error in PG Exec(): %v \n", err)
				}
			}
		}
	}
	database.RecordBatchInsertDuration(float64(time.Now().Sub(start).Milliseconds()))
	u.updates = make(map[uint64]*sessionUpdate)
}
// sessionUpdate accumulates pending column changes for one session.
// Pointer fields are "unset" while nil; counters accumulate and are
// emitted as SQL increments only when positive.
type sessionUpdate struct {
	sessionID    uint64
	userID       *string
	anonID       *string
	referrer     *string
	baseReferrer *string
	metadata     map[uint]string
	pages        int
	events       int
	errors       int
	issues       int
}

// NewSessionUpdate returns an empty accumulator for the session.
func NewSessionUpdate(sessionID uint64) *sessionUpdate {
	return &sessionUpdate{
		sessionID: sessionID,
		metadata:  make(map[uint]string),
	}
}

// setUserID records a pending user_id value (last write wins).
func (su *sessionUpdate) setUserID(id string) {
	su.userID = &id
}

// setAnonID records a pending user_anonymous_id value (last write wins).
func (su *sessionUpdate) setAnonID(id string) {
	su.anonID = &id
}

// setReferrer records pending referrer/base_referrer values together.
func (su *sessionUpdate) setReferrer(ref, baseRef string) {
	su.referrer = &ref
	su.baseReferrer = &baseRef
}

// setMetadata records a pending metadata_<keyNo> value (last write wins).
func (su *sessionUpdate) setMetadata(keyNo uint, value string) {
	su.metadata[keyNo] = value
}

// addEvents accumulates events/pages counter increments.
func (su *sessionUpdate) addEvents(eventCount, pageCount int) {
	su.events += eventCount
	su.pages += pageCount
}

// addIssues accumulates errors counter and issue-score increments.
func (su *sessionUpdate) addIssues(errorCount, issueScore int) {
	su.errors += errorCount
	su.issues += issueScore
}

// request renders the accumulated changes as a single UPDATE statement
// with positional placeholders, plus its argument list (session ID
// last). Returns ("", nil) when nothing has been accumulated.
func (su *sessionUpdate) request() (string, []interface{}) {
	query := "UPDATE sessions SET"
	args := make([]interface{}, 0)
	// add appends one clause; each %d in the clause is filled with the
	// placeholder number of the corresponding value, in order.
	add := func(clause string, vals ...interface{}) {
		nums := make([]interface{}, len(vals))
		for i := range vals {
			nums[i] = len(args) + i + 1
		}
		query += fmt.Sprintf(clause, nums...)
		args = append(args, vals...)
	}
	if su.userID != nil {
		add(" user_id = LEFT($%d, 8000),", *su.userID)
	}
	if su.anonID != nil {
		add(" user_anonymous_id = LEFT($%d, 8000),", *su.anonID)
	}
	if su.referrer != nil {
		add(" referrer = LEFT($%d, 8000), base_referrer = LEFT($%d, 8000),", *su.referrer, *su.baseReferrer)
	}
	for keyNo, value := range su.metadata {
		add(fmt.Sprintf(" metadata_%d = LEFT($%%d, 8000),", keyNo), value)
	}
	if su.pages > 0 {
		add(" pages_count = pages_count + $%d,", su.pages)
	}
	if su.events > 0 {
		add(" events_count = events_count + $%d,", su.events)
	}
	if su.errors > 0 {
		add(" errors_count = errors_count + $%d,", su.errors)
	}
	if su.issues > 0 {
		add(" issue_score = issue_score + $%d,", su.issues)
	}
	if len(args) == 0 {
		return "", nil
	}
	// Drop the trailing comma and append the WHERE clause.
	query = query[:len(query)-1] + fmt.Sprintf(" WHERE session_id = $%d", len(args)+1)
	args = append(args, su.sessionID)
	return query, args
}

View file

@ -98,21 +98,16 @@ func WrapGraphQL(m *messages.GraphQL, projID uint32) *GraphQLFTS {
}
}
func (s *saverImpl) sendToFTS(msg messages.Message) {
func (s *saverImpl) sendToFTS(msg messages.Message, projID uint32) {
// Skip, if FTS is disabled
if s.producer == nil {
return
}
var (
projID uint32
event []byte
err error
event []byte
err error
)
if sess, err := s.pg.Cache.GetSession(msg.SessionID()); err == nil {
projID = sess.ProjectID
}
switch m := msg.(type) {
// Common
case *messages.NetworkRequest:

View file

@ -1,15 +1,14 @@
package datasaver
import (
"errors"
"log"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
)
func (s *saverImpl) init() {
@ -17,70 +16,64 @@ func (s *saverImpl) init() {
if err := s.ch.Prepare(); err != nil {
log.Fatalf("can't prepare clickhouse: %s", err)
}
s.pg.Conn.SetClickHouse(s.ch)
s.pg.SetClickHouse(s.ch)
if s.cfg.UseQuickwit {
s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true)
}
}
func (s *saverImpl) handleExtraMessage(msg Message) error {
// Send data to quickwit
s.sendToFTS(msg)
func (s *saverImpl) handleExtraMessage(msg messages.Message) error {
// Get session data
var (
session *types.Session
session *sessions.Session
err error
)
if msg.TypeID() == MsgSessionEnd {
session, err = s.pg.GetSession(msg.SessionID())
if msg.TypeID() == messages.MsgSessionEnd {
session, err = s.sessions.GetUpdated(msg.SessionID())
} else {
session, err = s.pg.Cache.GetSession(msg.SessionID())
session, err = s.sessions.Get(msg.SessionID())
}
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
if err != nil || session == nil {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
return err
}
// Send data to quickwit
s.sendToFTS(msg, session.ProjectID)
// Handle message
switch m := msg.(type) {
case *SessionEnd:
case *messages.SessionEnd:
return s.ch.InsertWebSession(session)
case *PerformanceTrackAggr:
case *messages.PerformanceTrackAggr:
return s.ch.InsertWebPerformanceTrackAggr(session, m)
case *MouseClick:
case *messages.MouseClick:
return s.ch.InsertWebClickEvent(session, m)
case *InputEvent:
case *messages.InputEvent:
return s.ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
case *messages.PageEvent:
return s.ch.InsertWebPageEvent(session, m)
case *ResourceTiming:
case *messages.ResourceTiming:
return s.ch.InsertWebResourceEvent(session, m)
case *JSException:
case *messages.JSException:
return s.ch.InsertWebErrorEvent(session, types.WrapJSException(m))
case *IntegrationEvent:
case *messages.IntegrationEvent:
return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *IssueEvent:
case *messages.IssueEvent:
return s.ch.InsertIssue(session, m)
case *CustomEvent:
case *messages.CustomEvent:
return s.ch.InsertCustom(session, m)
case *NetworkRequest:
project, err := s.pg.Cache.GetProject(session.ProjectID)
if err != nil {
log.Printf("can't get project: %s", err)
} else {
if err := s.ch.InsertRequest(session, m, project.SaveRequestPayloads); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
case *messages.NetworkRequest:
if err := s.ch.InsertRequest(session, m, session.SaveRequestPayload); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
case *GraphQL:
case *messages.GraphQL:
return s.ch.InsertGraphQL(session, m)
case *InputChange:
case *messages.InputChange:
return s.ch.InsertWebInputDuration(session, m)
case *MouseThrashing:
case *messages.MouseThrashing:
return s.ch.InsertMouseThrashing(session, m)
}
return nil

View file

@ -9,6 +9,7 @@ import (
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/url"
"os"
"strings"
@ -21,20 +22,20 @@ type Connector interface {
Prepare() error
Commit() error
Stop() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertRequest(session *types.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQL) error
InsertIssue(session *types.Session, msg *messages.IssueEvent) error
InsertWebInputDuration(session *types.Session, msg *messages.InputChange) error
InsertMouseThrashing(session *types.Session, msg *messages.MouseThrashing) error
InsertWebSession(session *sessions.Session) error
InsertWebResourceEvent(session *sessions.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error
InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error
InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error
InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error
}
type task struct {
@ -177,7 +178,7 @@ func (c *connectorImpl) checkError(name string, err error) {
}
}
func (c *connectorImpl) InsertWebInputDuration(session *types.Session, msg *messages.InputChange) error {
func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error {
if msg.Label == "" {
return nil
}
@ -197,7 +198,7 @@ func (c *connectorImpl) InsertWebInputDuration(session *types.Session, msg *mess
return nil
}
func (c *connectorImpl) InsertMouseThrashing(session *types.Session, msg *messages.MouseThrashing) error {
func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error {
issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp)
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
@ -225,7 +226,7 @@ func (c *connectorImpl) InsertMouseThrashing(session *types.Session, msg *messag
return nil
}
func (c *connectorImpl) InsertIssue(session *types.Session, msg *messages.IssueEvent) error {
func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
// Check issue type before insert to avoid panic from clickhouse lib
switch msg.Type {
@ -259,7 +260,7 @@ func (c *connectorImpl) InsertIssue(session *types.Session, msg *messages.IssueE
return nil
}
func (c *connectorImpl) InsertWebSession(session *types.Session) error {
func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
@ -303,7 +304,7 @@ func (c *connectorImpl) InsertWebSession(session *types.Session) error {
return nil
}
func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error {
func (c *connectorImpl) InsertWebResourceEvent(session *sessions.Session, msg *messages.ResourceTiming) error {
msgType := url.GetResourceType(msg.Initiator, msg.URL)
resourceType := url.EnsureType(msgType)
if resourceType == "" {
@ -329,7 +330,7 @@ func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *mess
return nil
}
func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error {
func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
uint16(session.ProjectID),
@ -356,7 +357,7 @@ func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error {
func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error {
if msg.Label == "" {
return nil
}
@ -375,7 +376,7 @@ func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *message
return nil
}
func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error {
func (c *connectorImpl) InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error {
if msg.Label == "" {
return nil
}
@ -395,7 +396,7 @@ func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *message
return nil
}
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error {
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
for k, v := range msg.Tags {
keys = append(keys, k)
@ -427,7 +428,7 @@ func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *types.E
return nil
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error {
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
@ -455,7 +456,7 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, ms
return nil
}
func (c *connectorImpl) InsertAutocomplete(session *types.Session, msgType, msgValue string) error {
func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
@ -470,7 +471,7 @@ func (c *connectorImpl) InsertAutocomplete(session *types.Session, msgType, msgV
return nil
}
func (c *connectorImpl) InsertRequest(session *types.Session, msg *messages.NetworkRequest, savePayload bool) error {
func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
return fmt.Errorf("can't parse http method. sess: %d, method: %s", session.SessionID, msg.Method)
@ -500,7 +501,7 @@ func (c *connectorImpl) InsertRequest(session *types.Session, msg *messages.Netw
return nil
}
func (c *connectorImpl) InsertCustom(session *types.Session, msg *messages.CustomEvent) error {
func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error {
if err := c.batches["custom"].Append(
session.SessionID,
uint16(session.ProjectID),
@ -516,7 +517,7 @@ func (c *connectorImpl) InsertCustom(session *types.Session, msg *messages.Custo
return nil
}
func (c *connectorImpl) InsertGraphQL(session *types.Session, msg *messages.GraphQL) error {
func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error {
if err := c.batches["graphql"].Append(
session.SessionID,
uint16(session.ProjectID),

View file

@ -0,0 +1,45 @@
package redis
import (
"errors"
"github.com/go-redis/redis"
config "openreplay/backend/internal/config/redis"
"strings"
)
// Client bundles a connected go-redis client with the configuration it
// was created from, so callers can reach tuning options (stream length,
// read counts, block durations) alongside the connection itself.
type Client struct {
	Cfg   *config.Redis // configuration the connection was built from
	Redis *redis.Client // underlying go-redis connection
}
// New connects to Redis using the given configuration and verifies the
// connection with a PING. It returns an error when the config is nil,
// the cache is disabled, the connection URL is empty or malformed, or
// the server does not answer the ping.
func New(cfg *config.Redis) (*Client, error) {
	if cfg == nil {
		return nil, errors.New("redis config is nil")
	}
	if !cfg.UseRedisCache {
		return nil, errors.New("redis cache is disabled")
	}
	if cfg.ConnectionURL == "" {
		return nil, errors.New("redis connection url is empty")
	}
	// Allow bare "host:port" values by defaulting to the redis:// scheme.
	connURL := cfg.ConnectionURL
	if !strings.Contains(connURL, "://") {
		connURL = "redis://" + connURL
	}
	options, err := redis.ParseURL(connURL)
	if err != nil {
		return nil, err
	}
	client := redis.NewClient(options)
	if _, err := client.Ping().Result(); err != nil {
		// Release the client's connection pool before reporting the
		// failed health check; otherwise the pool leaks.
		client.Close()
		return nil, err
	}
	return &Client{
		Cfg:   cfg,
		Redis: client,
	}, nil
}
// Close releases the underlying go-redis connection pool.
func (c *Client) Close() error {
	return c.Redis.Close()
}

View file

@ -0,0 +1,173 @@
package redis
import (
"errors"
"fmt"
"github.com/go-redis/redis"
"log"
"net"
redis2 "openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"sort"
"strconv"
"strings"
)
// idsInfo tracks consumed-but-unacknowledged stream entries. The two
// slices are parallel: ts[i] is the timestamp extracted from id[i].
type idsInfo struct {
	id []string
	ts []int64
}

// streamPendingIDsMap maps a stream name to its pending (unacked) entries.
type streamPendingIDsMap map[string]*idsInfo
// consumerImpl reads messages from Redis streams through a consumer
// group. With autoCommit enabled, entries are acknowledged immediately
// after being read; otherwise they accumulate in idsPending until
// Commit or CommitBack acknowledges them.
type consumerImpl struct {
	client     *redis2.Client
	group      string              // consumer group name (also used as the consumer name)
	streams    []string            // stream names followed by one ID (">") per stream, as XReadGroup expects
	idsPending streamPendingIDsMap // unacknowledged entries per stream (manual-commit mode)
	lastTs     int64               // timestamp of the most recently read entry
	autoCommit bool                // ack each entry right after it is read
	event      chan interface{}    // channel returned by Rebalanced
}
// QueueMessage is one decoded stream entry: the raw payload bytes plus
// batch metadata (session ID, stream name, derived batch ID).
type QueueMessage struct {
	Data []byte
	Info *messages.BatchInfo
}
// ConsumeNext is required by the types.Consumer interface but is not
// implemented yet; ConsumeNextOld is the current read path.
func (c *consumerImpl) ConsumeNext() error {
	//TODO implement me
	panic("implement me")
}
// Close is required by the types.Consumer interface but is not
// implemented yet; nothing currently shuts the consumer down cleanly.
func (c *consumerImpl) Close() {
	//TODO implement me
	panic("implement me")
}
// NewConsumer builds a consumer-group reader over the given streams.
// It makes sure the consumer group exists on every stream (creating the
// stream and group when missing) and terminates the process if group
// creation fails for any reason other than the group already existing.
func NewConsumer(client *redis2.Client, group string, streams []string) types.Consumer {
	idsPending := make(streamPendingIDsMap, len(streams))
	// XReadGroup expects the stream names to be followed by one ID per
	// stream. Build a fresh slice instead of appending to the caller's
	// slice: appending in place could write into the caller's backing
	// array when it has spare capacity.
	streamsWithIDs := make([]string, 0, len(streams)*2)
	streamsWithIDs = append(streamsWithIDs, streams...)
	for _, stream := range streams {
		err := client.Redis.XGroupCreateMkStream(stream, group, "0").Err()
		if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
			log.Fatalln(err)
		}
		idsPending[stream] = new(idsInfo)
		// ">" is for never-delivered messages.
		// Otherwise - never acknowledged only
		// TODO: understand why in case of "0" it eats 100% cpu
		streamsWithIDs = append(streamsWithIDs, ">")
	}
	return &consumerImpl{
		client:     client,
		streams:    streamsWithIDs,
		group:      group,
		autoCommit: true,
		idsPending: idsPending,
		event:      make(chan interface{}, 4),
	}
}
// ConsumeNextOld reads the next entry from the configured streams via
// XREADGROUP (blocking up to Cfg.ReadBlockDuration) and decodes it into
// a QueueMessage. In auto-commit mode the entry is acknowledged right
// away; otherwise it is recorded in idsPending for a later Commit.
// NOTE(review): when XREADGROUP returns more than one entry, only the
// first is returned and the rest are neither processed nor acked —
// confirm ReadCount is effectively 1 or that leaving the remainder
// pending is acceptable.
func (c *consumerImpl) ConsumeNextOld() (*QueueMessage, error) {
	res, err := c.client.Redis.XReadGroup(&redis.XReadGroupArgs{
		Group:    c.group,
		Consumer: c.group, // consumer name == group name
		Streams:  c.streams,
		Count:    c.client.Cfg.ReadCount,
		Block:    c.client.Cfg.ReadBlockDuration,
	}).Result()
	if err != nil {
		// A blocking read that expires surfaces as a net timeout.
		if err, ok := err.(net.Error); ok && err.Timeout() {
			return nil, err
		}
		if err == redis.Nil {
			return nil, errors.New("key does not exist")
		}
		return nil, err
	}
	// TODO: remove debug logs
	log.Printf("info: res.size: %d", len(res))
	for _, r := range res {
		log.Printf("info: messages.size: %d", len(r.Messages))
		for _, m := range r.Messages {
			sessionIDString, ok := m.Values["sessionID"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast sessionID value for messageID %s", m.ID)
			}
			sessionID, err := strconv.ParseUint(sessionIDString, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("can't parse sessionID %s for messageID %s", sessionIDString, m.ID)
			}
			valueString, ok := m.Values["value"].(string)
			if !ok {
				return nil, fmt.Errorf("can't cast value for messageID %s", m.ID)
			}
			// Assuming that ID has a correct format ("<ms>-<seq>")
			idParts := strings.Split(m.ID, "-")
			ts, _ := strconv.ParseUint(idParts[0], 10, 64)
			idx, _ := strconv.ParseUint(idParts[1], 10, 64)
			if idx > 0x1FFF {
				return nil, errors.New("too many messages per ms in redis")
			}
			// Pack the ms timestamp and per-ms sequence into one batch ID.
			bID := ts<<13 | (idx & 0x1FFF) // Max: 4096 messages/ms for 69 years
			result := &QueueMessage{
				Data: []byte(valueString),
				Info: messages.NewBatchInfo(sessionID, r.Stream, bID, 0, int64(ts)),
			}
			if c.autoCommit {
				if err = c.client.Redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
					log.Printf("Acknoledgment error for messageID %s, err: %s", m.ID, err.Error())
				}
			} else {
				// Manual-commit mode: remember the entry until Commit/CommitBack.
				c.lastTs = int64(ts)
				c.idsPending[r.Stream].id = append(c.idsPending[r.Stream].id, m.ID)
				c.idsPending[r.Stream].ts = append(c.idsPending[r.Stream].ts, int64(ts))
			}
			return result, nil
		}
	}
	return nil, errors.New("no messages")
}
// CommitBack acknowledges every pending entry whose timestamp is at
// least `gap` milliseconds older than the most recently read entry,
// keeping newer entries pending.
func (c *consumerImpl) CommitBack(gap int64) error {
	if c.lastTs == 0 {
		// Nothing has been read yet.
		return nil
	}
	threshold := c.lastTs - gap
	for stream, info := range c.idsPending {
		if len(info.id) == 0 {
			continue
		}
		// Entries are appended in read order, so timestamps are sorted;
		// locate the first entry newer than the threshold.
		cut := sort.Search(len(info.ts), func(i int) bool {
			return info.ts[i] > threshold
		})
		if err := c.client.Redis.XAck(stream, c.group, info.id[:cut]...).Err(); err != nil {
			return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
		}
		// Retain only the entries that were not acknowledged.
		c.idsPending[stream].id = info.id[cut:]
		c.idsPending[stream].ts = info.ts[cut:]
	}
	return nil
}
// Commit acknowledges all pending entries on every stream and clears
// the pending bookkeeping.
func (c *consumerImpl) Commit() error {
	for stream, info := range c.idsPending {
		if len(info.id) == 0 {
			continue
		}
		err := c.client.Redis.XAck(stream, c.group, info.id...).Err()
		if err != nil {
			return fmt.Errorf("RedisStreams: Acknoledgment error on commit %v", err)
		}
		c.idsPending[stream].id = nil
		c.idsPending[stream].ts = nil
	}
	return nil
}
// Rebalanced returns the channel used to signal consumer rebalances.
// NOTE(review): nothing in this file ever sends on the channel — it
// appears to exist only to satisfy the types.Consumer interface.
func (c *consumerImpl) Rebalanced() <-chan interface{} {
	return c.event
}

View file

@ -0,0 +1,41 @@
package redis
import (
"github.com/go-redis/redis"
redis2 "openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/queue/types"
)
// producerImpl publishes messages to Redis streams via XADD.
type producerImpl struct {
	client *redis2.Client
}
// Close is required by the types.Producer interface but is not
// implemented yet; the timeout argument is currently unused.
func (c *producerImpl) Close(timeout int) {
	//TODO implement me
	panic("implement me")
}
// NewProducer wraps the given Redis client in a stream producer.
func NewProducer(client *redis2.Client) types.Producer {
	return &producerImpl{
		client: client,
	}
}
// Produce appends the value to the given stream (topic), storing the
// session ID alongside it and trimming the stream to approximately
// Cfg.MaxLength entries.
func (c *producerImpl) Produce(topic string, key uint64, value []byte) error {
	return c.client.Redis.XAdd(&redis.XAddArgs{
		Stream: topic,
		Values: map[string]interface{}{
			"sessionID": key,
			"value":     value,
		},
		MaxLenApprox: c.client.Cfg.MaxLength,
	}).Err()
}
// ProduceToPartition delegates to Produce; Redis streams have no
// partitions, so the partition argument is ignored.
func (c *producerImpl) ProduceToPartition(topic string, partition, key uint64, value []byte) error {
	return c.Produce(topic, key, value)
}
// Flush is a no-op: Produce issues a synchronous XADD, so there is
// nothing buffered to flush.
func (c *producerImpl) Flush(timeout int) {}

View file

@ -0,0 +1,58 @@
package sessions
import (
"encoding/json"
"errors"
"fmt"
"openreplay/backend/pkg/db/redis"
"openreplay/backend/pkg/sessions"
"time"
)
// cacheImpl stores sessions in Redis as JSON blobs keyed by session ID.
// A nil client means caching is disabled; operations then return
// ErrDisabledCache.
type cacheImpl struct {
	db *redis.Client
}
// Set serializes the session to JSON and stores it in Redis under the
// key "session:id:<sessionID>" with a 30-minute expiration.
func (c *cacheImpl) Set(session *sessions.Session) error {
	if c.db == nil {
		return ErrDisabledCache
	}
	if session == nil {
		return errors.New("session is nil")
	}
	if session.SessionID == 0 {
		return errors.New("session id is 0")
	}
	payload, err := json.Marshal(session)
	if err != nil {
		return err
	}
	key := fmt.Sprintf("session:id:%d", session.SessionID)
	_, err = c.db.Redis.Set(key, payload, time.Minute*30).Result()
	return err
}
// Get loads the session with the given ID from Redis and decodes it
// from JSON; any redis error (including a cache miss) is propagated.
func (c *cacheImpl) Get(sessionID uint64) (*sessions.Session, error) {
	if c.db == nil {
		return nil, ErrDisabledCache
	}
	if sessionID == 0 {
		return nil, errors.New("session id is 0")
	}
	key := fmt.Sprintf("session:id:%d", sessionID)
	raw, err := c.db.Redis.Get(key).Result()
	if err != nil {
		return nil, err
	}
	var session sessions.Session
	if err = json.Unmarshal([]byte(raw), &session); err != nil {
		return nil, err
	}
	return &session, nil
}
// ErrDisabledCache is returned by cache operations when no Redis client
// was provided (i.e. caching is disabled).
var ErrDisabledCache = errors.New("cache is disabled")
// NewCache creates a Redis-backed session cache. db may be nil, in
// which case every operation returns ErrDisabledCache.
func NewCache(db *redis.Client) sessions.Cache {
	return &cacheImpl{db: db}
}