Adapt CH client for a new PA events table (#2960)

* feat(db): use a new CH events schema

* feat(db): added a missing columns to issue events

* feat(db): correct order of the issue's arguments

* feat(db): crop for url related strings

* feat(db): added missing values

* feat(db): moved materialized columns to json

* feat(db): use the latest ch library with JSON support

* feat(db): added missing duration for requests event
This commit is contained in:
Alexander 2025-01-20 14:21:57 +01:00 committed by GitHub
parent 3f13eeef75
commit 9370a7a50e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 579 additions and 292 deletions

View file

@ -5,7 +5,7 @@ go 1.23
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/ClickHouse/clickhouse-go/v2 v2.30.1
github.com/DataDog/datadog-api-client-go/v2 v2.34.0
github.com/Masterminds/semver v1.5.0
github.com/andybalholm/brotli v1.1.1
@ -14,6 +14,7 @@ require (
github.com/confluentinc/confluent-kafka-go/v2 v2.6.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/docker/distribution v2.8.3+incompatible
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/fernet/fernet-go v0.0.0-20240119011108-303da6aec611
github.com/getsentry/sentry-go v0.30.0
@ -37,12 +38,12 @@ require (
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20241012191800-bbb40edc15aa
go.uber.org/zap v1.27.0
golang.org/x/net v0.33.0
golang.org/x/net v0.34.0
)
require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/ClickHouse/ch-go v0.63.1 // indirect
github.com/DataDog/zstd v1.5.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@ -77,12 +78,13 @@ require (
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/protobuf v1.36.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View file

@ -19,10 +19,10 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg6
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4=
github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg=
github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo=
github.com/ClickHouse/ch-go v0.63.1 h1:s2JyZvWLTCSAGdtjMBBmAgQQHMco6pawLJMOXi0FODM=
github.com/ClickHouse/ch-go v0.63.1/go.mod h1:I1kJJCL3WJcBMGe1m+HVK0+nREaG+JOYYBWjrDrF3R0=
github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo=
github.com/ClickHouse/clickhouse-go/v2 v2.30.1/go.mod h1:szk8BMoQV/NgHXZ20ZbwDyvPWmpfhRKjFkc6wzASGxM=
github.com/DataDog/datadog-api-client-go/v2 v2.34.0 h1:0VVmv8uZg8vdBuEpiF2nBGUezl2QITrxdEsLgh38j8M=
github.com/DataDog/datadog-api-client-go/v2 v2.34.0/go.mod h1:d3tOEgUd2kfsr9uuHQdY+nXrWp4uikgTgVCPdKNK30U=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
@ -130,8 +130,8 @@ github.com/docker/compose/v2 v2.28.1 h1:ORPfiVHrpnRQBDoC3F8JJyWAY8N5gWuo3Fgwyivx
github.com/docker/compose/v2 v2.28.1/go.mod h1:wDtGQFHe99sPLCHXeVbCkc+Wsl4Y/2ZxiAJa/nga6rA=
github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v27.3.0+incompatible h1:BNb1QY6o4JdKpqwi9IB+HUYcRRrVN4aGFUTvDmWYK1A=
github.com/docker/docker v27.3.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v27.4.1+incompatible h1:ZJvcY7gfwHn1JF48PfbyXg7Jyt9ZCWDW+GGXOIxEwp4=
github.com/docker/docker v27.4.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker-credential-helpers v0.8.0 h1:YQFtbBQb4VrpoPxhFuzEBPQ9E16qz5SpHLS+uswaCp8=
github.com/docker/docker-credential-helpers v0.8.0/go.mod h1:UGFXcuoQ5TxPiB54nHOZ32AWRqQdECoh/Mg0AlEYb40=
github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c h1:lzqkGL9b3znc+ZUgi7FlLnqjQhcXxkNM/quxIjBVMD0=
@ -146,6 +146,8 @@ github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203 h1:XBBHcIb256gUJ
github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203/go.mod h1:E1jcSv8FaEny+OP/5k9UxZVw9YFWGj7eI4KR/iOBqCg=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-elasticsearch/v8 v8.17.0 h1:e9cWksE/Fr7urDRmGPGp47Nsp4/mvNOrU8As1l2HQQ0=
github.com/elastic/go-elasticsearch/v8 v8.17.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
@ -500,8 +502,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw=
github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8=
github.com/testcontainers/testcontainers-go/modules/compose v0.33.0 h1:PyrUOF+zG+xrS3p+FesyVxMI+9U+7pwhZhyFozH3jKY=
@ -542,6 +544,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zavorotynskiy/clickhouse-go/v2 v2.30.1-0.20250110230205-a22b6d524deb h1:3gQycbGOJreRQkKORHjCn+wEPpnlra6Ek5TnW78z5vc=
github.com/zavorotynskiy/clickhouse-go/v2 v2.30.1-0.20250110230205-a22b6d524deb/go.mod h1:szk8BMoQV/NgHXZ20ZbwDyvPWmpfhRKjFkc6wzASGxM=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
@ -608,8 +612,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@ -632,8 +636,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -666,16 +670,16 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=

View file

@ -1,14 +1,18 @@
package clickhouse
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"log"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/uuid"
"openreplay/backend/internal/config/common"
"openreplay/backend/pkg/db/types"
@ -22,7 +26,6 @@ type Connector interface {
Prepare() error
Commit() error
Stop() error
// Web
InsertWebSession(session *sessions.Session) error
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
@ -35,7 +38,6 @@ type Connector interface {
InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error
InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error
InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error
// Mobile
InsertMobileSession(session *sessions.Session) error
InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error
InsertMobileClick(session *sessions.Session, msg *messages.MobileClickEvent) error
@ -101,27 +103,25 @@ func (c *connectorImpl) newBatch(name, query string) error {
}
var batches = map[string]string{
// Web
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))",
"pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, url_path, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?)",
"clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type, selector, normalized_x, normalized_y, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type, duration, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type, transfer_size, url_path) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000))",
"custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type, url, url_path) VALUES (?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000))",
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)",
//Mobile
"ios_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)",
"ios_custom": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_clicks": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_swipes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, direction, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"ios_inputs": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
"ios_requests": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?)",
"ios_crashes": "INSERT INTO experimental.ios_events (session_id, project_id, message_id, datetime, name, reason, stacktrace, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, timezone, utm_source, utm_medium, utm_campaign) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?, ?, ?)",
"autocompletes": "INSERT INTO experimental.autocomplete (project_id, type, value) VALUES (?, ?, SUBSTR(?, 1, 8000))",
"pages": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$current_url", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"errors": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"performance": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$duration_s", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"graphql": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"issuesEvents": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", issue_type, issue_id, "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string) VALUES (?, ?, ?, ?)",
"mobile_sessions": "INSERT INTO experimental.sessions (session_id, project_id, user_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, user_state, user_city, datetime, duration, pages_count, events_count, errors_count, issue_score, referrer, issue_types, tracker_version, user_browser, user_browser_version, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, platform, timezone) VALUES (?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, SUBSTR(?, 1, 8000), ?, ?, ?, ?, SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), SUBSTR(?, 1, 8000), ?, ?)",
"mobile_custom": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_clicks": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_swipes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_inputs": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_requests": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"mobile_crashes": `INSERT INTO product_analytics.events (session_id, project_id, event_id, "$event_name", created_at, "$time", distinct_id, "$auto_captured", "$properties") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
}
func (c *connectorImpl) Prepare() error {
@ -175,96 +175,6 @@ func (c *connectorImpl) worker() {
}
}
func (c *connectorImpl) checkError(name string, err error) {
if err != clickhouse.ErrBatchAlreadySent {
log.Printf("can't create %s batch after failed append operation: %s", name, err)
}
}
func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error {
if msg.Label == "" {
return nil
}
if err := c.batches["inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
"INPUT",
nullableUint16(uint16(msg.InputDuration)),
nullableUint32(uint32(msg.HesitationTime)),
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error {
issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp)
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
issueID,
"mouse_thrashing",
"ISSUE",
msg.Url,
extractUrlPath(msg.Url),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
"mouse_thrashing",
msg.Url,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
// Check issue type before insert to avoid panic from clickhouse lib
switch msg.Type {
case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash":
default:
return fmt.Errorf("unknown issueType: %s", msg.Type)
}
// Insert issue event to batches
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
issueID,
msg.Type,
"ISSUE",
msg.URL,
extractUrlPath(msg.URL),
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
msg.Type,
msg.ContextString,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
@ -313,40 +223,217 @@ func (c *connectorImpl) InsertWebSession(session *sessions.Session) error {
return nil
}
func extractUrlPath(fullUrl string) string {
_, path, query, err := url.GetURLParts(fullUrl)
func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
if err := c.batches["autocompletes"].Append(
uint16(session.ProjectID),
msgType,
msgValue,
); err != nil {
c.checkError("autocompletes", err)
return fmt.Errorf("can't append to autocompletes batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebInputDuration(session *sessions.Session, msg *messages.InputChange) error {
if msg.Label == "" {
return nil
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
})
if err != nil {
log.Printf("can't parse url: %s", err)
return ""
return fmt.Errorf("can't marshal input event: %s", err)
}
pathQuery := path
if query != "" {
pathQuery += "?" + query
eventTime := datetime(msg.Timestamp)
if err := c.batches["inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
getUUID(msg),
"INPUT",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
nullableUint16(uint16(msg.InputDuration)),
jsonString,
); err != nil {
c.checkError("inputs", err)
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return strings.ToLower(pathQuery)
return nil
}
func (c *connectorImpl) InsertMouseThrashing(session *sessions.Session, msg *messages.MouseThrashing) error {
issueID := hashid.MouseThrashingID(session.ProjectID, session.SessionID, msg.Timestamp)
host, path, hostpath, err := extractUrlParts(msg.Url)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"issue_id": issueID,
"issue_type": "mouse_thrashing",
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
})
if err != nil {
return fmt.Errorf("can't marshal issue event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
getUUID(msg),
"ISSUE",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
"mouse_thrashing",
issueID,
jsonString,
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
"mouse_thrashing",
msg.Url,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertIssue(session *sessions.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
// Check issue type before insert to avoid panic from clickhouse lib
switch msg.Type {
case "click_rage", "dead_click", "excessive_scrolling", "bad_request", "missing_resource", "memory", "cpu", "slow_resource", "slow_page_load", "crash", "ml_cpu", "ml_memory", "ml_dead_click", "ml_click_rage", "ml_mouse_thrashing", "ml_excessive_scrolling", "ml_slow_resources", "custom", "js_exception", "mouse_thrashing", "app_crash":
default:
return fmt.Errorf("unknown issueType: %s", msg.Type)
}
host, path, hostpath, err := extractUrlParts(msg.Url)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"issue_id": issueID,
"issue_type": msg.Type,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
})
if err != nil {
return fmt.Errorf("can't marshal issue event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
getUUID(msg),
"ISSUE",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
msg.Type,
issueID,
jsonString,
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
msg.Type,
msg.ContextString,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error {
host, path, hostpath, err := extractUrlParts(msg.URL)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
ttfb := nullableUint16(0)
if msg.ResponseStart >= msg.RequestStart {
ttfb = nullableUint16(uint16(msg.ResponseStart - msg.RequestStart))
}
ttlb := nullableUint16(0)
if msg.ResponseEnd >= msg.RequestStart {
ttlb = nullableUint16(uint16(msg.ResponseEnd - msg.RequestStart))
}
responseTime := nullableUint16(0)
if msg.ResponseEnd >= msg.ResponseStart {
responseTime = nullableUint16(uint16(msg.ResponseEnd - msg.ResponseStart))
}
domBuildingTime := nullableUint16(0)
if msg.DomContentLoadedEventStart >= msg.ResponseEnd {
domBuildingTime = nullableUint16(uint16(msg.DomContentLoadedEventStart - msg.ResponseEnd))
}
domContentLoadedEventTime := nullableUint16(0)
if msg.DomContentLoadedEventEnd >= msg.DomContentLoadedEventStart {
domContentLoadedEventTime = nullableUint16(uint16(msg.DomContentLoadedEventEnd - msg.DomContentLoadedEventStart))
}
loadEventTime := nullableUint16(0)
if msg.LoadEventEnd >= msg.LoadEventStart {
loadEventTime = nullableUint16(uint16(msg.LoadEventEnd - msg.LoadEventStart))
}
jsonString, err := json.Marshal(map[string]interface{}{
"request_start": nullableUint16(uint16(msg.RequestStart)),
"response_start": nullableUint16(uint16(msg.ResponseStart)),
"response_end": nullableUint16(uint16(msg.ResponseEnd)),
"dom_content_loaded_event_start": nullableUint16(uint16(msg.DomContentLoadedEventStart)),
"dom_content_loaded_event_end": nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
"load_event_start": nullableUint16(uint16(msg.LoadEventStart)),
"load_event_end": nullableUint16(uint16(msg.LoadEventEnd)),
"first_paint": nullableUint16(uint16(msg.FirstPaint)),
"first_contentful_paint_time": nullableUint16(uint16(msg.FirstContentfulPaint)),
"speed_index": nullableUint16(uint16(msg.SpeedIndex)),
"visually_complete": nullableUint16(uint16(msg.VisuallyComplete)),
"time_to_interactive": nullableUint16(uint16(msg.TimeToInteractive)),
"url": cropString(msg.URL),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"ttfb": ttfb,
"ttlb": ttlb,
"response_time": responseTime,
"dom_building_time": domBuildingTime,
"dom_content_loaded_event_time": domContentLoadedEventTime,
"load_event_time": loadEventTime,
})
if err != nil {
return fmt.Errorf("can't marshal page event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["pages"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.URL,
nullableUint16(uint16(msg.RequestStart)),
nullableUint16(uint16(msg.ResponseStart)),
nullableUint16(uint16(msg.ResponseEnd)),
nullableUint16(uint16(msg.DomContentLoadedEventStart)),
nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
nullableUint16(uint16(msg.LoadEventStart)),
nullableUint16(uint16(msg.LoadEventEnd)),
nullableUint16(uint16(msg.FirstPaint)),
nullableUint16(uint16(msg.FirstContentfulPaint)),
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
extractUrlPath(msg.URL),
getUUID(msg),
"LOCATION",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
cropString(msg.URL),
jsonString,
); err != nil {
c.checkError("pages", err)
return fmt.Errorf("can't append to pages batch: %s", err)
@ -373,19 +460,36 @@ func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *mess
nYVal := normalizedY
nY = &nYVal
}
host, path, hostpath, err := extractUrlParts(msg.Url)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"hesitation_time": nullableUint32(uint32(msg.HesitationTime)),
"selector": msg.Selector,
"normalized_x": nX,
"normalized_y": nY,
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
})
if err != nil {
return fmt.Errorf("can't marshal click event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
getUUID(msg),
"CLICK",
msg.Selector,
nX,
nY,
msg.Url,
extractUrlPath(msg.Url),
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
cropString(msg.Url),
jsonString,
); err != nil {
c.checkError("clicks", err)
return fmt.Errorf("can't append to clicks batch: %s", err)
@ -406,19 +510,28 @@ func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *type
return fmt.Errorf("unknown error source: %s", msg.Source)
}
msgID, _ := msg.ID(session.ProjectID)
// Insert event to batch
jsonString, err := json.Marshal(map[string]interface{}{
"source": msg.Source,
"name": nullableString(msg.Name),
"message": msg.Message,
"error_id": msgID,
"error_tags_keys": keys,
"error_tags_values": values,
})
if err != nil {
return fmt.Errorf("can't marshal error event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["errors"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
msgID,
msg.GetUUID(session.SessionID),
"ERROR",
keys,
values,
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("errors", err)
return fmt.Errorf("can't append to errors batch: %s", err)
@ -427,26 +540,43 @@ func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *type
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
var timestamp = (msg.TimestampStart + msg.TimestampEnd) / 2
host, path, hostpath, err := extractUrlParts(msg.Url)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"url": cropString(msg.Url),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
"min_fps": uint8(msg.MinFPS),
"avg_fps": uint8(msg.AvgFPS),
"max_fps": uint8(msg.MaxFPS),
"min_cpu": uint8(msg.MinCPU),
"avg_cpu": uint8(msg.AvgCPU),
"max_cpu": uint8(msg.MaxCPU),
"min_total_js_heap_size": msg.MinTotalJSHeapSize,
"avg_total_js_heap_size": msg.AvgTotalJSHeapSize,
"max_total_js_heap_size": msg.MaxTotalJSHeapSize,
"min_used_js_heap_size": msg.MinUsedJSHeapSize,
"avg_used_js_heap_size": msg.AvgUsedJSHeapSize,
"max_used_js_heap_size": msg.MaxUsedJSHeapSize,
})
if err != nil {
return fmt.Errorf("can't marshal performance event: %s", err)
}
eventTime := datetime(timestamp)
if err := c.batches["performance"].Append(
session.SessionID,
uint16(session.ProjectID),
uint64(0), // TODO: find messageID for performance events
datetime(timestamp),
nullableString(msg.Meta().Url),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinTotalJSHeapSize,
msg.AvgTotalJSHeapSize,
msg.MaxTotalJSHeapSize,
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
getUUID(msg),
"PERFORMANCE",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("performance", err)
return fmt.Errorf("can't append to performance batch: %s", err)
@ -454,21 +584,6 @@ func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *sessions.Session,
return nil
}
func (c *connectorImpl) InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error {
if len(msgValue) == 0 {
return nil
}
if err := c.batches["autocompletes"].Append(
uint16(session.ProjectID),
msgType,
msgValue,
); err != nil {
c.checkError("autocompletes", err)
return fmt.Errorf("can't append to autocompletes batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.NetworkRequest, savePayload bool) error {
urlMethod := url.EnsureMethod(msg.Method)
if urlMethod == "" {
@ -479,21 +594,37 @@ func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.N
request = &msg.Request
response = &msg.Response
}
host, path, hostpath, err := extractUrlParts(msg.URL)
if err != nil {
return fmt.Errorf("can't extract url parts: %s", err)
}
jsonString, err := json.Marshal(map[string]interface{}{
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"success": msg.Status < 400,
"transfer_size": uint32(msg.TransferredBodySize),
"url": cropString(msg.URL),
"url_host": host,
"url_path": path,
"url_hostpath": hostpath,
})
if err != nil {
return fmt.Errorf("can't marshal request event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
getUUID(msg),
"REQUEST",
uint32(msg.TransferredBodySize),
extractUrlPath(msg.URL),
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
nullableUint16(uint16(msg.Duration)),
jsonString,
); err != nil {
c.checkError("requests", err)
return fmt.Errorf("can't append to requests batch: %s", err)
@ -502,14 +633,24 @@ func (c *connectorImpl) InsertRequest(session *sessions.Session, msg *messages.N
}
func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.CustomEvent) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"payload": msg.Payload,
})
if err != nil {
return fmt.Errorf("can't marshal custom event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
getUUID(msg),
"CUSTOM",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("custom", err)
return fmt.Errorf("can't append to custom batch: %s", err)
@ -518,15 +659,25 @@ func (c *connectorImpl) InsertCustom(session *sessions.Session, msg *messages.Cu
}
func (c *connectorImpl) InsertGraphQL(session *sessions.Session, msg *messages.GraphQL) error {
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.OperationName,
"request_body": nullableString(msg.Variables),
"response_body": nullableString(msg.Response),
})
if err != nil {
return fmt.Errorf("can't marshal graphql event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["graphql"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.OperationName,
nullableString(msg.Variables),
nullableString(msg.Response),
getUUID(msg),
"GRAPHQL",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("graphql", err)
return fmt.Errorf("can't append to graphql batch: %s", err)
@ -540,7 +691,7 @@ func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
if session.Duration == nil {
return errors.New("trying to insert mobile session with nil duration")
}
if err := c.batches["ios_sessions"].Append(
if err := c.batches["mobile_sessions"].Append(
session.SessionID,
uint16(session.ProjectID),
session.UserID,
@ -576,23 +727,33 @@ func (c *connectorImpl) InsertMobileSession(session *sessions.Session) error {
"ios",
session.Timezone,
); err != nil {
c.checkError("ios_sessions", err)
c.checkError("mobile_sessions", err)
return fmt.Errorf("can't append to sessions batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCustom(session *sessions.Session, msg *messages.MobileEvent) error {
if err := c.batches["ios_custom"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"payload": msg.Payload,
})
if err != nil {
return fmt.Errorf("can't marshal mobile custom event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_custom"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.Name,
msg.Payload,
getUUID(msg),
"CUSTOM",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_custom", err)
c.checkError("mobile_custom", err)
return fmt.Errorf("can't append to mobile custom batch: %s", err)
}
return nil
@ -602,15 +763,25 @@ func (c *connectorImpl) InsertMobileClick(session *sessions.Session, msg *messag
if msg.Label == "" {
return nil
}
if err := c.batches["ios_clicks"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
})
if err != nil {
return fmt.Errorf("can't marshal mobile clicks event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_clicks"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
getUUID(msg),
"TAP",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_clicks", err)
c.checkError("mobile_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
}
return nil
@ -620,17 +791,27 @@ func (c *connectorImpl) InsertMobileSwipe(session *sessions.Session, msg *messag
if msg.Label == "" {
return nil
}
if err := c.batches["ios_swipes"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
"direction": nullableString(msg.Direction),
})
if err != nil {
return fmt.Errorf("can't marshal mobile swipe event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_swipes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
nullableString(msg.Direction),
getUUID(msg),
"SWIPE",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_clicks", err)
return fmt.Errorf("can't append to mobile clicks batch: %s", err)
c.checkError("mobile_swipes", err)
return fmt.Errorf("can't append to mobile swipe batch: %s", err)
}
return nil
}
@ -639,15 +820,25 @@ func (c *connectorImpl) InsertMobileInput(session *sessions.Session, msg *messag
if msg.Label == "" {
return nil
}
if err := c.batches["ios_inputs"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"label": msg.Label,
})
if err != nil {
return fmt.Errorf("can't marshal mobile input event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_inputs"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Label,
getUUID(msg),
"INPUT",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_inputs", err)
c.checkError("mobile_inputs", err)
return fmt.Errorf("can't append to mobile inputs batch: %s", err)
}
return nil
@ -663,39 +854,103 @@ func (c *connectorImpl) InsertMobileRequest(session *sessions.Session, msg *mess
request = &msg.Request
response = &msg.Response
}
if err := c.batches["ios_requests"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"url": cropString(msg.URL),
"request_body": request,
"response_body": response,
"status": uint16(msg.Status),
"method": url.EnsureMethod(msg.Method),
"duration": uint16(msg.Duration),
"success": msg.Status < 400,
})
if err != nil {
return fmt.Errorf("can't marshal mobile request event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_requests"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.Meta().Index,
datetime(uint64(msg.Meta().Timestamp)),
msg.URL,
request,
response,
uint16(msg.Status),
url.EnsureMethod(msg.Method),
uint16(msg.Duration),
msg.Status < 400,
getUUID(msg),
"REQUEST",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_requests", err)
c.checkError("mobile_requests", err)
return fmt.Errorf("can't append to mobile requests batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertMobileCrash(session *sessions.Session, msg *messages.MobileCrash) error {
if err := c.batches["ios_crashes"].Append(
jsonString, err := json.Marshal(map[string]interface{}{
"name": msg.Name,
"reason": msg.Reason,
"stacktrace": msg.Stacktrace,
})
if err != nil {
return fmt.Errorf("can't marshal mobile crash event: %s", err)
}
eventTime := datetime(msg.Timestamp)
if err := c.batches["mobile_crashes"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MsgID(),
datetime(msg.Timestamp),
msg.Name,
msg.Reason,
msg.Stacktrace,
getUUID(msg),
"CRASH",
eventTime,
eventTime.Unix(),
session.UserUUID,
true,
jsonString,
); err != nil {
c.checkError("ios_crashes", err)
return fmt.Errorf("can't append to mobile crashges batch: %s", err)
c.checkError("mobile_crashes", err)
return fmt.Errorf("can't append to mobile crashs batch: %s", err)
}
return nil
}
func (c *connectorImpl) checkError(name string, err error) {
if !errors.Is(err, clickhouse.ErrBatchAlreadySent) {
log.Printf("can't create %s batch after failed append operation: %s", name, err)
}
}
func extractUrlParts(fullUrl string) (string, string, string, error) {
host, path, query, err := url.GetURLParts(strings.ToLower(fullUrl))
if err != nil {
return "", "", "", err
}
pathQuery := path
if query != "" {
pathQuery += "?" + query
}
return cropString(host), cropString(pathQuery), cropString(host + pathQuery), nil
}
func getUUID(m messages.Message) string {
hash := fnv.New128a()
hash.Write(uint64ToBytes(m.SessionID()))
hash.Write(uint64ToBytes(m.MsgID()))
hash.Write(uint64ToBytes(uint64(m.TypeID())))
uuidObj, err := uuid.FromBytes(hash.Sum(nil))
if err != nil {
fmt.Printf("can't create uuid from bytes: %s", err)
uuidObj = uuid.New()
}
return uuidObj.String()
}
func uint64ToBytes(num uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, num)
return buf
}
func cropString(s string) string {
if len(s) > 8000 {
return s[:8000]
}
return s
}

View file

@ -31,7 +31,7 @@ func nullableString(v string) *string {
func datetime(timestamp uint64) time.Time {
t := time.Unix(int64(timestamp/1e3), 0)
// Temporal solution for not correct timestamps in performance messages
if t.Year() < 2022 || t.Year() > 2025 {
if t.Year() < 2023 || t.Year() > 2026 {
return time.Now()
}
return t

View file

@ -1,9 +1,11 @@
package types
import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/google/uuid"
"hash/fnv"
"strconv"
@ -13,13 +15,14 @@ import (
const SOURCE_JS = "js_exception"
type ErrorEvent struct {
MessageID uint64
Timestamp uint64
Source string
Name string
Message string
Payload string
Tags map[string]*string
MessageID uint64
Timestamp uint64
Source string
Name string
Message string
Payload string
Tags map[string]*string
OriginType int
}
func unquote(s string) string {
@ -60,24 +63,26 @@ func parseTags(tagsJSON string) (tags map[string]*string, err error) {
func WrapJSException(m *JSException) (*ErrorEvent, error) {
meta, err := parseTags(m.Metadata)
return &ErrorEvent{
MessageID: m.Meta().Index,
Timestamp: m.Meta().Timestamp,
Source: SOURCE_JS,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
Tags: meta,
MessageID: m.Meta().Index,
Timestamp: m.Meta().Timestamp,
Source: SOURCE_JS,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
Tags: meta,
OriginType: m.TypeID(),
}, err
}
func WrapIntegrationEvent(m *IntegrationEvent) *ErrorEvent {
return &ErrorEvent{
MessageID: m.Meta().Index, // This will be always 0 here since it's coming from backend TODO: find another way to index
Timestamp: m.Timestamp,
Source: m.Source,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
MessageID: m.Meta().Index, // This will be always 0 here since it's coming from backend TODO: find another way to index
Timestamp: m.Timestamp,
Source: m.Source,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
OriginType: m.TypeID(),
}
}
@ -129,3 +134,22 @@ func WrapCustomEvent(m *CustomEvent) *IssueEvent {
msg.Meta().SetMeta(m.Meta())
return msg
}
func (e *ErrorEvent) GetUUID(sessID uint64) string {
hash := fnv.New128a()
hash.Write(Uint64ToBytes(sessID))
hash.Write(Uint64ToBytes(e.MessageID))
hash.Write(Uint64ToBytes(uint64(e.OriginType)))
uuidObj, err := uuid.FromBytes(hash.Sum(nil))
if err != nil {
fmt.Printf("can't create uuid from bytes: %s", err)
uuidObj = uuid.New()
}
return uuidObj.String()
}
func Uint64ToBytes(num uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, num)
return buf
}

View file

@ -1,6 +1,8 @@
package messages
import "fmt"
import (
"fmt"
)
type Message interface {
Encode() []byte