From 44dae11886839152c84ce6f01218aa7be9e07f27 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Fri, 13 May 2022 17:00:09 +0200 Subject: [PATCH] feat(backend/db): fixed ee version --- backend/cmd/db/main.go | 8 +-- backend/internal/datasaver/stats.go | 8 +++ ee/backend/internal/datasaver/stats.go | 79 ++++++++++++++++++++++++ ee/backend/services/db/stats.go | 83 -------------------------- 4 files changed, 91 insertions(+), 87 deletions(-) create mode 100644 ee/backend/internal/datasaver/stats.go delete mode 100644 ee/backend/services/db/stats.go diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 20d6ce55b..1c1d3bf0e 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -44,6 +44,7 @@ func main() { // Init modules saver := datasaver.New(pg) + saver.InitStats() statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) // Handler logic @@ -117,10 +118,9 @@ func main() { os.Exit(0) case <-commitTick: pg.CommitBatches() - // TODO: ee commit stats !!! - //if err := commitStats(); err != nil { - // log.Printf("Error on stats commit: %v", err) - //} + if err := saver.CommitStats(); err != nil { + log.Printf("Error on stats commit: %v", err) + } // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) diff --git a/backend/internal/datasaver/stats.go b/backend/internal/datasaver/stats.go index a57d91824..26efe51b5 100644 --- a/backend/internal/datasaver/stats.go +++ b/backend/internal/datasaver/stats.go @@ -5,6 +5,10 @@ import ( . "openreplay/backend/pkg/messages" ) +func (si *Saver) InitStats() { + // noop +} + func (si *Saver) InsertStats(session *Session, msg Message) error { switch m := msg.(type) { // Web @@ -17,3 +21,7 @@ func (si *Saver) InsertStats(session *Session, msg Message) error { } return nil } + +func (si *Saver) CommitStats() error { + return nil +} diff --git a/ee/backend/internal/datasaver/stats.go b/ee/backend/internal/datasaver/stats.go new file mode 100644 index 000000000..501a861aa --- /dev/null +++ b/ee/backend/internal/datasaver/stats.go @@ -0,0 +1,79 @@ +package datasaver + +import ( + "log" + "time" + + "openreplay/backend/pkg/db/clickhouse" + "openreplay/backend/pkg/env" +) + +var ch *clickhouse.Connector +var finalizeTicker <-chan time.Time + +func (si *Saver) InitStats() { + ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) + if err := ch.Prepare(); err != nil { + log.Fatalf("Clickhouse prepare error: %v\n", err) + } + + finalizeTicker = time.Tick(20 * time.Minute) + +} + +func (si *Saver) InsertStats(session *Session, msg Message) error { + switch m := msg.(type) { + // Web + case *SessionEnd: + return si.pg.InsertWebSession(session) + case *PerformanceTrackAggr: + return si.pg.InsertWebPerformanceTrackAggr(session, m) + case *ClickEvent: + return si.pg.InsertWebClickEvent(session, m) + case *InputEvent: + return si.pg.InsertWebInputEvent(session, m) + // Unique for Web + case *PageEvent: + si.pg.InsertWebPageEvent(session, m) + case *ResourceEvent: + return si.pg.InsertWebResourceEvent(session, m) + case *ErrorEvent: + return si.pg.InsertWebErrorEvent(session, m) + case *LongTask: + return si.pg.InsertLongtask(session, m) + + // IOS + case *IOSSessionEnd: + return si.pg.InsertIOSSession(session) + case *IOSPerformanceAggregated: + return si.pg.InsertIOSPerformanceAggregated(session, m) + case *IOSClickEvent: + return si.pg.InsertIOSClickEvent(session, m) + case *IOSInputEvent: + return si.pg.InsertIOSInputEvent(session, m) + // Unique for Web + case *IOSScreenEnter: + //ch.InsertIOSView(session, m) + case *IOSCrash: + return si.pg.InsertIOSCrash(session, m) + case *IOSNetworkCall: + return si.pg.InsertIOSNetworkCall(session, m) + } + return nil +} + +func (si *Saver) CommitStats() error { + select { + case <-finalizeTicker: + if err := ch.FinaliseSessionsTable(); err != nil { + log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err) + } + default: + } + errCommit := ch.Commit() + errPrepare := ch.Prepare() + if errCommit != nil { + return errCommit + } + return errPrepare +} diff --git a/ee/backend/services/db/stats.go b/ee/backend/services/db/stats.go deleted file mode 100644 index 9d250fc51..000000000 --- a/ee/backend/services/db/stats.go +++ /dev/null @@ -1,83 +0,0 @@ -package main - -import ( - "log" - "time" - - - . "openreplay/backend/pkg/messages" - . "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/db/clickhouse" - "openreplay/backend/pkg/env" -) - -var ch *clickhouse.Connector -var finalizeTicker <-chan time.Time - -func initStats() { - ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) - if err := ch.Prepare(); err != nil { - log.Fatalf("Clickhouse prepare error: %v\n", err) - } - - finalizeTicker = time.Tick(20 * time.Minute) - -} - -func insertStats(session *Session, msg Message) error { - switch m := msg.(type) { - // Web - case *SessionEnd: - return ch.InsertWebSession(session) - case *PerformanceTrackAggr: - return ch.InsertWebPerformanceTrackAggr(session, m) - case *ClickEvent: - return ch.InsertWebClickEvent(session, m) - case *InputEvent: - return ch.InsertWebInputEvent(session, m) - // Unique for Web - case *PageEvent: - ch.InsertWebPageEvent(session, m) - case *ResourceEvent: - return ch.InsertWebResourceEvent(session, m) - case *ErrorEvent: - return ch.InsertWebErrorEvent(session, m) - case *LongTask: - return ch.InsertLongtask(session, m) - - // IOS - case *IOSSessionEnd: - return ch.InsertIOSSession(session) - case *IOSPerformanceAggregated: - return ch.InsertIOSPerformanceAggregated(session, m) - case *IOSClickEvent: - return ch.InsertIOSClickEvent(session, m) - case *IOSInputEvent: - return ch.InsertIOSInputEvent(session, m) - // Unique for Web - case *IOSScreenEnter: - //ch.InsertIOSView(session, m) - case *IOSCrash: - return ch.InsertIOSCrash(session, m) - case *IOSNetworkCall: - return ch.InsertIOSNetworkCall(session, m) - } - return nil -} - -func commitStats() error { - select { - case <-finalizeTicker: - if err := ch.FinaliseSessionsTable(); err != nil { - log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err) - } - default: - } - errCommit := ch.Commit() - errPrepare := ch.Prepare() - if errCommit != nil { - return errCommit - } - return errPrepare -} -