fix (backend-db): stats commit process, add sessions table finalising
This commit is contained in:
parent
0203ed8ea1
commit
64fa88e60b
3 changed files with 30 additions and 3 deletions
|
|
@ -72,6 +72,7 @@ func main() {
|
|||
})
|
||||
},
|
||||
)
|
||||
consumer.DisableAutoCommit()
|
||||
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
|
@ -85,7 +86,13 @@ func main() {
|
|||
consumer.Close()
|
||||
os.Exit(0)
|
||||
case <-tick:
|
||||
commitStats() // TODO: sync with wueue commit
|
||||
if err := commitStats(); err != nil {
|
||||
log.Printf("Error on stats commit: %v", err)
|
||||
}
|
||||
// TODO?: separate stats & regular messages
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("Error on consumer commit: %v", err)
|
||||
}
|
||||
default:
|
||||
err := consumer.ConsumeNext()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ type Connector struct {
|
|||
errors *bulk
|
||||
performance *bulk
|
||||
longtasks *bulk
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewConnector(url string) *Connector {
|
||||
|
|
@ -31,6 +32,7 @@ func NewConnector(url string) *Connector {
|
|||
log.Fatalln(err)
|
||||
}
|
||||
return &Connector{
|
||||
db: db,
|
||||
sessionsIOS: newBulk(db, `
|
||||
INSERT INTO sessions_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, views_count, events_count, crashes_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
|
|
@ -204,3 +206,9 @@ func (conn *Connector) Commit() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func (conn *Connector) FinaliseSessionsTable() error {
|
||||
_, err := conn.db.Exec("OPTIMIZE TABLE sessions FINAL")
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
|
||||
. "openreplay/backend/pkg/messages"
|
||||
|
|
@ -11,6 +12,7 @@ import (
|
|||
)
|
||||
|
||||
var ch *clickhouse.Connector
|
||||
var finalizeTicker *time.Ticker
|
||||
|
||||
func initStats() {
|
||||
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
|
||||
|
|
@ -18,6 +20,8 @@ func initStats() {
|
|||
log.Fatalf("Clickhouse prepare error: %v\n", err)
|
||||
}
|
||||
|
||||
finalizeTicker = time.NewTicker(20 * time.Minute)
|
||||
|
||||
}
|
||||
|
||||
func insertStats(session *Session, msg Message) error {
|
||||
|
|
@ -62,5 +66,13 @@ func insertStats(session *Session, msg Message) error {
|
|||
}
|
||||
|
||||
func commitStats() error {
|
||||
return nil
|
||||
select {
|
||||
case <-finalizeTicker:
|
||||
if err := ch.FinaliseSessionsTable(); err != nil {
|
||||
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
|
||||
}
|
||||
default:
|
||||
}
|
||||
return ch.Commit();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue