openreplay/ee/backend/internal/db/datasaver/stats.go
Alexander 0d82d7feac
DB improvements (#647)
* feat(backend/db): updated ClickHouse library version from 1.5.4 to 2.2.0
* feat(backend/db): refactored ClickHouse connector
* feat(backend/db): rewritten batch implementation for ClickHouse inserts
* feat(backend/db): found and fix memory leak in db service
2022-07-28 16:34:44 +02:00

64 lines
1.4 KiB
Go

package datasaver
import (
"log"
"time"
"openreplay/backend/pkg/db/clickhouse"
. "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
. "openreplay/backend/pkg/messages"
)
// Package-level state shared by the stats methods of Saver.
var (
	// ch is the ClickHouse connector; it is assigned in InitStats and used by
	// InsertStats and CommitStats.
	ch clickhouse.Connector

	// finalizeTicker is set in InitStats to a 20-minute tick channel and is
	// polled without blocking in CommitStats.
	finalizeTicker <-chan time.Time
)
// InitStats builds the package-level ClickHouse connector from the
// CLICKHOUSE_STRING environment value, prepares it, and starts the 20-minute
// ticker consumed by CommitStats. The process is terminated via log.Fatalf if
// Prepare reports an error.
func (si *Saver) InitStats() {
	connString := env.String("CLICKHOUSE_STRING")
	ch = clickhouse.NewConnector(connString)

	err := ch.Prepare()
	if err != nil {
		log.Fatalf("Clickhouse prepare error: %v\n", err)
	}

	const finalizeInterval = 20 * time.Minute
	finalizeTicker = time.Tick(finalizeInterval)
}
// InsertStats dispatches msg to the ClickHouse insert method that matches its
// concrete type, passing session along, and returns that method's error.
// Message types that have no case below are ignored and nil is returned.
func (si *Saver) InsertStats(session *Session, msg Message) error {
	switch m := msg.(type) {
	// Web
	case *SessionEnd:
		return ch.InsertWebSession(session)
	case *PerformanceTrackAggr:
		return ch.InsertWebPerformanceTrackAggr(session, m)
	case *ClickEvent:
		return ch.InsertWebClickEvent(session, m)
	case *InputEvent:
		return ch.InsertWebInputEvent(session, m)
	// Unique for Web
	case *PageEvent:
		// Return the result like every other case; previously the call's
		// result was discarded and the function fell through to return nil.
		return ch.InsertWebPageEvent(session, m)
	case *ResourceEvent:
		return ch.InsertWebResourceEvent(session, m)
	case *ErrorEvent:
		return ch.InsertWebErrorEvent(session, m)
	case *LongTask:
		return ch.InsertLongtask(session, m)
	}
	return nil
}
// CommitStats calls ch.Commit followed by ch.Prepare and returns the Commit
// error if there is one, otherwise the Prepare error (nil when both succeed).
// Before that, if finalizeTicker has a pending tick, it calls
// ch.FinaliseSessionsTable; a failure there is logged and does not affect the
// returned error.
func (si *Saver) CommitStats() error {
	// Non-blocking poll: the default case makes the select return immediately
	// when no tick is pending.
	select {
	case <-finalizeTicker:
		if err := ch.FinaliseSessionsTable(); err != nil {
			log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
		}
	default:
	}

	errCommit := ch.Commit()
	// Prepare is invoked regardless of the Commit outcome.
	errPrepare := ch.Prepare()
	if errCommit != nil {
		if errPrepare != nil {
			// Only the Commit error is returned, so log the Prepare error
			// here instead of dropping it silently.
			log.Printf("Stats: Prepare returned an error after a failed Commit. %v", errPrepare)
		}
		return errCommit
	}
	return errPrepare
}