Ch improvements (#705)

* feat(backend): in-memory batches for click house
* feat(backend): new scheme for ClickHouse tables
This commit is contained in:
Alexander 2022-08-31 19:00:26 +02:00 committed by GitHub
parent 2a6b8904e2
commit 7421154939
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 369 additions and 163 deletions

View file

@ -15,6 +15,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgx/v4 v4.6.0
github.com/klauspost/pgzip v1.2.5
github.com/lib/pq v1.2.0
github.com/oschwald/maxminddb-golang v1.7.0
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-envconfig v0.7.0

View file

@ -83,6 +83,14 @@ func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
return nil
}
func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error {
_, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertSessionReferrer(sessionID, referrer)
}
func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error {
session, err := c.GetSession(sessionID)
if err != nil {

View file

@ -3,7 +3,6 @@ package cache
import (
"errors"
"github.com/jackc/pgx/v4"
. "openreplay/backend/pkg/db/types"
)

View file

@ -5,6 +5,8 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/monitoring"
"strings"
"time"
@ -37,6 +39,11 @@ type Conn struct {
batchSizeLines syncfloat64.Histogram
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
chConn clickhouse.Connector
}
func (conn *Conn) SetClickHouse(ch clickhouse.Connector) {
conn.chConn = ch
}
func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn {
@ -152,6 +159,13 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp
if err := conn.autocompletes.Append(value, tp, projectID); err != nil {
log.Printf("autocomplete bulk err: %s", err)
}
if conn.chConn == nil {
return
}
// Send autocomplete data to clickhouse
if err := conn.chConn.InsertAutocomplete(&types.Session{SessionID: sessionID, ProjectID: projectID}, tp, value); err != nil {
log.Printf("click house autocomplete err: %s", err)
}
}
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {

View file

@ -83,7 +83,6 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
}
func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
// TODO: search acceleration?
sqlRequest := `
UPDATE sessions
SET issue_types=(SELECT
@ -96,11 +95,7 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
INNER JOIN issues AS ps USING (issue_id)
WHERE session_id = $1)
WHERE session_id = $1`
conn.batchQueue(sessionID, sqlRequest, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+8)
return nil
return conn.c.Exec(sqlRequest, sessionID)
}
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {

View file

@ -185,3 +185,15 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, projectID uint32, save
conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName)
return nil
}
func (conn *Conn) InsertSessionReferrer(sessionID uint64, referrer string) error {
log.Printf("insert referrer, sessID: %d, referrer: %s", sessionID, referrer)
if referrer == "" {
return nil
}
return conn.c.Exec(`
UPDATE sessions
SET referrer = $1, base_referrer = $2
WHERE session_id = $3 AND referrer IS NULL`,
referrer, url.DiscardURLQuery(referrer), sessionID)
}

View file

