openreplay/ee/backend/pkg/db/clickhouse/connector.go
Alexander f1e4d60ea8 DB improvements (#647)
* feat(backend/db): updated ClickHouse library version from 1.5.4 to 2.2.0
* feat(backend/db): refactored ClickHouse connector
* feat(backend/db): rewritten batch implementation for ClickHouse inserts
* feat(backend/db): found and fix memory leak in db service
2022-07-29 09:23:27 +02:00

416 lines
14 KiB
Go

package clickhouse
import (
"context"
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
"strings"
"time"
"openreplay/backend/pkg/license"
)
var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"}
var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"}
type Connector interface {
Prepare() error
Commit() error
FinaliseSessionsTable() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertLongtask(session *types.Session, msg *messages.LongTask) error
}
type connectorImpl struct {
conn driver.Conn
batches map[string]driver.Batch
}
func NewConnector(url string) Connector {
license.CheckLicense()
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{url},
Auth: clickhouse.Auth{
Database: "default",
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
// Debug: true,
})
if err != nil {
log.Fatal(err)
}
c := &connectorImpl{
conn: conn,
batches: make(map[string]driver.Batch, 9),
}
return c
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
if _, ok := c.batches[name]; ok {
delete(c.batches, name)
}
c.batches[name] = batch
return nil
}
var batches = map[string]string{
"sessions": "INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"metadata": "INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"resources": "INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"pages": "INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"clicks": "INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"inputs": "INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"longtasks": "INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
for table, query := range batches {
if err := c.newBatch(table, query); err != nil {
return fmt.Errorf("can't create %s batch: %s", table, err)
}
}
return nil
}
func (c *connectorImpl) Commit() error {
for _, b := range c.batches {
if err := b.Send(); err != nil {
return fmt.Errorf("can't send batch: %s", err)
}
}
return nil
}
func (c *connectorImpl) FinaliseSessionsTable() error {
if err := c.conn.Exec(context.Background(), "OPTIMIZE TABLE sessions FINAL"); err != nil {
return fmt.Errorf("can't finalise sessions table: %s", err)
}
return nil
}
func (c *connectorImpl) checkError(name string, err error) {
if err != clickhouse.ErrBatchAlreadySent {
if batchErr := c.newBatch(name, batches[name]); batchErr != nil {
log.Printf("can't create %s batch after failed append operation: %s", name, batchErr)
}
}
}
func (c *connectorImpl) InsertWebSession(session *types.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
if err := c.batches["sessions"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
// Web unique columns
session.UserBrowser,
nullableString(session.UserBrowserVersion),
); err != nil {
c.checkError("sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
if err := c.batches["metadata"].Append(
session.SessionID,
session.UserID,
session.UserAnonymousID,
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
datetime(session.Timestamp),
); err != nil {
c.checkError("metadata", err)
return fmt.Errorf("can't append to metadata batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error {
var method interface{} = url.EnsureMethod(msg.Method)
if method == "" {
method = nil
}
if err := c.batches["resources"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
msg.Type,
nullableUint16(uint16(msg.Duration)),
nullableUint16(uint16(msg.TTFB)),
nullableUint16(uint16(msg.HeaderSize)),
nullableUint32(uint32(msg.EncodedBodySize)),
nullableUint32(uint32(msg.DecodedBodySize)),
msg.Success,
); err != nil {
c.checkError("resources", err)
return fmt.Errorf("can't append to resources batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion, nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
nullableUint16(uint16(msg.RequestStart)),
nullableUint16(uint16(msg.ResponseStart)),
nullableUint16(uint16(msg.ResponseEnd)),
nullableUint16(uint16(msg.DomContentLoadedEventStart)),
nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
nullableUint16(uint16(msg.LoadEventStart)),
nullableUint16(uint16(msg.LoadEventEnd)),
nullableUint16(uint16(msg.FirstPaint)),
nullableUint16(uint16(msg.FirstContentfulPaint)),
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
); err != nil {
c.checkError("pages", err)
return fmt.Errorf("can't append to pages batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["clicks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
); err != nil {
c.checkError("clicks", err)
return fmt.Errorf("can't append to clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["inputs"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error {
if err := c.batches["errors"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
hashid.WebErrorID(session.ProjectID, msg),
); err != nil {
c.checkError("errors", err)
return fmt.Errorf("can't append to errors batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(timestamp),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinTotalJSHeapSize,
msg.AvgTotalJSHeapSize,
msg.MaxTotalJSHeapSize,
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
); err != nil {
c.checkError("performance", err)
return fmt.Errorf("can't append to performance batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertLongtask(session *types.Session, msg *messages.LongTask) error {
if err := c.batches["longtasks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
CONTEXT_MAP[msg.Context],
CONTAINER_TYPE_MAP[msg.ContainerType],
msg.ContainerId,
msg.ContainerName,
msg.ContainerSrc,
); err != nil {
c.checkError("longtasks", err)
return fmt.Errorf("can't append to longtasks batch: %s", err)
}
return nil
}
func nullableUint16(v uint16) *uint16 {
var p *uint16 = nil
if v != 0 {
p = &v
}
return p
}
func nullableUint32(v uint32) *uint32 {
var p *uint32 = nil
if v != 0 {
p = &v
}
return p
}
func nullableString(v string) *string {
var p *string = nil
if v != "" {
p = &v
}
return p
}
func datetime(timestamp uint64) time.Time {
t := time.Unix(int64(timestamp/1e3), 0)
// Temporal solution for not correct timestamps in performance messages
if t.Year() < 2022 || t.Year() > 2025 {
return time.Now()
}
return t
}