feat(backend/db): fixed ee version

This commit is contained in:
Alexander Zavorotynskiy 2022-05-13 17:00:09 +02:00
parent 4ac3da241e
commit 44dae11886
4 changed files with 91 additions and 87 deletions

View file

@ -44,6 +44,7 @@ func main() {
// Init modules
saver := datasaver.New(pg)
saver.InitStats()
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Handler logic
@ -117,10 +118,9 @@ func main() {
os.Exit(0)
case <-commitTick:
pg.CommitBatches()
// TODO: ee commit stats !!!
//if err := commitStats(); err != nil {
// log.Printf("Error on stats commit: %v", err)
//}
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)

View file

@ -5,6 +5,10 @@ import (
. "openreplay/backend/pkg/messages"
)
func (si *Saver) InitStats() {
// noop
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
@ -17,3 +21,7 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
}
return nil
}
func (si *Saver) CommitStats() error {
return nil
}

View file

@ -0,0 +1,79 @@
package datasaver
import (
"log"
"time"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/env"
)
var ch *clickhouse.Connector
var finalizeTicker <-chan time.Time
func (si *Saver) InitStats() {
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
finalizeTicker = time.Tick(20 * time.Minute)
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return si.pg.InsertWebSession(session)
case *PerformanceTrackAggr:
return si.pg.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return si.pg.InsertWebClickEvent(session, m)
case *InputEvent:
return si.pg.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
si.pg.InsertWebPageEvent(session, m)
case *ResourceEvent:
return si.pg.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return si.pg.InsertWebErrorEvent(session, m)
case *LongTask:
return si.pg.InsertLongtask(session, m)
// IOS
case *IOSSessionEnd:
return si.pg.InsertIOSSession(session)
case *IOSPerformanceAggregated:
return si.pg.InsertIOSPerformanceAggregated(session, m)
case *IOSClickEvent:
return si.pg.InsertIOSClickEvent(session, m)
case *IOSInputEvent:
return si.pg.InsertIOSInputEvent(session, m)
// Unique for Web
case *IOSScreenEnter:
//ch.InsertIOSView(session, m)
case *IOSCrash:
return si.pg.InsertIOSCrash(session, m)
case *IOSNetworkCall:
return si.pg.InsertIOSNetworkCall(session, m)
}
return nil
}
func (si *Saver) CommitStats() error {
select {
case <-finalizeTicker:
if err := ch.FinaliseSessionsTable(); err != nil {
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
}
default:
}
errCommit := ch.Commit()
errPrepare := ch.Prepare()
if errCommit != nil {
return errCommit
}
return errPrepare
}

View file

@ -1,83 +0,0 @@
package main
import (
"log"
"time"
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/env"
)
var ch *clickhouse.Connector
var finalizeTicker <-chan time.Time
func initStats() {
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
finalizeTicker = time.Tick(20 * time.Minute)
}
func insertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return ch.InsertWebSession(session)
case *PerformanceTrackAggr:
return ch.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return ch.InsertWebClickEvent(session, m)
case *InputEvent:
return ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
ch.InsertWebPageEvent(session, m)
case *ResourceEvent:
return ch.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return ch.InsertWebErrorEvent(session, m)
case *LongTask:
return ch.InsertLongtask(session, m)
// IOS
case *IOSSessionEnd:
return ch.InsertIOSSession(session)
case *IOSPerformanceAggregated:
return ch.InsertIOSPerformanceAggregated(session, m)
case *IOSClickEvent:
return ch.InsertIOSClickEvent(session, m)
case *IOSInputEvent:
return ch.InsertIOSInputEvent(session, m)
// Unique for Web
case *IOSScreenEnter:
//ch.InsertIOSView(session, m)
case *IOSCrash:
return ch.InsertIOSCrash(session, m)
case *IOSNetworkCall:
return ch.InsertIOSNetworkCall(session, m)
}
return nil
}
func commitStats() error {
select {
case <-finalizeTicker:
if err := ch.FinaliseSessionsTable(); err != nil {
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
}
default:
}
errCommit := ch.Commit()
errPrepare := ch.Prepare()
if errCommit != nil {
return errCommit
}
return errPrepare
}