@ -1,17 +1,24 @@
package postgres
import . "openreplay/backend/pkg/db/types"
import (
"github.com/jackc/pgtype"
"log"
. "openreplay/backend/pkg/db/types"
)
func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
s := &Session{SessionID: sessionID}
var revID, userOSVersion *string
var issueTypes pgtype.EnumArray
if err := conn.c.QueryRow(`
SELECT platform,
duration, project_id, start_ts,
user_uuid, user_os, user_os_version,
user_device, user_device_type, user_country,
rev_id, tracker_version,
user_id, user_anonymous_id,
user_id, user_anonymous_id, referrer,
pages_count, events_count, errors_count, issue_types,
user_browser, user_browser_version, issue_score,
metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
metadata_6, metadata_7, metadata_8, metadata_9, metadata_10
FROM sessions
@ -23,7 +30,9 @@ func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
&s.UserUUID, &s.UserOS, &userOSVersion,
&s.UserDevice, &s.UserDeviceType, &s.UserCountry,
&revID, &s.TrackerVersion,
&s.UserID, &s.UserAnonymousID,
&s.UserID, &s.UserAnonymousID, &s.Referrer,
&s.PagesCount, &s.EventsCount, &s.ErrorsCount, &issueTypes,
&s.UserBrowser, &s.UserBrowserVersion, &s.IssueScore,
&s.Metadata1, &s.Metadata2, &s.Metadata3, &s.Metadata4, &s.Metadata5,
&s.Metadata6, &s.Metadata7, &s.Metadata8, &s.Metadata9, &s.Metadata10); err != nil {
return nil, err
@ -34,5 +43,8 @@ func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
if revID != nil {
s.RevID = *revID
}
if err := issueTypes.AssignTo(&s.IssueTypes); err != nil {
log.Printf("can't scan IssueTypes, err: %s", err)
}
return s, nil
}

View file

@ -11,11 +11,14 @@ type Session struct {
UserOSVersion string
UserDevice string
UserCountry string
Referrer *string
Duration *uint64
PagesCount int
EventsCount int
ErrorsCount int
IssueTypes []string
IssueScore int
UserID *string // pointer??
UserAnonymousID *string

View file

@ -22,6 +22,7 @@ type iteratorImpl struct {
msgSize uint64
canSkip bool
msg Message
url string
}
func NewIterator(data []byte) Iterator {
@ -96,6 +97,7 @@ func (i *iteratorImpl) Next() bool {
i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.timestamp = m.Timestamp
i.version = m.Version
i.url = m.Url
isBatchMeta = true
log.Printf("new batch version: %d", i.version)
if i.version > 1 {
@ -133,9 +135,13 @@ func (i *iteratorImpl) Next() bool {
case MsgSessionEnd:
m := i.msg.Decode().(*SessionEnd)
i.timestamp = int64(m.Timestamp)
case MsgSetPageLocation:
m := i.msg.Decode().(*SetPageLocation)
i.url = m.URL
}
i.msg.Meta().Index = i.index
i.msg.Meta().Timestamp = i.timestamp
i.msg.Meta().Url = i.url
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
i.index++

View file

@ -3,6 +3,7 @@ package messages
type message struct {
Timestamp int64
Index uint64
Url string
}
func (m *message) Meta() *message {
@ -12,6 +13,7 @@ func (m *message) Meta() *message {
func (m *message) SetMeta(origin *message) {
m.Timestamp = origin.Timestamp
m.Index = origin.Index
m.Url = origin.Url
}
type Message interface {

View file

@ -0,0 +1,107 @@
package datasaver
import (
"fmt"
"log"
"openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(sessionID uint64, msg messages.Message) error {
switch m := msg.(type) {
// Common
case *messages.Metadata:
if err := mi.pg.InsertMetadata(sessionID, m); err != nil {
return fmt.Errorf("insert metadata err: %s", err)
}
return nil
case *messages.IssueEvent:
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
// Web
case *messages.SessionStart:
return mi.pg.HandleWebSessionStart(sessionID, m)
case *messages.SessionEnd:
return mi.pg.HandleWebSessionEnd(sessionID, m)
case *messages.UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *messages.UserAnonymousID:
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *messages.CustomEvent:
session, err := mi.pg.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertCustom(session, m); err != nil {
log.Printf("can't insert graphQL event into clickhouse: %s", err)
}
}
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *messages.ClickEvent:
return mi.pg.InsertWebClickEvent(sessionID, m)
case *messages.InputEvent:
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
case *messages.PageEvent:
return mi.pg.InsertWebPageEvent(sessionID, m)
case *messages.ErrorEvent:
return mi.pg.InsertWebErrorEvent(sessionID, m)
case *messages.FetchEvent:
session, err := mi.pg.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertRequest(session, m); err != nil {
log.Printf("can't insert request event into clickhouse: %s", err)
}
}
return mi.pg.InsertWebFetchEvent(sessionID, m)
case *messages.GraphQLEvent:
session, err := mi.pg.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertGraphQL(session, m); err != nil {
log.Printf("can't insert graphQL event into clickhouse: %s", err)
}
}
return mi.pg.InsertWebGraphQLEvent(sessionID, m)
case *messages.IntegrationEvent:
return mi.pg.InsertWebErrorEvent(sessionID, &messages.ErrorEvent{
MessageID: m.Meta().Index,
Timestamp: m.Timestamp,
Source: m.Source,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
})
case *messages.SetPageLocation:
return mi.pg.InsertSessionReferrer(sessionID, m.Referrer)
// IOS
case *messages.IOSSessionStart:
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *messages.IOSSessionEnd:
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *messages.IOSUserID:
return mi.pg.InsertIOSUserID(sessionID, m)
case *messages.IOSUserAnonymousID:
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *messages.IOSCustomEvent:
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *messages.IOSClickEvent:
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *messages.IOSInputEvent:
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *messages.IOSNetworkCall:
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *messages.IOSScreenEnter:
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *messages.IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -0,0 +1,15 @@
package datasaver
import (
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
)
type Saver struct {
pg *cache.PGCache
ch clickhouse.Connector
}
func New(pg *cache.PGCache) *Saver {
return &Saver{pg: pg}
}

View file

@ -5,44 +5,42 @@ import (
"time"
"openreplay/backend/pkg/db/clickhouse"
. "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages"
)
var ch clickhouse.Connector
var finalizeTicker <-chan time.Time
func (si *Saver) InitStats() {
ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := ch.Prepare(); err != nil {
si.ch = clickhouse.NewConnector(env.String("CLICKHOUSE_STRING"))
if err := si.ch.Prepare(); err != nil {
log.Fatalf("Clickhouse prepare error: %v\n", err)
}
si.pg.Conn.SetClickHouse(si.ch)
finalizeTicker = time.Tick(20 * time.Minute)
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return ch.InsertWebSession(session)
case *PerformanceTrackAggr:
return ch.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return ch.InsertWebClickEvent(session, m)
case *InputEvent:
return ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
ch.InsertWebPageEvent(session, m)
case *ResourceEvent:
return ch.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return ch.InsertWebErrorEvent(session, m)
case *LongTask:
return ch.InsertLongtask(session, m)
case *messages.SessionEnd:
// TODO: get issue_types and base_referrer before session end
return si.ch.InsertWebSession(session)
case *messages.PerformanceTrackAggr:
// TODO: page_path
return si.ch.InsertWebPerformanceTrackAggr(session, m)
case *messages.ClickEvent:
return si.ch.InsertWebClickEvent(session, m)
case *messages.InputEvent:
return si.ch.InsertWebInputEvent(session, m)
// Unique for Web
case *messages.PageEvent:
return si.ch.InsertWebPageEvent(session, m)
case *messages.ResourceEvent:
return si.ch.InsertWebResourceEvent(session, m)
case *messages.ErrorEvent:
return si.ch.InsertWebErrorEvent(session, m)
}
return nil
}
@ -50,15 +48,10 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
func (si *Saver) CommitStats() error {
select {
case <-finalizeTicker:
if err := ch.FinaliseSessionsTable(); err != nil {
if err := si.ch.FinaliseSessionsTable(); err != nil {
log.Printf("Stats: FinaliseSessionsTable returned an error. %v", err)
}
default:
}
errCommit := ch.Commit()
errPrepare := ch.Prepare()
if errCommit != nil {
return errCommit
}
return errPrepare
return si.ch.Commit()
}

View file

@ -7,6 +7,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"math"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
@ -17,6 +18,50 @@ import (
"openreplay/backend/pkg/license"
)
type Bulk interface {
Append(args ...interface{}) error
Send() error
}
type bulkImpl struct {
conn driver.Conn
query string
values [][]interface{}
}
func NewBulk(conn driver.Conn, query string) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("clickhouse connection is empty")
case query == "":
return nil, errors.New("query is empty")
}
return &bulkImpl{
conn: conn,
query: query,
values: make([][]interface{}, 0),
}, nil
}
func (b *bulkImpl) Append(args ...interface{}) error {
b.values = append(b.values, args)
return nil
}
func (b *bulkImpl) Send() error {
batch, err := b.conn.PrepareBatch(context.Background(), b.query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
for _, set := range b.values {
if err := batch.Append(set...); err != nil {
log.Printf("can't append value set to batch, err: %s", err)
}
}
b.values = make([][]interface{}, 0)
return batch.Send()
}
var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"}
var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"}
@ -31,12 +76,15 @@ type Connector interface {
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertLongtask(session *types.Session, msg *messages.LongTask) error
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertRequest(session *types.Session, msg *messages.FetchEvent) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQLEvent) error
}
type connectorImpl struct {
conn driver.Conn
batches map[string]driver.Batch
batches map[string]Bulk //driver.Batch
}
func NewConnector(url string) Connector {
@ -62,33 +110,42 @@ func NewConnector(url string) Connector {
c := &connectorImpl{
conn: conn,
batches: make(map[string]driver.Batch, 9),
batches: make(map[string]Bulk, 9),
}
return c
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := c.conn.PrepareBatch(context.Background(), query)
batch, err := NewBulk(c.conn, query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
if _, ok := c.batches[name]; ok {
delete(c.batches, name)
}
c.batches[name] = batch
return nil
}
/*
TODO:
+. add page_path to performance event (for the performance event, I need the page URL where that event happened
if it is not present in the message, you can extract it from the last page before that performance event)
2. add base_referrer to sessions table (if you check the code that adds page events to PG; it has columns called referrer
and base_referrer; I need these columns in the sessions table in clickhouse)
+. copy autocomplete to Clickhouse
+. add following tables from pg: REQUEST, CUSTOM, GRAPHQL
5. add issue_types to sessions (the same way it exists in PG) -> rewrite session end handler
*/
var batches = map[string]string{
"sessions": "INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"metadata": "INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"resources": "INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"pages": "INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"clicks": "INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"inputs": "INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"longtasks": "INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"resources": "INSERT INTO experimental.resources (session_id, project_id, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, ?)",
"pages": "INSERT INTO experimental.events (session_id, project_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"clicks": "INSERT INTO experimental.events (session_id, project_id, datetime, label, hesitation_time, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"inputs": "INSERT INTO experimental.events (session_id, project_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?)",
"errors": "INSERT INTO experimental.events (session_id, project_id, datetime, source, name, message, error_id, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO experimental.events (session_id, project_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"requests": "INSERT INTO experimental.events (session_id, project_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"custom": "INSERT INTO experimental.events (session_id, project_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"graphql": "INSERT INTO experimental.events (session_id, project_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
@ -118,21 +175,19 @@ func (c *connectorImpl) FinaliseSessionsTable() error {
func (c *connectorImpl) checkError(name string, err error) {
if err != clickhouse.ErrBatchAlreadySent {
if batchErr := c.newBatch(name, batches[name]); batchErr != nil {
log.Printf("can't create %s batch after failed append operation: %s", name, batchErr)
}
log.Printf("can't create %s batch after failed append operation: %s", name, err)
}
}
func (c *connectorImpl) InsertWebSession(session *types.Session) error {
log.Printf("insert session: %+v", session)
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
if err := c.batches["sessions"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
uint16(session.ProjectID),
session.UserID,
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
@ -144,17 +199,12 @@ func (c *connectorImpl) InsertWebSession(session *types.Session) error {
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
// Web unique columns
uint32(session.IssueScore),
session.Referrer,
session.IssueTypes,
session.TrackerVersion,
session.UserBrowser,
nullableString(session.UserBrowserVersion),
); err != nil {
c.checkError("sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
if err := c.batches["metadata"].Append(
session.SessionID,
session.UserID,
session.UserAnonymousID,
session.Metadata1,
session.Metadata2,
session.Metadata3,
@ -165,10 +215,9 @@ func (c *connectorImpl) InsertWebSession(session *types.Session) error {
session.Metadata8,
session.Metadata9,
session.Metadata10,
datetime(session.Timestamp),
); err != nil {
c.checkError("metadata", err)
return fmt.Errorf("can't append to metadata batch: %s", err)
c.checkError("sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
@ -180,17 +229,7 @@ func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *mess
}
if err := c.batches["resources"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
msg.Type,
@ -210,16 +249,7 @@ func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *mess
func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion, nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
nullableUint16(uint16(msg.RequestStart)),
@ -234,6 +264,7 @@ func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
"LOCATION",
); err != nil {
c.checkError("pages", err)
return fmt.Errorf("can't append to pages batch: %s", err)
@ -247,20 +278,11 @@ func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *message
}
if err := c.batches["clicks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
"CLICK",
); err != nil {
c.checkError("clicks", err)
return fmt.Errorf("can't append to clicks batch: %s", err)
@ -274,19 +296,10 @@ func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *message
}
if err := c.batches["inputs"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
@ -297,22 +310,13 @@ func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *message
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error {
if err := c.batches["errors"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
hashid.WebErrorID(session.ProjectID, msg),
"ERROR",
); err != nil {
c.checkError("errors", err)
return fmt.Errorf("can't append to errors batch: %s", err)
@ -324,18 +328,9 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, ms
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
uint16(session.ProjectID),
datetime(timestamp),
nullableString(msg.Meta().Url),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
@ -348,6 +343,7 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, ms
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
"PERFORMANCE",
); err != nil {
c.checkError("performance", err)
return fmt.Errorf("can't append to performance batch: %s", err)
@ -355,29 +351,68 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, ms
return nil
}
func (c *connectorImpl) InsertLongtask(session *types.Session, msg *messages.LongTask) error {
if err := c.batches["longtasks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
CONTEXT_MAP[msg.Context],
CONTAINER_TYPE_MAP[msg.ContainerType],
msg.ContainerId,
msg.ContainerName,
msg.ContainerSrc,
func (c *connectorImpl) InsertAutocomplete(session *types.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
if err := c.batches["autocompletes"].Append(
uint16(session.ProjectID),
msgType,
msgValue,
); err != nil {
c.checkError("longtasks", err)
return fmt.Errorf("can't append to longtasks batch: %s", err)
c.checkError("autocompletes", err)
return fmt.Errorf("can't append to autocompletes batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertRequest(session *types.Session, msg *messages.FetchEvent) error {
if err := c.batches["requests"].Append(
session.SessionID,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.URL,
nullableString(msg.Request),
nullableString(msg.Response),
msg.Status,
url.EnsureMethod(msg.Method),
msg.Duration,
msg.Status < 400,
"REQUEST",
); err != nil {
c.checkError("requests", err)
return fmt.Errorf("can't append to requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertCustom(session *types.Session, msg *messages.CustomEvent) error {
if err := c.batches["custom"].Append(
session.SessionID,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.Name,
msg.Payload,
"CUSTOM",
); err != nil {
c.checkError("custom", err)
return fmt.Errorf("can't append to custom batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertGraphQL(session *types.Session, msg *messages.GraphQLEvent) error {
if err := c.batches["graphql"].Append(
session.SessionID,
uint16(session.ProjectID),
datetime(msg.Timestamp),
msg.OperationName,
nullableString(msg.Variables),
nullableString(msg.Response),
"GRAPHQL",
); err != nil {
c.checkError("graphql", err)
return fmt.Errorf("can't append to graphql batch: %s", err)
}
return nil
}
@ -414,3 +449,7 @@ func datetime(timestamp uint64) time.Time {
}
return t
}
func getSqIdx(messageID uint64) uint {
return uint(messageID % math.MaxInt32)
}