Heuristics refactoring (#987)

* feat(backend): refactored heuristics service

* feat(backend): refactored db service (moved several events to heuristics)
Alexander 2023-03-09 09:54:12 +01:00 committed by GitHub
parent e3016fc64a
commit c6aac11cbf
61 changed files with 2342 additions and 2764 deletions
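
The core of the refactor is a shared service lifecycle: each main now builds its service (db.New, heuristics.New), which runs its own loop in a goroutine and exposes Stop() through the new internal/service.Interface, and the main then blocks on terminator.Wait instead of a hand-rolled signal/select loop. A minimal sketch of that pattern, assuming terminator.Wait blocks until SIGINT/SIGTERM and then calls Stop() (which is how the two mains below use it); toyService is hypothetical and only illustrates the contract:

package main

import (
	"log"
	"time"

	"openreplay/backend/internal/service"
	"openreplay/backend/pkg/terminator"
)

// toyService stands in for dbImpl/heuristicsImpl: it owns its run loop and
// only has to expose Stop() to satisfy service.Interface.
type toyService struct {
	done chan struct{}
}

func newToyService() service.Interface {
	s := &toyService{done: make(chan struct{})}
	go s.run()
	return s
}

func (s *toyService) run() {
	tick := time.Tick(time.Second)
	for {
		select {
		case <-s.done:
			return
		case <-tick:
			log.Println("periodic work (commit batches, flush producer, ...)")
		}
	}
}

// Stop flushes and shuts the loop down; the terminator is expected to call it once.
func (s *toyService) Stop() {
	close(s.done)
	log.Println("stopped")
}

func main() {
	svc := newToyService()
	log.Println("toy service started")
	terminator.Wait(svc) // assumed: blocks for SIGINT/SIGTERM, then calls svc.Stop()
}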

View file

@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
assetsMetrics "openreplay/backend/pkg/metrics/assets"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -24,9 +23,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
cacher := cacher.NewCacher(cfg)

View file

@ -1,174 +1,59 @@
package main
import (
"errors"
"log"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/db"
config "openreplay/backend/internal/config/db"
"openreplay/backend/internal/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
types2 "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
m := metrics.New()
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := db.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
cfg := config.New()
// Init database
pg := cache.NewPGCache(
postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
&custom2.EventMapper{},
custom2.NewInputEventBuilder(),
custom2.NewPageEventBuilder(),
}
}
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init modules
saver := datasaver.New(pg, cfg)
saver.InitStats()
// Init data saver
saver := datasaver.New(cfg, pg)
// Message filter
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgClickEvent,
messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgJSException, messages.MsgResourceTiming,
messages.MsgCustomEvent, messages.MsgCustomIssue, messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL,
messages.MsgStateAction, messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument,
messages.MsgMouseClick, messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming}
// Handler logic
msgHandler := func(msg messages.Message) {
// Just save session data into db without additional checks
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
var (
session *types2.Session
err error
)
if msg.TypeID() == messages.MsgSessionEnd {
session, err = pg.GetSession(msg.SessionID())
} else {
session, err = pg.Cache.GetSession(msg.SessionID())
}
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(msg)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(msg.SessionID(), func(msg messages.Message) {
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgJSException, messages.MsgResourceTiming, messages.MsgCustomEvent, messages.MsgCustomIssue,
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction,
messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument, messages.MsgMouseClick,
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
messages.MsgInputEvent, messages.MsgPageEvent}
// Init consumer
consumer := queue.NewConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb, // from tracker
cfg.TopicAnalytics, // from heuristics
cfg.TopicRawWeb,
cfg.TopicAnalytics,
},
messages.NewMessageIterator(msgHandler, msgFilter, true),
messages.NewMessageIterator(saver.Handle, msgFilter, true),
false,
cfg.MessageSizeLimit,
)
// Run service and wait for TERM signal
service := db.New(cfg, consumer, saver)
log.Printf("Db service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
commitTick := time.Tick(cfg.CommitBatchTimeout)
// Send collected batches to db
commitDBUpdates := func() {
// Commit collected batches and bulks of information to PG
pg.Commit()
// Commit collected batches of information to CH
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
// Commit current position in queue
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
}
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %s: terminating\n", sig.String())
commitDBUpdates()
if err := pg.Close(); err != nil {
log.Printf("db.Close error: %s", err)
}
if err := saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
consumer.Close()
os.Exit(0)
case <-commitTick:
commitDBUpdates()
builderMap.ClearOldSessions()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
// Handle new message from queue
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
terminator.Wait(service)
}

View file

@ -18,7 +18,6 @@ import (
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
enderMetrics "openreplay/backend/pkg/metrics/ender"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -30,9 +29,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := ender.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
@ -72,12 +68,12 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
failedSessionEnds := make(map[uint64]int64)
failedSessionEnds := make(map[uint64]uint64)
duplicatedSessionEnds := make(map[uint64]uint64)
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
msg := &messages.SessionEnd{Timestamp: timestamp}
currDuration, err := pg.GetSessionDuration(sessionID)
if err != nil {
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)

View file

@ -2,90 +2,49 @@ package main
import (
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/heuristics"
config "openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/heuristics"
"openreplay/backend/pkg/handlers"
web2 "openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := heuristics.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
cfg := config.New()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
// web handlers
&web2.ClickRageDetector{},
&web2.CpuIssueDetector{},
&web2.DeadClickDetector{},
&web2.MemoryIssueDetector{},
&web2.NetworkIssueDetector{},
&web2.PerformanceAggregator{},
// Other handlers (you can add your custom handlers here)
//&custom.CustomHandler{},
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
web.NewDeadClickDetector(),
&web.ClickRageDetector{},
&web.CpuIssueDetector{},
&web.MemoryIssueDetector{},
&web.NetworkIssueDetector{},
&web.PerformanceAggregator{},
}
}
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init producer and consumer for data bus
eventBuilder := sessions.NewBuilderMap(handlersFabric)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
msgHandler := func(msg messages.Message) {
builderMap.HandleMessage(msg)
}
consumer := queue.NewConsumer(
cfg.GroupHeuristics,
[]string{
cfg.TopicRawWeb,
},
messages.NewMessageIterator(msgHandler, nil, true),
messages.NewMessageIterator(eventBuilder.HandleMessage, nil, true),
false,
cfg.MessageSizeLimit,
)
// Run service and wait for TERM signal
service := heuristics.New(cfg, producer, consumer, eventBuilder)
log.Printf("Heuristics service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.Commit()
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicAnalytics, sessionID, readyMsg.Encode())
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
terminator.Wait(service)
}

View file

@ -15,7 +15,6 @@ import (
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
httpMetrics "openreplay/backend/pkg/metrics/http"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -27,9 +26,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := http.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
// Connect to queue
producer := queue.NewProducer(cfg.MessageSizeLimit, true)

View file

@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
)
@ -25,9 +24,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
pg := postgres.NewConn(cfg.Postgres.String(), 0, 0)
defer pg.Close()

View file

@ -16,7 +16,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
sinkMetrics "openreplay/backend/pkg/metrics/sink"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/url/assets"
)
@ -27,9 +26,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := sink.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) {
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
@ -112,7 +108,7 @@ func main() {
log.Printf("zero ts; sessID: %d, msgType: %d", msg.SessionID(), msg.TypeID())
} else {
// Log ts of last processed message
counter.Update(msg.SessionID(), time.UnixMilli(ts))
counter.Update(msg.SessionID(), time.UnixMilli(int64(ts)))
}
// Try to encode message to avoid null data inserts

View file

@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
storageMetrics "openreplay/backend/pkg/metrics/storage"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
cloud "openreplay/backend/pkg/storage"
)
@ -25,9 +24,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
s3 := cloud.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3)

View file

@ -3,6 +3,7 @@ package heuristics
import (
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/pkg/pprof"
)
type Config struct {
@ -19,5 +20,8 @@ type Config struct {
func New() *Config {
cfg := &Config{}
configurator.Process(cfg)
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
return cfg
}

View file

@ -1,74 +0,0 @@
package datasaver
import (
"fmt"
. "openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(msg Message) error {
sessionID := msg.SessionID()
switch m := msg.(type) {
// Common
case *Metadata:
if err := mi.pg.InsertMetadata(sessionID, m); err != nil {
return fmt.Errorf("insert metadata err: %s", err)
}
return nil
case *IssueEvent:
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return mi.pg.HandleWebSessionStart(sessionID, m)
case *SessionEnd:
return mi.pg.HandleWebSessionEnd(sessionID, m)
case *UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return mi.pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
case *PageEvent:
return mi.pg.InsertWebPageEvent(sessionID, m)
case *NetworkRequest:
return mi.pg.InsertWebNetworkRequest(sessionID, m)
case *GraphQL:
return mi.pg.InsertWebGraphQL(sessionID, m)
case *JSException:
return mi.pg.InsertWebJSException(m)
case *IntegrationEvent:
return mi.pg.InsertWebIntegrationEvent(m)
// IOS
case *IOSSessionStart:
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return mi.pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -0,0 +1,19 @@
package datasaver
import (
. "openreplay/backend/pkg/messages"
)
func (s *saverImpl) init() {
// noop
}
func (s *saverImpl) handleExtraMessage(msg Message) error {
switch m := msg.(type) {
case *PerformanceTrackAggr:
return s.pg.InsertWebStatsPerformance(m)
case *ResourceTiming:
return s.pg.InsertWebStatsResourceEvent(m)
}
return nil
}

View file

@ -1,16 +1,126 @@
package datasaver
import (
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
queue "openreplay/backend/pkg/queue/types"
)
type Saver struct {
pg *cache.PGCache
producer types.Producer
type Saver interface {
Handle(msg Message)
Commit() error
Close() error
}
func New(pg *cache.PGCache, _ *db.Config) *Saver {
return &Saver{pg: pg, producer: nil}
type saverImpl struct {
cfg *db.Config
pg *cache.PGCache
ch clickhouse.Connector
producer queue.Producer
}
func New(cfg *db.Config, pg *cache.PGCache) Saver {
s := &saverImpl{cfg: cfg, pg: pg}
s.init()
return s
}
func (s *saverImpl) Handle(msg Message) {
if msg.TypeID() == MsgCustomEvent {
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
}
if err := s.handleMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
if err := s.handleExtraMessage(msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %d, Message: %v", err, msg.SessionID(), msg)
}
return
}
func (s *saverImpl) handleMessage(msg Message) error {
switch m := msg.(type) {
case *Metadata:
return s.pg.InsertMetadata(m)
case *IssueEvent:
return s.pg.InsertIssueEvent(m)
case *SessionStart:
return s.pg.HandleWebSessionStart(m)
case *SessionEnd:
return s.pg.HandleWebSessionEnd(m)
case *UserID:
return s.pg.InsertWebUserID(m)
case *UserAnonymousID:
return s.pg.InsertWebUserAnonymousID(m)
case *CustomEvent:
return s.pg.InsertWebCustomEvent(m)
case *MouseClick:
return s.pg.InsertWebClickEvent(m)
case *InputEvent:
return s.pg.InsertWebInputEvent(m)
case *PageEvent:
return s.pg.InsertWebPageEvent(m)
case *NetworkRequest:
return s.pg.InsertWebNetworkRequest(m)
case *GraphQL:
return s.pg.InsertWebGraphQL(m)
case *JSException:
return s.pg.InsertWebJSException(m)
case *IntegrationEvent:
return s.pg.InsertWebIntegrationEvent(m)
case *IOSSessionStart:
return s.pg.InsertIOSSessionStart(m)
case *IOSSessionEnd:
return s.pg.InsertIOSSessionEnd(m)
case *IOSUserID:
return s.pg.InsertIOSUserID(m)
case *IOSUserAnonymousID:
return s.pg.InsertIOSUserAnonymousID(m)
case *IOSCustomEvent:
return s.pg.InsertIOSCustomEvent(m)
case *IOSClickEvent:
return s.pg.InsertIOSClickEvent(m)
case *IOSInputEvent:
return s.pg.InsertIOSInputEvent(m)
case *IOSNetworkCall:
return s.pg.InsertIOSNetworkCall(m)
case *IOSScreenEnter:
return s.pg.InsertIOSScreenEnter(m)
case *IOSCrash:
return s.pg.InsertIOSCrash(m)
}
return nil
}
func (s *saverImpl) Commit() error {
if s.pg != nil {
s.pg.Commit()
}
if s.ch != nil {
s.ch.Commit()
}
return nil
}
func (s *saverImpl) Close() error {
if s.pg != nil {
if err := s.pg.Close(); err != nil {
log.Printf("pg.Close error: %s", err)
}
}
if s.ch != nil {
if err := s.ch.Stop(); err != nil {
log.Printf("ch.Close error: %s", err)
}
}
return nil
}

View file

@ -1,29 +0,0 @@
package datasaver
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
func (si *Saver) InitStats() {
// noop
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr:
return si.pg.InsertWebStatsPerformance(session.SessionID, m)
case *ResourceEvent:
return si.pg.InsertWebStatsResourceEvent(session.SessionID, m)
}
return nil
}
func (si *Saver) CommitStats() error {
return nil
}
func (si *Saver) Close() error {
return nil
}

View file

@ -0,0 +1,56 @@
package db
import (
"log"
"time"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/internal/service"
"openreplay/backend/pkg/queue/types"
)
type dbImpl struct {
cfg *db.Config
consumer types.Consumer
saver datasaver.Saver
}
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver) service.Interface {
s := &dbImpl{
cfg: cfg,
consumer: consumer,
saver: saver,
}
go s.run()
return s
}
func (d *dbImpl) run() {
commitTick := time.Tick(d.cfg.CommitBatchTimeout)
for {
select {
case <-commitTick:
d.commit()
case msg := <-d.consumer.Rebalanced():
log.Println(msg)
default:
if err := d.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
}
func (d *dbImpl) commit() {
d.saver.Commit()
d.consumer.Commit()
}
func (d *dbImpl) Stop() {
d.commit()
if err := d.saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
d.consumer.Close()
}

View file

@ -0,0 +1,64 @@
package heuristics
import (
"log"
"time"
"openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/service"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
type heuristicsImpl struct {
cfg *heuristics.Config
producer types.Producer
consumer types.Consumer
events sessions.EventBuilder
}
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder) service.Interface {
s := &heuristicsImpl{
cfg: cfg,
producer: p,
consumer: c,
events: e,
}
go s.run()
return s
}
func (h *heuristicsImpl) run() {
tick := time.Tick(10 * time.Second)
for {
select {
case evt := <-h.events.Events():
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
}
case <-tick:
h.producer.Flush(h.cfg.ProducerTimeout)
h.consumer.Commit()
case msg := <-h.consumer.Rebalanced():
log.Println(msg)
default:
if err := h.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
}
func (h *heuristicsImpl) Stop() {
// Stop event builder and flush all events
log.Println("stopping heuristics service")
h.events.Stop()
for evt := range h.events.Events() {
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
}
}
h.producer.Close(h.cfg.ProducerTimeout)
h.consumer.Commit()
h.consumer.Close()
}
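
The service above consumes a sessions.EventBuilder whose declaration is not part of this diff; from the call sites here (HandleMessage as the consumer's handler, Events() drained both in run() and after Stop(), Stop() flushing the builder), it needs roughly the following shape. This is a hedged reconstruction to help read the code, not the actual declaration:

package sessions

import "openreplay/backend/pkg/messages"

// EventBuilder as inferred from heuristicsImpl: HandleMessage feeds raw
// messages into the per-session handlers, Events exposes ready events, and
// Stop flushes the remainder (and presumably closes the Events channel,
// since Stop() above ranges over it to drain what is left).
type EventBuilder interface {
	HandleMessage(msg messages.Message)
	Events() <-chan messages.Message
	Stop()
}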

View file

@ -0,0 +1,5 @@
package service
type Interface interface {
Stop()
}

View file

@ -9,13 +9,13 @@ import (
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64
lastUpdate int64
lastUserTime int64
lastUserTime uint64
isEnded bool
}

View file

@ -21,7 +21,8 @@ func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
return nil
}
func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
func (c *PGCache) InsertIssueEvent(crash *IssueEvent) error {
sessionID := crash.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -29,7 +30,8 @@ func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
return c.Conn.InsertIssueEvent(sessionID, session.ProjectID, crash)
}
func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error {
func (c *PGCache) InsertMetadata(metadata *Metadata) error {
sessionID := metadata.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err

View file

@ -6,7 +6,8 @@ import (
. "openreplay/backend/pkg/messages"
)
func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) error {
func (c *PGCache) InsertIOSSessionStart(s *IOSSessionStart) error {
sessionID := s.SessionID()
if c.Cache.HasSession(sessionID) {
return fmt.Errorf("session %d already in cache", sessionID)
}
@ -33,13 +34,15 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er
return nil
}
func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error {
func (c *PGCache) InsertIOSSessionEnd(e *IOSSessionEnd) error {
sessionID := e.SessionID()
_, err := c.InsertSessionEnd(sessionID, e.Timestamp)
return err
}
func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error {
if err := c.Conn.InsertIOSScreenEnter(sessionID, screenEnter); err != nil {
func (c *PGCache) InsertIOSScreenEnter(screenEnter *IOSScreenEnter) error {
sessionID := screenEnter.SessionID()
if err := c.Conn.InsertIOSScreenEnter(screenEnter); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -50,8 +53,9 @@ func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenE
return nil
}
func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEvent) error {
if err := c.Conn.InsertIOSClickEvent(sessionID, clickEvent); err != nil {
func (c *PGCache) InsertIOSClickEvent(clickEvent *IOSClickEvent) error {
sessionID := clickEvent.SessionID()
if err := c.Conn.InsertIOSClickEvent(clickEvent); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -62,8 +66,9 @@ func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEven
return nil
}
func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEvent) error {
if err := c.Conn.InsertIOSInputEvent(sessionID, inputEvent); err != nil {
func (c *PGCache) InsertIOSInputEvent(inputEvent *IOSInputEvent) error {
sessionID := inputEvent.SessionID()
if err := c.Conn.InsertIOSInputEvent(inputEvent); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -74,18 +79,15 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven
return nil
}
func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error {
func (c *PGCache) InsertIOSCrash(crash *IOSCrash) error {
sessionID := crash.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
}
if err := c.Conn.InsertIOSCrash(sessionID, session.ProjectID, crash); err != nil {
if err := c.Conn.InsertIOSCrash(session.ProjectID, crash); err != nil {
return err
}
session.ErrorsCount += 1
return nil
}
func (c *PGCache) InsertIOSIssueEvent(sessionID uint64, issueEvent *IOSIssueEvent) error {
return nil
}

View file

@ -30,7 +30,8 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
})
}
func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error {
func (c *PGCache) HandleWebSessionStart(s *SessionStart) error {
sessionID := s.SessionID()
if c.Cache.HasSession(sessionID) {
return fmt.Errorf("session %d already in cache", sessionID)
}
@ -69,7 +70,8 @@ func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
return err
}
func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error {
func (c *PGCache) HandleWebSessionEnd(e *SessionEnd) error {
sessionID := e.SessionID()
return c.HandleSessionEnd(sessionID)
}
@ -99,7 +101,8 @@ func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error
return c.Conn.InsertSessionReferrer(sessionID, referrer)
}
func (c *PGCache) InsertWebNetworkRequest(sessionID uint64, e *NetworkRequest) error {
func (c *PGCache) InsertWebNetworkRequest(e *NetworkRequest) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -111,7 +114,8 @@ func (c *PGCache) InsertWebNetworkRequest(sessionID uint64, e *NetworkRequest) e
return c.Conn.InsertWebNetworkRequest(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebGraphQL(sessionID uint64, e *GraphQL) error {
func (c *PGCache) InsertWebGraphQL(e *GraphQL) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -123,7 +127,8 @@ func (c *PGCache) InsertWebGraphQL(sessionID uint64, e *GraphQL) error {
return c.Conn.InsertWebGraphQL(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
func (c *PGCache) InsertWebCustomEvent(e *CustomEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -131,7 +136,8 @@ func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
func (c *PGCache) InsertWebUserID(userID *UserID) error {
sessionID := userID.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -139,7 +145,8 @@ func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID)
}
func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error {
func (c *PGCache) InsertWebUserAnonymousID(userAnonymousID *UserAnonymousID) error {
sessionID := userAnonymousID.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -147,7 +154,8 @@ func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *Us
return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID)
}
func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
func (c *PGCache) InsertWebPageEvent(e *PageEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -155,7 +163,8 @@ func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
func (c *PGCache) InsertWebClickEvent(e *MouseClick) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -163,7 +172,8 @@ func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error {
func (c *PGCache) InsertWebInputEvent(e *InputEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err

View file

@ -0,0 +1,24 @@
package clickhouse
import (
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/messages"
)
type Connector interface {
Prepare() error
Commit() error
Stop() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertRequest(session *types.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQL) error
InsertIssue(session *types.Session, msg *messages.IssueEvent) error
}
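
saverImpl checks its ch field for nil before using it, so the ClickHouse side stays optional behind this Connector interface. As an illustration, a no-op connector that satisfies it could stand in when no ClickHouse instance is configured; this stub is not part of the commit, just a sketch against the interface above:

package clickhouse

import (
	"openreplay/backend/pkg/db/types"
	"openreplay/backend/pkg/messages"
)

// nopConnector discards everything it is given. Illustrative only.
type nopConnector struct{}

func NewNopConnector() Connector { return nopConnector{} }

func (nopConnector) Prepare() error { return nil }
func (nopConnector) Commit() error  { return nil }
func (nopConnector) Stop() error    { return nil }
func (nopConnector) InsertWebSession(*types.Session) error                                 { return nil }
func (nopConnector) InsertWebResourceEvent(*types.Session, *messages.ResourceTiming) error { return nil }
func (nopConnector) InsertWebPageEvent(*types.Session, *messages.PageEvent) error          { return nil }
func (nopConnector) InsertWebClickEvent(*types.Session, *messages.MouseClick) error        { return nil }
func (nopConnector) InsertWebInputEvent(*types.Session, *messages.InputEvent) error        { return nil }
func (nopConnector) InsertWebErrorEvent(*types.Session, *types.ErrorEvent) error           { return nil }
func (nopConnector) InsertWebPerformanceTrackAggr(*types.Session, *messages.PerformanceTrackAggr) error {
	return nil
}
func (nopConnector) InsertAutocomplete(*types.Session, string, string) error            { return nil }
func (nopConnector) InsertRequest(*types.Session, *messages.NetworkRequest, bool) error { return nil }
func (nopConnector) InsertCustom(*types.Session, *messages.CustomEvent) error           { return nil }
func (nopConnector) InsertGraphQL(*types.Session, *messages.GraphQL) error              { return nil }
func (nopConnector) InsertIssue(*types.Session, *messages.IssueEvent) error             { return nil }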

View file

@ -193,9 +193,7 @@ func (conn *BatchSet) worker() {
for {
select {
case t := <-conn.workerTask:
start := time.Now()
conn.sendBatches(t)
log.Printf("pg batches dur: %d", time.Now().Sub(start).Milliseconds())
case <-conn.done:
if len(conn.workerTask) > 0 {
for t := range conn.workerTask {

View file

@ -2,7 +2,6 @@ package postgres
import (
"log"
"time"
)
type bulksTask struct {
@ -243,9 +242,7 @@ func (conn *BulkSet) worker() {
for {
select {
case t := <-conn.workerTask:
start := time.Now()
conn.sendBulks(t)
log.Printf("pg bulks dur: %d", time.Now().Sub(start).Milliseconds())
case <-conn.done:
if len(conn.workerTask) > 0 {
for t := range conn.workerTask {

View file

@ -17,7 +17,7 @@ type Conn struct {
c Pool
batches *BatchSet
bulks *BulkSet
chConn CH
chConn CH // hack for autocomplete inserts, TODO: rewrite
}
func (conn *Conn) SetClickHouse(ch CH) {

View file

@ -6,7 +6,8 @@ import (
"openreplay/backend/pkg/url"
)
func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEvent) error {
func (conn *Conn) InsertIOSCustomEvent(e *messages.IOSCustomEvent) error {
sessionID := e.SessionID()
err := conn.InsertCustomEvent(sessionID, e.Timestamp, truncSqIdx(e.Index), e.Name, e.Payload)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name)
@ -14,7 +15,8 @@ func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEv
return err
}
func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) error {
func (conn *Conn) InsertIOSUserID(userID *messages.IOSUserID) error {
sessionID := userID.SessionID()
err := conn.InsertUserID(sessionID, userID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "USERID_IOS", userID.Value)
@ -22,7 +24,8 @@ func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID)
return err
}
func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *messages.IOSUserAnonymousID) error {
func (conn *Conn) InsertIOSUserAnonymousID(userAnonymousID *messages.IOSUserAnonymousID) error {
sessionID := userAnonymousID.SessionID()
err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "USERANONYMOUSID_IOS", userAnonymousID.Value)
@ -30,7 +33,8 @@ func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *me
return err
}
func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkCall) error {
func (conn *Conn) InsertIOSNetworkCall(e *messages.IOSNetworkCall) error {
sessionID := e.SessionID()
err := conn.InsertRequest(sessionID, e.Timestamp, truncSqIdx(e.Index), e.URL, e.Duration, e.Success)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL))
@ -38,7 +42,8 @@ func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkC
return err
}
func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.IOSScreenEnter) error {
func (conn *Conn) InsertIOSScreenEnter(screenEnter *messages.IOSScreenEnter) error {
sessionID := screenEnter.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -69,7 +74,8 @@ func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.I
return nil
}
func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOSClickEvent) error {
func (conn *Conn) InsertIOSClickEvent(clickEvent *messages.IOSClickEvent) error {
sessionID := clickEvent.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -100,7 +106,8 @@ func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOS
return nil
}
func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOSInputEvent) error {
func (conn *Conn) InsertIOSInputEvent(inputEvent *messages.IOSInputEvent) error {
sessionID := inputEvent.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -137,7 +144,8 @@ func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOS
return nil
}
func (conn *Conn) InsertIOSCrash(sessionID uint64, projectID uint32, crash *messages.IOSCrash) error {
func (conn *Conn) InsertIOSCrash(projectID uint32, crash *messages.IOSCrash) error {
sessionID := crash.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err

View file

@ -5,7 +5,8 @@ import (
"openreplay/backend/pkg/url"
)
func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrackAggr) error {
func (conn *Conn) InsertWebStatsPerformance(p *PerformanceTrackAggr) error {
sessionID := p.SessionID()
timestamp := (p.TimestampEnd + p.TimestampStart) / 2
sqlRequest := `
@ -35,40 +36,37 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac
return nil
}
func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent) error {
func (conn *Conn) InsertWebStatsResourceEvent(e *ResourceTiming) error {
sessionID := e.SessionID()
host, _, _, err := url.GetURLParts(e.URL)
if err != nil {
return err
}
msgType := url.GetResourceType(e.Initiator, e.URL)
sqlRequest := `
INSERT INTO events.resources (
session_id, timestamp, message_id,
type,
url, url_host, url_hostpath,
success, status,
method,
duration, ttfb, header_size, encoded_body_size, decoded_body_size
) VALUES (
$1, $2, $3,
$4,
LEFT($5, 8000), LEFT($6, 300), LEFT($7, 2000),
$8, $9,
NULLIF($10, '')::events.resource_method,
NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0)
NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0)
)`
urlQuery := url.DiscardURLQuery(e.URL)
urlMethod := url.EnsureMethod(e.Method)
conn.batchQueue(sessionID, sqlRequest,
sessionID, e.Timestamp, truncSqIdx(e.MessageID),
e.Type,
sessionID, e.Timestamp, truncSqIdx(e.MsgID()),
msgType,
e.URL, host, urlQuery,
e.Success, e.Status,
urlMethod,
e.Duration != 0, 0,
e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize,
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.Type)+len(e.URL)+len(host)+len(urlQuery)+len(urlMethod)+8*9+1)
conn.updateBatchSize(sessionID, len(sqlRequest)+len(msgType)+len(e.URL)+len(host)+len(urlQuery)+8*9+1)
return nil
}

View file

@ -57,10 +57,13 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page
return nil
}
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error {
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *MouseClick) error {
if e.Label == "" {
return nil
}
var host, path string
host, path, _, _ = url.GetURLParts(e.Url)
if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil {
if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil {
log.Printf("insert web click err: %s", err)
}
// Accumulate session updates and exec inside batch with another sql commands

View file

@ -120,3 +120,15 @@ func (e *ErrorEvent) ID(projectID uint32) string {
}
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
}
func WrapCustomEvent(m *CustomEvent) *IssueEvent {
msg := &IssueEvent{
Type: "custom",
Timestamp: m.Time(),
MessageID: m.MsgID(),
ContextString: m.Name,
Payload: m.Payload,
}
msg.Meta().SetMeta(m.Meta())
return msg
}

View file

@ -1,82 +0,0 @@
package custom
import (
"net/url"
"strings"
. "openreplay/backend/pkg/messages"
)
func getURLExtention(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func getResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtention(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
type EventMapper struct{}
func (b *EventMapper) Build() Message {
return nil
}
func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
if msg.Label != "" {
return &ClickEvent{
MessageID: messageID,
Label: msg.Label,
HesitationTime: msg.HesitationTime,
Timestamp: timestamp,
Selector: msg.Selector,
}
}
case *ResourceTiming:
return &ResourceEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
TTFB: msg.TTFB,
HeaderSize: msg.HeaderSize,
EncodedBodySize: msg.EncodedBodySize,
DecodedBodySize: msg.DecodedBodySize,
URL: msg.URL,
Type: getResourceType(msg.Initiator, msg.URL),
Success: msg.Duration != 0,
}
case *CustomIssue:
return &IssueEvent{
Type: "custom",
Timestamp: timestamp,
MessageID: messageID,
ContextString: msg.Name,
Payload: msg.Payload,
}
}
return nil
}

View file

@ -4,7 +4,7 @@ import (
. "openreplay/backend/pkg/messages"
)
const INPUT_EVENT_TIMEOUT = 1 * 60 * 1000
const InputEventTimeout = 1 * 60 * 1000
type inputLabels map[uint64]string
@ -24,7 +24,7 @@ func (b *inputEventBuilder) clearLabels() {
b.inputLabels = make(inputLabels)
}
func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (b *inputEventBuilder) Handle(message Message, timestamp uint64) Message {
var inputEvent Message = nil
switch msg := message.(type) {
case *SetInputTarget:
@ -41,7 +41,7 @@ func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp
}
if b.inputEvent == nil {
b.inputEvent = &InputEvent{
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
Value: msg.Value,
ValueMasked: msg.Mask > 0,
@ -59,7 +59,7 @@ func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp
return b.Build()
}
if b.inputEvent != nil && b.inputEvent.Timestamp+INPUT_EVENT_TIMEOUT < timestamp {
if b.inputEvent != nil && b.inputEvent.Timestamp+InputEventTimeout < timestamp {
return b.Build()
}
return nil

View file

@ -4,7 +4,7 @@ import (
. "openreplay/backend/pkg/messages"
)
const PAGE_EVENT_TIMEOUT = 1 * 60 * 1000
const PageEventTimeout = 1 * 60 * 1000
type pageEventBuilder struct {
pageEvent *PageEvent
@ -16,7 +16,7 @@ func NewPageEventBuilder() *pageEventBuilder {
return ieBuilder
}
func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (b *pageEventBuilder) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *SetPageLocation:
if msg.NavigationStart == 0 { // routing without new page loading
@ -24,7 +24,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: false,
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
}
} else {
@ -33,7 +33,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: true,
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
}
return pageEvent
@ -81,7 +81,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
}
if b.pageEvent != nil && b.pageEvent.Timestamp+PAGE_EVENT_TIMEOUT < timestamp {
if b.pageEvent != nil && b.pageEvent.Timestamp+PageEventTimeout < timestamp {
return b.Build()
}
return nil

View file

@ -48,7 +48,7 @@ func (h *ClickRageDetector) Handle(message Message, messageID uint64, timestamp
}
func (h *ClickRageDetector) Build() Message {
if h.countsInARow >= web.MIN_CLICKS_IN_A_ROW {
if h.countsInARow >= web.MinClicksInARow {
event := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,

View file

@ -6,6 +6,6 @@ import . "openreplay/backend/pkg/messages"
// You can create your own message handler and easily connect it to the heuristics service
type MessageProcessor interface {
Handle(message Message, messageID uint64, timestamp uint64) Message
Handle(message Message, timestamp uint64) Message
Build() Message
}
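
As the comment above says, a custom detector only has to satisfy MessageProcessor, and under the new signature the message ID comes from message.MsgID() instead of a separate argument. A hedged sketch of such a handler, counting JS exceptions per session and emitting a single IssueEvent; the detector, its threshold, and its issue type are hypothetical and not part of this commit:

package custom

import (
	"encoding/json"
	"log"

	. "openreplay/backend/pkg/messages"
)

const errorBurstThreshold = 10 // hypothetical threshold

// errorBurstDetector exists only to illustrate the MessageProcessor contract.
type errorBurstDetector struct {
	count          int
	firstTimestamp uint64
	firstMessageID uint64
}

func NewErrorBurstDetector() *errorBurstDetector {
	return &errorBurstDetector{}
}

func (d *errorBurstDetector) Handle(message Message, timestamp uint64) Message {
	switch message.(type) {
	case *JSException:
		if d.count == 0 {
			d.firstTimestamp = timestamp
			d.firstMessageID = message.MsgID()
		}
		d.count++
	case *SessionEnd:
		// Flush on session end so short, error-heavy sessions are not lost.
		return d.Build()
	}
	return nil
}

func (d *errorBurstDetector) Build() Message {
	defer func() { d.count, d.firstTimestamp, d.firstMessageID = 0, 0, 0 }()
	if d.count < errorBurstThreshold {
		return nil
	}
	payload, err := json.Marshal(struct{ Count int }{d.count})
	if err != nil {
		log.Printf("can't marshal errorBurst payload to json: %s", err)
	}
	return &IssueEvent{
		Type:          "js_error_burst", // hypothetical issue type
		Timestamp:     d.firstTimestamp,
		MessageID:     d.firstMessageID,
		ContextString: "too many JS exceptions in one session",
		Payload:       string(payload),
	}
}

To wire such a handler in, it would simply be appended to the handlersFabric list in the heuristics main shown earlier in this diff.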

View file

@ -7,14 +7,8 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: ClickRage
Input event: MouseClick
Output event: IssueEvent
*/
const MAX_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
const MaxTimeDiff = 300
const MinClicksInARow = 3
type ClickRageDetector struct {
lastTimestamp uint64
@ -34,46 +28,54 @@ func (crd *ClickRageDetector) reset() {
crd.url = ""
}
func (crd *ClickRageDetector) Build() Message {
defer crd.reset()
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, err := json.Marshal(struct{ Count int }{crd.countsInARow})
func (crd *ClickRageDetector) createPayload() string {
p, err := json.Marshal(struct{ Count int }{crd.countsInARow})
if err != nil {
log.Printf("can't marshal ClickRage payload to json: %s", err)
return ""
}
event := &IssueEvent{
return string(p)
}
func (crd *ClickRageDetector) Build() Message {
defer crd.reset()
if crd.countsInARow < MinClicksInARow {
return nil
}
return &IssueEvent{
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: string(payload),
Payload: crd.createPayload(),
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
URL: crd.url,
}
return event
}
return nil
}
func (crd *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (crd *ClickRageDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
// Set click url
if crd.url == "" && msg.Url != "" {
crd.url = msg.Url
}
// TODO: check whether it is ok to capture a clickRage event without the connected ClickEvent in db.
// Click on different object -> build if we can and reset the builder
if msg.Label == "" {
return crd.Build()
}
if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MAX_TIME_DIFF {
// Update builder with last information
if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MaxTimeDiff {
crd.lastTimestamp = timestamp
crd.countsInARow += 1
return nil
}
// Try to build event
event := crd.Build()
// Use current message as init values for new event
crd.lastTimestamp = timestamp
crd.lastLabel = msg.Label
crd.firstInARawTimestamp = timestamp
crd.firstInARawMessageId = messageID
crd.firstInARawMessageId = message.MsgID()
crd.countsInARow = 1
if crd.url == "" && msg.Url != "" {
crd.url = msg.Url

View file

@ -15,8 +15,8 @@ import (
Output event: IssueEvent
*/
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
const CpuThreshold = 70 // % out of 100
const CpuMinDurationTrigger = 6 * 1000
type CpuIssueDetector struct {
startTimestamp uint64
@ -26,66 +26,62 @@ type CpuIssueDetector struct {
contextString string
}
func (f *CpuIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
duration := f.lastTimestamp - f.startTimestamp
timestamp := f.startTimestamp
messageID := f.startMessageID
maxRate := f.maxRate
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
if duration < CPU_MIN_DURATION_TRIGGER {
return nil
}
payload, err := json.Marshal(struct {
func (f *CpuIssueDetector) createPayload() string {
p, err := json.Marshal(struct {
Duration uint64
Rate uint64
}{duration, maxRate})
}{f.duration(), f.maxRate})
if err != nil {
log.Printf("can't marshal CpuIssue payload to json: %s", err)
}
return string(p)
}
func (f *CpuIssueDetector) duration() uint64 {
return f.lastTimestamp - f.startTimestamp
}
func (f *CpuIssueDetector) reset() {
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
}
func (f *CpuIssueDetector) Build() Message {
defer f.reset()
if f.startTimestamp == 0 || f.duration() < CpuMinDurationTrigger {
return nil
}
return &IssueEvent{
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
ContextString: f.contextString,
Payload: string(payload),
Payload: f.createPayload(),
}
}
func (f *CpuIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *CpuIssueDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
dt := performance.TimeDiff(timestamp, f.lastTimestamp)
if dt == 0 {
return nil // TODO: handle error
// Ignore if it's a wrong message order
if timestamp < f.lastTimestamp {
return nil
}
f.lastTimestamp = timestamp
if msg.Frames == -1 || msg.Ticks == -1 {
cpuRate := performance.CPURate(msg.Ticks, performance.TimeDiff(timestamp, f.lastTimestamp))
// Build event if the cpu issue has gone
if msg.Frames == -1 || msg.Ticks == -1 || cpuRate < CpuThreshold {
return f.Build()
}
cpuRate := performance.CPURate(msg.Ticks, dt)
if cpuRate >= CPU_THRESHOLD {
// Update values
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
f.startMessageID = message.MsgID()
}
if f.maxRate < cpuRate {
f.maxRate = cpuRate
}
} else {
return f.Build()
}
case *SetPageLocation:
f.contextString = msg.URL
}

View file

@ -4,43 +4,39 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DeadClick
Input events: SetInputTarget,
CreateDocument,
MouseClick,
SetNodeAttribute,
RemoveNodeAttribute,
CreateElementNode,
CreateTextNode,
MoveNode,
RemoveNode,
SetCSSData,
CSSInsertRule,
CSSDeleteRule
Output event: IssueEvent
*/
const CLICK_RELATION_TIME = 1234
const ClickRelationTime = 1234
type DeadClickDetector struct {
lastTimestamp uint64
lastMouseClick *MouseClick
lastTimestamp uint64
lastClickTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
}
func NewDeadClickDetector() *DeadClickDetector {
return &DeadClickDetector{inputIDSet: make(map[uint64]bool)}
}
func (d *DeadClickDetector) addInputID(id uint64) {
d.inputIDSet[id] = true
}
func (d *DeadClickDetector) clearInputIDs() {
d.inputIDSet = make(map[uint64]bool)
}
func (d *DeadClickDetector) reset() {
d.inputIDSet = nil
d.lastMouseClick = nil
d.lastClickTimestamp = 0
d.lastMessageID = 0
d.clearInputIDs()
}
func (d *DeadClickDetector) build(timestamp uint64) Message {
func (d *DeadClickDetector) Build() Message {
// remove reset from external Build call
defer d.reset()
if d.lastMouseClick == nil || d.lastClickTimestamp+CLICK_RELATION_TIME > timestamp { // reaction is instant
if d.lastMouseClick == nil || d.lastClickTimestamp+ClickRelationTime > d.lastTimestamp { // reaction is instant
return nil
}
event := &IssueEvent{
@ -52,42 +48,37 @@ func (d *DeadClickDetector) build(timestamp uint64) Message {
return event
}
func (d *DeadClickDetector) Build() Message {
return d.build(d.lastTimestamp)
}
func (d *DeadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (d *DeadClickDetector) Handle(message Message, timestamp uint64) Message {
d.lastTimestamp = timestamp
switch msg := message.(type) {
case *SetInputTarget:
if d.inputIDSet == nil {
d.inputIDSet = make(map[uint64]bool)
}
d.inputIDSet[msg.ID] = true
d.addInputID(msg.ID)
case *CreateDocument:
d.inputIDSet = nil
d.clearInputIDs()
case *MouseClick:
if msg.Label == "" {
return nil
}
event := d.build(timestamp)
if d.inputIDSet[msg.ID] { // ignore if input
isInputEvent := d.inputIDSet[msg.ID]
event := d.Build()
if isInputEvent {
return event
}
d.lastMouseClick = msg
d.lastClickTimestamp = timestamp
d.lastMessageID = messageID
d.lastMessageID = message.MsgID()
return event
case *SetNodeAttribute,
*RemoveNodeAttribute,
*CreateElementNode,
*CreateTextNode,
*SetNodeFocus,
*MoveNode,
*RemoveNode,
*SetCSSData,
*CSSInsertRule,
*CSSDeleteRule:
return d.build(timestamp)
return d.Build()
}
return nil
}

View file

@ -1,55 +0,0 @@
package web
import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DomDrop
Input events: CreateElementNode,
CreateTextNode,
RemoveNode
Output event: DOMDrop
*/
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crash it removes only roots (1-3 nodes).
// TODO: smart detection (making whole DOM tree would eat all memory)
type domDropDetector struct {
removedCount int
lastDropTimestamp uint64
}
func (dd *domDropDetector) reset() {
dd.removedCount = 0
dd.lastDropTimestamp = 0
}
func (dd *domDropDetector) Handle(message Message, _ uint64, timestamp uint64) Message {
switch message.(type) {
case *CreateElementNode,
*CreateTextNode:
dd.removedCount = 0
dd.lastDropTimestamp = 0
case *RemoveNode:
if dd.lastDropTimestamp+DROP_WINDOW > timestamp {
dd.removedCount += 1
} else {
dd.removedCount = 1
}
dd.lastDropTimestamp = timestamp
}
return nil
}
func (dd *domDropDetector) Build() Message {
defer dd.reset()
if dd.removedCount >= CRITICAL_COUNT {
domDrop := &DOMDrop{
Timestamp: dd.lastDropTimestamp,
}
return domDrop
}
return nil
}

View file

@ -8,13 +8,6 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: MemoryIssue
Input events: PerformanceTrack,
SetPageLocation
Output event: IssueEvent
*/
const MIN_COUNT = 3
const MEM_RATE_THRESHOLD = 300 // % to average
@ -52,7 +45,7 @@ func (f *MemoryIssueDetector) Build() Message {
return event
}
func (f *MemoryIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *MemoryIssueDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if f.count < MIN_COUNT {
@ -70,7 +63,7 @@ func (f *MemoryIssueDetector) Handle(message Message, messageID uint64, timestam
if rate >= MEM_RATE_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
f.startMessageID = message.MsgID()
}
if f.rate < rate {
f.rate = rate

View file

@ -4,26 +4,19 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: NetworkIssue
Input events: ResourceTiming,
NetworkRequest
Output event: IssueEvent
*/
type NetworkIssueDetector struct{}
func (f *NetworkIssueDetector) Build() Message {
return nil
}
func (f *NetworkIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *NetworkIssueDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *NetworkRequest:
if msg.Status >= 400 {
return &IssueEvent{
Type: "bad_request",
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: msg.Timestamp,
ContextString: msg.URL,
}

View file

@ -7,13 +7,7 @@ import (
"openreplay/backend/pkg/messages/performance"
)
/*
Handler name: PerformanceAggregator
Input event: PerformanceTrack
Output event: PerformanceTrackAggr
*/
const AGGREGATION_WINDOW = 2 * 60 * 1000
const AggregationWindow = 2 * 60 * 1000
type PerformanceAggregator struct {
*PerformanceTrackAggr
@ -42,7 +36,7 @@ func (b *PerformanceAggregator) reset() {
b.lastTimestamp = 0
}
func (b *PerformanceAggregator) Handle(message Message, _ uint64, timestamp uint64) Message {
func (b *PerformanceAggregator) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
if b.PerformanceTrackAggr == nil || msg.Frames == -1 || msg.Ticks == -1 {
@ -93,7 +87,7 @@ func (b *PerformanceAggregator) Handle(message Message, _ uint64, timestamp uint
b.lastTimestamp = timestamp
}
if b.PerformanceTrackAggr != nil &&
timestamp-b.PerformanceTrackAggr.TimestampStart >= AGGREGATION_WINDOW {
timestamp-b.PerformanceTrackAggr.TimestampStart >= AggregationWindow {
return b.Build()
}
return nil

View file

@ -2,7 +2,7 @@
package messages
func IsReplayerType(id int) bool {
return 1 != id && 3 != id && 17 != id && 23 != id && 24 != id && 25 != id && 26 != id && 27 != id && 28 != id && 29 != id && 30 != id && 31 != id && 32 != id && 33 != id && 35 != id && 42 != id && 52 != id && 56 != id && 62 != id && 63 != id && 64 != id && 66 != id && 78 != id && 80 != id && 81 != id && 82 != id && 125 != id && 126 != id && 127 != id && 107 != id && 91 != id && 92 != id && 94 != id && 95 != id && 97 != id && 98 != id && 99 != id && 101 != id && 104 != id && 110 != id && 111 != id
return 1 != id && 3 != id && 17 != id && 23 != id && 24 != id && 25 != id && 26 != id && 27 != id && 28 != id && 29 != id && 30 != id && 31 != id && 32 != id && 42 != id && 56 != id && 62 != id && 63 != id && 64 != id && 66 != id && 78 != id && 80 != id && 81 != id && 82 != id && 125 != id && 126 != id && 127 != id && 107 != id && 91 != id && 92 != id && 94 != id && 95 != id && 97 != id && 98 != id && 99 != id && 101 != id && 104 != id && 110 != id && 111 != id
}
func IsIOSType(id int) bool {

View file

@ -126,7 +126,7 @@ func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMetadata")
}
@ -139,7 +139,7 @@ func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMeta")
}
@ -149,13 +149,13 @@ func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
}
case *Timestamp:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("Timestamp")
}
case *SessionStart:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionStart")
log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
@ -163,7 +163,7 @@ func (i *enderMessageIteratorImpl) preprocessing(msg Message) error {
}
case *SessionEnd:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionEnd")
}

View file

@ -128,7 +128,7 @@ func (i *sinkMessageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMetadata")
}
@ -141,7 +141,7 @@ func (i *sinkMessageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMeta")
}
@ -151,13 +151,13 @@ func (i *sinkMessageIteratorImpl) preprocessing(msg Message) error {
}
case *Timestamp:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("Timestamp")
}
case *SessionStart:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionStart")
log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
@ -165,7 +165,7 @@ func (i *sinkMessageIteratorImpl) preprocessing(msg Message) error {
}
case *SessionEnd:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionEnd")
}

View file

@ -108,11 +108,20 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
// Set meta information for message
msg.Meta().SetMeta(i.messageInfo)
// Update timestamp value for iOS message types
if IsIOSType(msgType) {
msg.Meta().Timestamp = i.getIOSTimestamp(msg)
}
// Process message
i.handler(msg)
}
}
func (i *messageIteratorImpl) getIOSTimestamp(msg Message) uint64 {
return GetTimestamp(msg)
}
func (i *messageIteratorImpl) zeroTsLog(msgType string) {
log.Printf("zero timestamp in %s, info: %s", msgType, i.batchInfo.Info())
}
@ -127,7 +136,7 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("incorrect batch version: %d, skip current batch, info: %s", i.version, i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMetadata")
}
@ -140,7 +149,7 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
return fmt.Errorf("batchMeta found at the end of the batch, info: %s", i.batchInfo.Info())
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Timestamp = uint64(m.Timestamp)
if m.Timestamp == 0 {
i.zeroTsLog("BatchMeta")
}
@ -150,13 +159,13 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
}
case *Timestamp:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("Timestamp")
}
case *SessionStart:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionStart")
log.Printf("zero session start, project: %d, UA: %s, tracker: %s, info: %s",
@ -164,7 +173,7 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
}
case *SessionEnd:
i.messageInfo.Timestamp = int64(m.Timestamp)
i.messageInfo.Timestamp = m.Timestamp
if m.Timestamp == 0 {
i.zeroTsLog("SessionEnd")
}
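All three iterators (ender, sink and the generic one) now keep the meta timestamp as uint64 end to end and, for iOS batches, read it straight off the message via GetTimestamp. A condensed restatement of the shared preprocessing rule, assuming it sits in the messages package; this is a sketch, not the iterator code itself:

// metaTimestamp restates the rule above: the timestamp is copied as uint64
// from the carrying message, and a zero value is only logged, never rejected.
func metaTimestamp(msg Message) uint64 {
	switch m := msg.(type) {
	case *Timestamp:
		return m.Timestamp
	case *SessionStart:
		return m.Timestamp
	case *SessionEnd:
		return m.Timestamp
	default:
		return msg.Time() // already filled in by an earlier meta message
	}
}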

View file

@ -8,6 +8,8 @@ type Message interface {
TypeID() int
Meta() *message
SessionID() uint64
MsgID() uint64
Time() uint64
}
// BatchInfo represents common information for all messages inside data batch
@ -47,7 +49,7 @@ func (b *BatchInfo) Info() string {
}
type message struct {
Timestamp int64
Timestamp uint64
Index uint64
Url string
batch *BatchInfo
@ -72,6 +74,14 @@ func (m *message) SessionID() uint64 {
return m.batch.sessionID
}
func (m *message) MsgID() uint64 {
return m.Meta().Index
}
func (m *message) Time() uint64 {
return m.Meta().Timestamp
}
func (m *message) SetSessionID(sessID uint64) {
if m.batch == nil {
m.batch = &BatchInfo{}

View file

@ -34,8 +34,6 @@ const (
MsgMetadata = 30
MsgPageEvent = 31
MsgInputEvent = 32
MsgClickEvent = 33
MsgResourceEvent = 35
MsgCSSInsertRule = 37
MsgCSSDeleteRule = 38
MsgFetch = 39
@ -50,7 +48,6 @@ const (
MsgPerformanceTrack = 49
MsgStringDict = 50
MsgSetNodeAttributeDict = 51
MsgDOMDrop = 52
MsgResourceTiming = 53
MsgConnectionInformation = 54
MsgSetPageVisibility = 55
@ -103,6 +100,7 @@ const (
MsgIOSIssueEvent = 111
)
type Timestamp struct {
message
Timestamp uint64
@ -269,6 +267,7 @@ func (msg *SetViewportScroll) TypeID() int {
type CreateDocument struct {
message
}
func (msg *CreateDocument) Encode() []byte {
@ -944,78 +943,6 @@ func (msg *InputEvent) TypeID() int {
return 32
}
type ClickEvent struct {
message
MessageID uint64
Timestamp uint64
HesitationTime uint64
Label string
Selector string
}
func (msg *ClickEvent) Encode() []byte {
buf := make([]byte, 51+len(msg.Label)+len(msg.Selector))
buf[0] = 33
p := 1
p = WriteUint(msg.MessageID, buf, p)
p = WriteUint(msg.Timestamp, buf, p)
p = WriteUint(msg.HesitationTime, buf, p)
p = WriteString(msg.Label, buf, p)
p = WriteString(msg.Selector, buf, p)
return buf[:p]
}
func (msg *ClickEvent) Decode() Message {
return msg
}
func (msg *ClickEvent) TypeID() int {
return 33
}
type ResourceEvent struct {
message
MessageID uint64
Timestamp uint64
Duration uint64
TTFB uint64
HeaderSize uint64
EncodedBodySize uint64
DecodedBodySize uint64
URL string
Type string
Success bool
Method string
Status uint64
}
func (msg *ResourceEvent) Encode() []byte {
buf := make([]byte, 121+len(msg.URL)+len(msg.Type)+len(msg.Method))
buf[0] = 35
p := 1
p = WriteUint(msg.MessageID, buf, p)
p = WriteUint(msg.Timestamp, buf, p)
p = WriteUint(msg.Duration, buf, p)
p = WriteUint(msg.TTFB, buf, p)
p = WriteUint(msg.HeaderSize, buf, p)
p = WriteUint(msg.EncodedBodySize, buf, p)
p = WriteUint(msg.DecodedBodySize, buf, p)
p = WriteString(msg.URL, buf, p)
p = WriteString(msg.Type, buf, p)
p = WriteBoolean(msg.Success, buf, p)
p = WriteString(msg.Method, buf, p)
p = WriteUint(msg.Status, buf, p)
return buf[:p]
}
func (msg *ResourceEvent) Decode() Message {
return msg
}
func (msg *ResourceEvent) TypeID() int {
return 35
}
type CSSInsertRule struct {
message
ID uint64
@ -1366,27 +1293,6 @@ func (msg *SetNodeAttributeDict) TypeID() int {
return 51
}
type DOMDrop struct {
message
Timestamp uint64
}
func (msg *DOMDrop) Encode() []byte {
buf := make([]byte, 11)
buf[0] = 52
p := 1
p = WriteUint(msg.Timestamp, buf, p)
return buf[:p]
}
func (msg *DOMDrop) Decode() Message {
return msg
}
func (msg *DOMDrop) TypeID() int {
return 52
}
type ResourceTiming struct {
message
Timestamp uint64
@ -2748,3 +2654,4 @@ func (msg *IOSIssueEvent) Decode() Message {
func (msg *IOSIssueEvent) TypeID() int {
return 111
}

View file

@ -42,3 +42,17 @@ func (m *RawMessage) SessionID() uint64 {
}
return 0
}
func (m *RawMessage) MsgID() uint64 {
if m.meta != nil {
return m.meta.Index
}
return 0
}
func (m *RawMessage) Time() uint64 {
if m.meta != nil {
return m.meta.Timestamp
}
return 0
}
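RawMessage gets the same accessors as decoded messages, so consumers can rely on MsgID() and Time() instead of reaching into Meta().Index or calling GetTimestamp. A consumer-side sketch (the helper itself is hypothetical):

package consumer

import (
	"fmt"

	"openreplay/backend/pkg/messages"
)

// describe builds a log line using only the Message interface.
func describe(msg messages.Message) string {
	return fmt.Sprintf("sess=%d msg=%d ts=%d type=%d",
		msg.SessionID(), msg.MsgID(), msg.Time(), msg.TypeID())
}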

View file

@ -546,69 +546,6 @@ func DecodeInputEvent(reader BytesReader) (Message, error) {
return msg, err
}
func DecodeClickEvent(reader BytesReader) (Message, error) {
var err error = nil
msg := &ClickEvent{}
if msg.MessageID, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.Timestamp, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.HesitationTime, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.Label, err = reader.ReadString(); err != nil {
return nil, err
}
if msg.Selector, err = reader.ReadString(); err != nil {
return nil, err
}
return msg, err
}
func DecodeResourceEvent(reader BytesReader) (Message, error) {
var err error = nil
msg := &ResourceEvent{}
if msg.MessageID, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.Timestamp, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.Duration, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.TTFB, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.HeaderSize, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.EncodedBodySize, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.DecodedBodySize, err = reader.ReadUint(); err != nil {
return nil, err
}
if msg.URL, err = reader.ReadString(); err != nil {
return nil, err
}
if msg.Type, err = reader.ReadString(); err != nil {
return nil, err
}
if msg.Success, err = reader.ReadBoolean(); err != nil {
return nil, err
}
if msg.Method, err = reader.ReadString(); err != nil {
return nil, err
}
if msg.Status, err = reader.ReadUint(); err != nil {
return nil, err
}
return msg, err
}
func DecodeCSSInsertRule(reader BytesReader) (Message, error) {
var err error = nil
msg := &CSSInsertRule{}
@ -819,15 +756,6 @@ func DecodeSetNodeAttributeDict(reader BytesReader) (Message, error) {
return msg, err
}
func DecodeDOMDrop(reader BytesReader) (Message, error) {
var err error = nil
msg := &DOMDrop{}
if msg.Timestamp, err = reader.ReadUint(); err != nil {
return nil, err
}
return msg, err
}
func DecodeResourceTiming(reader BytesReader) (Message, error) {
var err error = nil
msg := &ResourceTiming{}
@ -1812,10 +1740,6 @@ func ReadMessage(t uint64, reader BytesReader) (Message, error) {
return DecodePageEvent(reader)
case 32:
return DecodeInputEvent(reader)
case 33:
return DecodeClickEvent(reader)
case 35:
return DecodeResourceEvent(reader)
case 37:
return DecodeCSSInsertRule(reader)
case 38:
@ -1844,8 +1768,6 @@ func ReadMessage(t uint64, reader BytesReader) (Message, error) {
return DecodeStringDict(reader)
case 51:
return DecodeSetNodeAttributeDict(reader)
case 52:
return DecodeDOMDrop(reader)
case 53:
return DecodeResourceTiming(reader)
case 54:

View file

@ -10,7 +10,7 @@ import (
type builder struct {
sessionID uint64
readyMsgs []Message
readyMsgs chan Message
timestamp uint64
lastMessageID uint64
lastSystemTime time.Time
@ -18,20 +18,14 @@ type builder struct {
ended bool
}
func NewBuilder(sessionID uint64, handlers ...handlers.MessageProcessor) *builder {
func NewBuilder(sessionID uint64, events chan Message, handlers ...handlers.MessageProcessor) *builder {
return &builder{
sessionID: sessionID,
processors: handlers,
readyMsgs: events,
}
}
func (b *builder) iterateReadyMessages(iter func(msg Message)) {
for _, readyMsg := range b.readyMsgs {
iter(readyMsg)
}
b.readyMsgs = nil
}
func (b *builder) checkSessionEnd(message Message) {
if _, isEnd := message.(*IOSSessionEnd); isEnd {
b.ended = true
@ -41,34 +35,31 @@ func (b *builder) checkSessionEnd(message Message) {
}
}
func (b *builder) handleMessage(message Message, messageID uint64) {
if messageID < b.lastMessageID {
func (b *builder) handleMessage(m Message) {
if m.MsgID() < b.lastMessageID {
// May happen in case of duplicated messages in kafka (if `idempotence: false`)
log.Printf("skip message with wrong msgID, sessID: %d, msgID: %d, lastID: %d", b.sessionID, messageID, b.lastMessageID)
log.Printf("skip message with wrong msgID, sessID: %d, msgID: %d, lastID: %d", b.sessionID, m.MsgID(), b.lastMessageID)
return
}
timestamp := GetTimestamp(message)
if timestamp == 0 {
switch message.(type) {
if m.Time() <= 0 {
switch m.(type) {
case *IssueEvent, *PerformanceTrackAggr:
break
default:
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())
log.Printf("skip message with incorrect timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, m.MsgID(), m.TypeID())
}
return
}
if timestamp < b.timestamp {
//log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d, msgTS: %d, lastTS: %d", b.sessionID, messageID, message.TypeID(), timestamp, b.timestamp)
} else {
b.timestamp = timestamp
if m.Time() > b.timestamp {
b.timestamp = m.Time()
}
b.lastSystemTime = time.Now()
// Process current message
for _, p := range b.processors {
if rm := p.Handle(message, messageID, b.timestamp); rm != nil {
rm.Meta().SetMeta(message.Meta())
b.readyMsgs = append(b.readyMsgs, rm)
if rm := p.Handle(m, b.timestamp); rm != nil {
rm.Meta().SetMeta(m.Meta())
b.readyMsgs <- rm
}
}
b.checkSessionEnd(message)
b.checkSessionEnd(m)
}

View file

@ -2,92 +2,98 @@ package sessions
import (
"log"
"openreplay/backend/pkg/handlers"
"sync"
"time"
"openreplay/backend/pkg/handlers"
. "openreplay/backend/pkg/messages"
)
const FORCE_DELETE_TIMEOUT = 4 * time.Hour
const ForceDeleteTimeout = 30 * time.Minute
type builderMap struct {
handlersFabric func() []handlers.MessageProcessor
sessions map[uint64]*builder
mutex *sync.Mutex
events chan Message
done chan struct{}
}
func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMap {
return &builderMap{
type EventBuilder interface {
Events() chan Message
HandleMessage(msg Message)
Stop()
}
func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) EventBuilder {
b := &builderMap{
handlersFabric: handlersFabric,
sessions: make(map[uint64]*builder),
mutex: &sync.Mutex{},
events: make(chan Message, 1024*10),
done: make(chan struct{}),
}
}
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
b := m.sessions[sessionID]
if b == nil {
b = NewBuilder(sessionID, m.handlersFabric()...) // Should create new instances
m.sessions[sessionID] = b
}
go b.worker()
return b
}
func (m *builderMap) HandleMessage(msg Message) {
sessionID := msg.SessionID()
messageID := msg.Meta().Index
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
func (m *builderMap) getBuilder(sessionID uint64) *builder {
m.mutex.Lock()
b := m.sessions[sessionID]
if b == nil {
b = NewBuilder(sessionID, m.events, m.handlersFabric()...)
m.sessions[sessionID] = b
}
m.mutex.Unlock()
return b
}
func (m *builderMap) ClearOldSessions() {
func (m *builderMap) Events() chan Message {
return m.events
}
func (m *builderMap) HandleMessage(msg Message) {
m.getBuilder(msg.SessionID()).handleMessage(msg)
}
func (m *builderMap) worker() {
tick := time.Tick(10 * time.Second)
for {
select {
case <-tick:
m.checkSessions()
case <-m.done:
return
}
}
}
func (m *builderMap) checkSessions() {
m.mutex.Lock()
deleted := 0
now := time.Now()
for id, sess := range m.sessions {
if sess.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(now) {
// Should delete zombie session
delete(m.sessions, id)
for sessID, b := range m.sessions {
// Check session's events
if b.ended || b.lastSystemTime.Add(ForceDeleteTimeout).Before(now) {
// Build rest of messages
for _, p := range b.processors {
if rm := p.Build(); rm != nil {
rm.Meta().SetSessionID(sessID)
m.events <- rm
}
}
delete(m.sessions, sessID)
deleted++
}
}
m.mutex.Unlock()
if deleted > 0 {
log.Printf("deleted %d sessions from message builder", deleted)
}
}
func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, iter func(msg Message)) {
if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) {
for _, p := range b.processors {
if rm := p.Build(); rm != nil {
rm.Meta().SetSessionID(sessionID)
b.readyMsgs = append(b.readyMsgs, rm)
}
}
}
b.iterateReadyMessages(iter)
if b.ended {
delete(m.sessions, sessionID)
}
}
func (m *builderMap) IterateReadyMessages(iter func(sessionID uint64, msg Message)) {
for sessionID, session := range m.sessions {
m.iterateSessionReadyMessages(
sessionID,
session,
func(msg Message) {
iter(sessionID, msg)
},
)
}
}
func (m *builderMap) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
session, ok := m.sessions[sessionID]
if !ok {
return
}
m.iterateSessionReadyMessages(
sessionID,
session,
iter,
)
func (m *builderMap) Stop() {
m.done <- struct{}{}
m.checkSessions()
close(m.events)
}
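Taken together, the builder map is now a self-contained pipeline: HandleMessage feeds it, a background worker flushes ended or stale sessions every 10 seconds, and everything it produces comes out of the buffered Events() channel. A wiring sketch (the surrounding function, its parameters and the sessions import path are assumptions, not part of the commit):

package example

import (
	"openreplay/backend/pkg/handlers"
	"openreplay/backend/pkg/messages"
	"openreplay/backend/pkg/sessions"
)

// runEventBuilder drains Events() in its own goroutine so HandleMessage never
// blocks on the buffered channel, then calls Stop() to flush the remaining
// sessions and close the channel on shutdown.
func runEventBuilder(
	incoming <-chan messages.Message,
	handlersFabric func() []handlers.MessageProcessor, // fresh processors per session
	saveEvent func(messages.Message), // stands in for the real consumer
) {
	builder := sessions.NewBuilderMap(handlersFabric)
	go func() {
		for evt := range builder.Events() {
			saveEvent(evt)
		}
	}()
	for msg := range incoming {
		builder.HandleMessage(msg)
	}
	builder.Stop()
}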

View file

@ -0,0 +1,22 @@
package terminator
import (
"log"
"os"
"os/signal"
"syscall"
)
// ServiceStopper is a common interface for all services
type ServiceStopper interface {
Stop()
}
func Wait(s ServiceStopper) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Printf("Caught signal %v: terminating\n", sig)
s.Stop()
os.Exit(0)
}
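A usage sketch for the new terminator package; demoService is a stand-in, not part of the commit:

package main

import "openreplay/backend/pkg/terminator"

// demoService stands in for any service that owns resources to flush.
type demoService struct{}

func (s *demoService) Stop() {
	// flush buffers, commit offsets, close connections, ...
}

func main() {
	s := &demoService{}
	// ... start consumers and workers here ...
	terminator.Wait(s) // blocks until SIGINT/SIGTERM, then calls s.Stop() and exits
}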

View file

@ -1,7 +1,7 @@
package url
import (
_url "net/url"
"net/url"
"strings"
)
@ -11,7 +11,7 @@ func DiscardURLQuery(url string) string {
func GetURLParts(rawURL string) (string, string, string, error) {
rawURL = strings.Replace(rawURL, "\t", "", -1) // Other chars?
u, err := _url.Parse(rawURL)
u, err := url.Parse(rawURL)
if err != nil {
return "", "", "", err
}
@ -22,3 +22,34 @@ func GetURLParts(rawURL string) (string, string, string, error) {
}
return u.Host, path, u.RawQuery, nil
}
func getURLExtension(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func GetResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtension(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
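A few illustrative inputs and the values the switch above yields; the import path of this package is assumed to be openreplay/backend/pkg/url:

package main

import (
	"fmt"

	"openreplay/backend/pkg/url"
)

func main() {
	fmt.Println(url.GetResourceType("fetch", "https://example.com/api/users"))     // fetch
	fmt.Println(url.GetResourceType("link", "https://example.com/css/app.css"))    // stylesheet
	fmt.Println(url.GetResourceType("script", "https://example.com/js/bundle.js")) // script
	fmt.Println(url.GetResourceType("img", "https://example.com/logo"))            // img
	fmt.Println(url.GetResourceType("other", "https://example.com/report.pdf"))    // other
}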

View file

@ -18,6 +18,20 @@ type NetworkRequestFTS struct {
Duration uint64 `json:"duration"`
}
func WrapNetworkRequest(m *messages.NetworkRequest, projID uint32) *NetworkRequestFTS {
return &NetworkRequestFTS{
SessionID: m.SessionID(),
ProjectID: projID,
Method: m.Method,
URL: m.URL,
Request: m.Request,
Response: m.Response,
Status: m.Status,
Timestamp: m.Timestamp,
Duration: m.Duration,
}
}
type PageEventFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
@ -40,43 +54,9 @@ type PageEventFTS struct {
TimeToInteractive uint64 `json:"time_to_interactive"`
}
type GraphQLFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
OperationKind string `json:"operation_kind"`
OperationName string `json:"operation_name"`
Variables string `json:"variables"`
Response string `json:"response"`
}
func (s *Saver) SendToFTS(msg messages.Message, projID uint32) {
// Skip, if FTS is disabled
if s.producer == nil {
return
}
var (
event []byte
err error
)
switch m := msg.(type) {
// Common
case *messages.NetworkRequest:
event, err = json.Marshal(NetworkRequestFTS{
SessionID: msg.SessionID(),
ProjectID: projID,
Method: m.Method,
URL: m.URL,
Request: m.Request,
Response: m.Response,
Status: m.Status,
Timestamp: m.Timestamp,
Duration: m.Duration,
})
case *messages.PageEvent:
event, err = json.Marshal(PageEventFTS{
SessionID: msg.SessionID(),
func WrapPageEvent(m *messages.PageEvent, projID uint32) *PageEventFTS {
return &PageEventFTS{
SessionID: m.SessionID(),
ProjectID: projID,
MessageID: m.MessageID,
Timestamp: m.Timestamp,
@ -95,22 +75,58 @@ func (s *Saver) SendToFTS(msg messages.Message, projID uint32) {
SpeedIndex: m.SpeedIndex,
VisuallyComplete: m.VisuallyComplete,
TimeToInteractive: m.TimeToInteractive,
})
case *messages.GraphQL:
event, err = json.Marshal(GraphQLFTS{
SessionID: msg.SessionID(),
}
}
type GraphQLFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
OperationKind string `json:"operation_kind"`
OperationName string `json:"operation_name"`
Variables string `json:"variables"`
Response string `json:"response"`
}
func WrapGraphQL(m *messages.GraphQL, projID uint32) *GraphQLFTS {
return &GraphQLFTS{
SessionID: m.SessionID(),
ProjectID: projID,
OperationKind: m.OperationKind,
OperationName: m.OperationName,
Variables: m.Variables,
Response: m.Response,
})
}
}
func (s *saverImpl) sendToFTS(msg messages.Message) {
// Skip, if FTS is disabled
if s.producer == nil {
return
}
var (
projID uint32
event []byte
err error
)
if sess, err := s.pg.Cache.GetSession(msg.SessionID()); err == nil {
projID = sess.ProjectID
}
switch m := msg.(type) {
// Common
case *messages.NetworkRequest:
event, err = json.Marshal(WrapNetworkRequest(m, projID))
case *messages.PageEvent:
event, err = json.Marshal(WrapPageEvent(m, projID))
case *messages.GraphQL:
event, err = json.Marshal(WrapGraphQL(m, projID))
}
if err != nil {
log.Printf("can't marshal json for quickwit: %s", err)
} else {
if len(event) > 0 {
if err := s.producer.Produce(s.topic, msg.SessionID(), event); err != nil {
if err := s.producer.Produce(s.cfg.QuickwitTopic, msg.SessionID(), event); err != nil {
log.Printf("can't send event to quickwit: %s", err)
}
}
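With payload shaping pulled out into the Wrap* helpers, the FTS documents are plain structs that can be built and marshaled in isolation. A test-style sketch, assuming it sits next to the definitions above so json and NetworkRequestFTS are in scope; the values are made up:

func exampleFTSDoc() ([]byte, error) {
	doc := &NetworkRequestFTS{
		SessionID: 1,
		ProjectID: 42,
		Method:    "GET",
		URL:       "https://example.com/api",
		Status:    200,
		Timestamp: 1678000000000,
		Duration:  120,
	}
	// The same shape sendToFTS hands to the quickwit producer after wrapping a live message.
	return json.Marshal(doc)
}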

View file

@ -1,114 +0,0 @@
package datasaver
import (
"fmt"
"log"
. "openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(msg Message) error {
sessionID := msg.SessionID()
switch m := msg.(type) {
// Common
case *Metadata:
if err := mi.pg.InsertMetadata(sessionID, m); err != nil {
return fmt.Errorf("insert metadata err: %s", err)
}
return nil
case *IssueEvent:
session, err := mi.pg.Cache.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertIssue(session, m); err != nil {
log.Printf("can't insert issue event into clickhouse: %s", err)
}
}
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return mi.pg.HandleWebSessionStart(sessionID, m)
case *SessionEnd:
return mi.pg.HandleWebSessionEnd(sessionID, m)
case *UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
session, err := mi.pg.Cache.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertCustom(session, m); err != nil {
log.Printf("can't insert graphQL event into clickhouse: %s", err)
}
}
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return mi.pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
case *PageEvent:
return mi.pg.InsertWebPageEvent(sessionID, m)
case *JSException:
return mi.pg.InsertWebJSException(m)
case *IntegrationEvent:
return mi.pg.InsertWebIntegrationEvent(m)
case *NetworkRequest:
session, err := mi.pg.Cache.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
project, err := mi.pg.GetProject(session.ProjectID)
if err != nil {
log.Printf("can't get project: %s", err)
} else {
if err := mi.ch.InsertRequest(session, m, project.SaveRequestPayloads); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
}
}
return mi.pg.InsertWebNetworkRequest(sessionID, m)
case *GraphQL:
session, err := mi.pg.Cache.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertGraphQL(session, m); err != nil {
log.Printf("can't insert graphQL event into clickhouse: %s", err)
}
}
return mi.pg.InsertWebGraphQL(sessionID, m)
case *SetPageLocation:
return mi.pg.InsertSessionReferrer(sessionID, m.Referrer)
// IOS
case *IOSSessionStart:
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return mi.pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -0,0 +1,83 @@
package datasaver
import (
"errors"
"log"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
)
func (s *saverImpl) init() {
s.ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := s.ch.Prepare(); err != nil {
log.Fatalf("can't prepare clickhouse: %s", err)
}
s.pg.Conn.SetClickHouse(s.ch)
if s.cfg.UseQuickwit {
s.producer = queue.NewProducer(s.cfg.MessageSizeLimit, true)
}
}
func (s *saverImpl) handleExtraMessage(msg Message) error {
// Send data to quickwit
s.sendToFTS(msg)
// Get session data
var (
session *types.Session
err error
)
if msg.TypeID() == MsgSessionEnd {
session, err = s.pg.GetSession(msg.SessionID())
} else {
session, err = s.pg.Cache.GetSession(msg.SessionID())
}
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return err
}
// Handle message
switch m := msg.(type) {
case *SessionEnd:
return s.ch.InsertWebSession(session)
case *PerformanceTrackAggr:
return s.ch.InsertWebPerformanceTrackAggr(session, m)
case *MouseClick:
return s.ch.InsertWebClickEvent(session, m)
case *InputEvent:
return s.ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
return s.ch.InsertWebPageEvent(session, m)
case *ResourceTiming:
return s.ch.InsertWebResourceEvent(session, m)
case *JSException:
return s.ch.InsertWebErrorEvent(session, types.WrapJSException(m))
case *IntegrationEvent:
return s.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
case *IssueEvent:
return s.ch.InsertIssue(session, m)
case *CustomEvent:
return s.ch.InsertCustom(session, m)
case *NetworkRequest:
project, err := s.pg.GetProject(session.ProjectID)
if err != nil {
log.Printf("can't get project: %s", err)
} else {
if err := s.ch.InsertRequest(session, m, project.SaveRequestPayloads); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
}
case *GraphQL:
return s.ch.InsertGraphQL(session, m)
}
return nil
}

View file

@ -1,24 +0,0 @@
package datasaver
import (
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
type Saver struct {
pg *cache.PGCache
ch clickhouse.Connector
producer types.Producer
topic string
}
func New(pg *cache.PGCache, cfg *db.Config) *Saver {
var producer types.Producer = nil
if cfg.UseQuickwit {
producer = queue.NewProducer(cfg.MessageSizeLimit, true)
}
return &Saver{pg: pg, producer: producer, topic: cfg.QuickwitTopic}
}

View file

@ -1,56 +0,0 @@
package datasaver
import (
"log"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
)
func (si *Saver) InitStats() {
si.ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := si.ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
si.pg.Conn.SetClickHouse(si.ch)
}
func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error {
// Send data to quickwit
if sess, err := si.pg.Cache.GetSession(msg.SessionID()); err != nil {
si.SendToFTS(msg, 0)
} else {
si.SendToFTS(msg, sess.ProjectID)
}
switch m := msg.(type) {
// Web
case *messages.SessionEnd:
return si.ch.InsertWebSession(session)
case *messages.PerformanceTrackAggr:
return si.ch.InsertWebPerformanceTrackAggr(session, m)
case *messages.ClickEvent:
return si.ch.InsertWebClickEvent(session, m)
case *messages.InputEvent:
return si.ch.InsertWebInputEvent(session, m)
// Unique for Web
case *messages.PageEvent:
return si.ch.InsertWebPageEvent(session, m)
case *messages.ResourceEvent:
return si.ch.InsertWebResourceEvent(session, m)
case *messages.JSException:
return si.ch.InsertWebErrorEvent(session, types.WrapJSException(m))
case *messages.IntegrationEvent:
return si.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
}
return nil
}
func (si *Saver) CommitStats() error {
return si.ch.Commit()
}
func (si *Saver) Close() error {
return si.ch.Stop()
}

View file

@ -21,9 +21,9 @@ type Connector interface {
Commit() error
Stop() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
@ -147,9 +147,7 @@ func (c *connectorImpl) worker() {
for {
select {
case t := <-c.workerTask:
start := time.Now()
c.sendBulks(t)
log.Printf("ch bulks dur: %d", time.Now().Sub(start).Milliseconds())
case <-c.done:
for t := range c.workerTask {
c.sendBulks(t)
@ -242,28 +240,25 @@ func (c *connectorImpl) InsertWebSession(session *types.Session) error {
return nil
}
func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error {
var method interface{} = url.EnsureMethod(msg.Method)
if method == "" {
method = nil
}
resourceType := url.EnsureType(msg.Type)
func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error {
msgType := url.GetResourceType(msg.Initiator, msg.URL)
resourceType := url.EnsureType(msgType)
if resourceType == "" {
return fmt.Errorf("can't parse resource type, sess: %s, type: %s", session.SessionID, msg.Type)
return fmt.Errorf("can't parse resource type, sess: %d, type: %s", session.SessionID, msgType)
}
if err := c.batches["resources"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
msg.MsgID(),
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
msg.Type,
msgType,
nullableUint16(uint16(msg.Duration)),
nullableUint16(uint16(msg.TTFB)),
nullableUint16(uint16(msg.HeaderSize)),
nullableUint32(uint32(msg.EncodedBodySize)),
nullableUint32(uint32(msg.DecodedBodySize)),
msg.Success,
msg.Duration != 0,
); err != nil {
c.checkError("resources", err)
return fmt.Errorf("can't append to resources batch: %s", err)
@ -298,14 +293,14 @@ func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error {
func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error {
if msg.Label == "" {
return nil
}
if err := c.batches["clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),

View file

@ -315,35 +315,6 @@ class InputEvent(Message):
self.label = label
class ClickEvent(Message):
__id__ = 33
def __init__(self, message_id, timestamp, hesitation_time, label, selector):
self.message_id = message_id
self.timestamp = timestamp
self.hesitation_time = hesitation_time
self.label = label
self.selector = selector
class ResourceEvent(Message):
__id__ = 35
def __init__(self, message_id, timestamp, duration, ttfb, header_size, encoded_body_size, decoded_body_size, url, type, success, method, status):
self.message_id = message_id
self.timestamp = timestamp
self.duration = duration
self.ttfb = ttfb
self.header_size = header_size
self.encoded_body_size = encoded_body_size
self.decoded_body_size = decoded_body_size
self.url = url
self.type = type
self.success = success
self.method = method
self.status = status
class CSSInsertRule(Message):
__id__ = 37
@ -470,13 +441,6 @@ class SetNodeAttributeDict(Message):
self.value_key = value_key
class DOMDrop(Message):
__id__ = 52
def __init__(self, timestamp):
self.timestamp = timestamp
class ResourceTiming(Message):
__id__ = 53

View file

@ -321,31 +321,6 @@ class MessageCodec(Codec):
label=self.read_string(reader)
)
if message_id == 33:
return ClickEvent(
message_id=self.read_uint(reader),
timestamp=self.read_uint(reader),
hesitation_time=self.read_uint(reader),
label=self.read_string(reader),
selector=self.read_string(reader)
)
if message_id == 35:
return ResourceEvent(
message_id=self.read_uint(reader),
timestamp=self.read_uint(reader),
duration=self.read_uint(reader),
ttfb=self.read_uint(reader),
header_size=self.read_uint(reader),
encoded_body_size=self.read_uint(reader),
decoded_body_size=self.read_uint(reader),
url=self.read_string(reader),
type=self.read_string(reader),
success=self.read_boolean(reader),
method=self.read_string(reader),
status=self.read_uint(reader)
)
if message_id == 37:
return CSSInsertRule(
id=self.read_uint(reader),
@ -444,11 +419,6 @@ class MessageCodec(Codec):
value_key=self.read_uint(reader)
)
if message_id == 52:
return DOMDrop(
timestamp=self.read_uint(reader)
)
if message_id == 53:
return ResourceTiming(
timestamp=self.read_uint(reader),

View file

@ -187,29 +187,6 @@ message 32, 'InputEvent', :tracker => false, :replayer => false do
boolean 'ValueMasked'
string 'Label'
end
message 33, 'ClickEvent', :tracker => false, :replayer => false do
uint 'MessageID'
uint 'Timestamp'
uint 'HesitationTime'
string 'Label'
string 'Selector'
end
## 34
message 35, 'ResourceEvent', :tracker => false, :replayer => false do
uint 'MessageID'
uint 'Timestamp'
uint 'Duration'
uint 'TTFB'
uint 'HeaderSize'
uint 'EncodedBodySize'
uint 'DecodedBodySize'
string 'URL'
string 'Type'
boolean 'Success'
string 'Method'
uint 'Status'
end
#36
# DEPRECATED since 4.0.2 in favor of AdoptedSSInsertRule + AdoptedSSAddOwner
message 37, 'CSSInsertRule' do
@ -288,12 +265,6 @@ message 51, "SetNodeAttributeDict" do
uint 'NameKey'
uint 'ValueKey'
end
## 50,51
# Doesn't work properly. TODO: Make proper detections in tracker
message 52, 'DOMDrop', :tracker => false, :replayer => false do
uint 'Timestamp'
end
message 53, 'ResourceTiming', :replayer => :devtools do
uint 'Timestamp'
uint 'Duration'