diff --git a/backend/services/db/main.go b/backend/services/db/main.go index bd5c254e2..649c77bef 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -72,6 +72,7 @@ func main() { }) }, ) + consumer.DisableAutoCommit() sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) @@ -85,7 +86,13 @@ func main() { consumer.Close() os.Exit(0) case <-tick: - commitStats() // TODO: sync with wueue commit + if err := commitStats(); err != nil { + log.Printf("Error on stats commit: %v", err) + } + // TODO?: separate stats & regular messages + if err := consumer.Commit(); err != nil { + log.Printf("Error on consumer commit: %v", err) + } default: err := consumer.ConsumeNext() if err != nil { diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index 195f2fa48..0851ac76a 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -23,6 +23,7 @@ type Connector struct { errors *bulk performance *bulk longtasks *bulk + db *sql.DB } func NewConnector(url string) *Connector { @@ -31,6 +32,7 @@ func NewConnector(url string) *Connector { log.Fatalln(err) } return &Connector{ + db: db, sessionsIOS: newBulk(db, ` INSERT INTO sessions_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, views_count, events_count, crashes_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) @@ -204,3 +206,9 @@ func (conn *Connector) Commit() error { } return nil } + + +func (conn *Connector) FinaliseSessionsTable() error { + _, err := conn.db.Exec("OPTIMIZE TABLE sessions FINAL") + return err +} diff --git a/ee/backend/services/db/stats.go b/ee/backend/services/db/stats.go index cac5d63d4..b2fcb3f67 100644 --- a/ee/backend/services/db/stats.go +++ b/ee/backend/services/db/stats.go @@ -2,6 +2,7 @@ package main import ( "log" + "time" . "openreplay/backend/pkg/messages" @@ -11,6 +12,7 @@ import ( ) var ch *clickhouse.Connector +var finalizeTicker *time.Ticker func initStats() { ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING")) @@ -18,6 +20,8 @@ func initStats() { log.Fatalf("Clickhouse prepare error: %v\n", err) } + finalizeTicker = time.NewTicker(20 * time.Minute) + } func insertStats(session *Session, msg Message) error { @@ -62,5 +66,13 @@ func insertStats(session *Session, msg Message) error { } func commitStats() error { - return nil -} \ No newline at end of file + select { + case <-finalizeTicker: + if err := ch.FinaliseSessionsTable(); err != nil { + log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err) + } + default: + } + return ch.Commit(); +} +