diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 2e962cb1b..1712b8a3f 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -122,11 +122,18 @@ func main() { os.Exit(0) case <-commitTick: // Send collected batches to db + start := time.Now() pg.CommitBatches() + pgDur := time.Now().Sub(start).Milliseconds() + + start = time.Now() if err := saver.CommitStats(); err != nil { log.Printf("Error on stats commit: %v", err) } - // TODO?: separate stats & regular messages + chDur := time.Now().Sub(start).Milliseconds() + log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur) + + // TODO: use commit worker to save time each tick if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) } @@ -134,7 +141,7 @@ func main() { // Handle new message from queue err := consumer.ConsumeNext() if err != nil { - log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal? + log.Fatalf("Error on consumption: %v", err) } } } diff --git a/backend/go.mod b/backend/go.mod index a15e23196..caaf1bf83 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -4,12 +4,12 @@ go 1.18 require ( cloud.google.com/go/logging v1.4.2 - github.com/ClickHouse/clickhouse-go v1.5.4 + github.com/ClickHouse/clickhouse-go/v2 v2.2.0 github.com/aws/aws-sdk-go v1.35.23 github.com/btcsuite/btcutil v1.0.2 github.com/elastic/go-elasticsearch/v7 v7.13.1 github.com/go-redis/redis v6.15.9+incompatible - github.com/google/uuid v1.1.2 + github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/jackc/pgconn v1.6.0 github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451 @@ -36,7 +36,6 @@ require ( cloud.google.com/go/storage v1.14.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect github.com/confluentinc/confluent-kafka-go v1.9.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -50,15 +49,19 @@ require ( github.com/jackc/pgproto3/v2 v2.0.2 // indirect github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 // indirect github.com/jackc/pgtype v1.3.0 // indirect - github.com/jackc/puddle v1.1.0 // indirect + github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.11.9 // indirect + github.com/klauspost/compress v1.15.7 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/paulmach/orb v0.7.1 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + github.com/stretchr/testify v1.8.0 // indirect go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/otel/sdk v1.7.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect @@ -73,5 +76,4 @@ require ( google.golang.org/grpc v1.46.2 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.0 // indirect ) diff --git a/backend/go.sum b/backend/go.sum index 433f2b895..6b76d1278 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -61,9 +61,11 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/clickhouse-go/v2 v2.2.0 h1:dj00TDKY+xwuTJdbpspCSmTLFyWzRJerTHwaBxut1C0= +github.com/ClickHouse/clickhouse-go/v2 v2.2.0/go.mod h1:8f2XZUi7XoeU+uPIytSi1cvx8fmJxi7vIgqpvYTF1+o= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -79,7 +81,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -100,7 +101,6 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -151,6 +151,8 @@ github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -158,6 +160,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -230,8 +233,9 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -240,8 +244,10 @@ github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/Oth github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99EXz9pXxye9YM= github.com/googleapis/gax-go/v2 v2.4.0 h1:dS9eYAjhrE2RjmzYw2XAPvcXfmcQLtFEQWn0CR82awk= github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= +github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -290,8 +296,9 @@ github.com/jackc/pgx/v4 v4.6.0 h1:Fh0O9GdlG4gYpjpwOqjdEodJUQM9jzN3Hdv7PN0xmm0= github.com/jackc/pgx/v4 v4.6.0/go.mod h1:vPh43ZzxijXUVJ+t/EmXBtFmbFVO72cuneCT9oAlxAg= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.1.0 h1:musOWczZC/rSbqut475Vfcczg7jJsdUQf0D6oKPLgNU= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a h1:oH7y/b+q2BEerCnARr/HZc1NxOYbKSJor4MqQXlhh+s= +github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a/go.mod h1:ZQuO1Un86Xpe1ShKl08ERTzYhzWq+OvrvotbpeE3XO0= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -308,10 +315,11 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= -github.com/klauspost/compress v1.11.9 h1:5OCMOdde1TCT2sookEuVeEZzA8bmRSFV3AwPDZAG8AA= -github.com/klauspost/compress v1.11.9/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok= +github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -339,6 +347,7 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mkevac/debugcharts v0.0.0-20191222103121-ae1c48aa8615/go.mod h1:Ad7oeElCZqA1Ufj0U9/liOF4BtVepxRcTvr2ey7zTvM= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -353,8 +362,12 @@ github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/oschwald/maxminddb-golang v1.7.0 h1:JmU4Q1WBv5Q+2KZy5xJI+98aUwTIrPPxZUkd5Cwr8Zc= github.com/oschwald/maxminddb-golang v1.7.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis= -github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/paulmach/orb v0.7.1 h1:Zha++Z5OX/l168sqHK3k4z18LDvr+YAO/VjK0ReQ9rU= +github.com/paulmach/orb v0.7.1/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -393,8 +406,12 @@ github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThC github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sethvargo/go-envconfig v0.7.0 h1:P/ljQXSRjgAgsnIripHs53Jg/uNVXu2FYQ9yLSDappA= github.com/sethvargo/go-envconfig v0.7.0/go.mod h1:00S1FAhRUuTNJazWBWcJGvEHOM+NO6DhoRMAOX7FY5o= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= +github.com/shirou/gopsutil v2.19.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -403,14 +420,19 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= +github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4= github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe h1:aj/vX5epIlQQBEocKoM9nSAiNpakdQzElc8SaRFPu+I= @@ -420,6 +442,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= @@ -594,8 +617,10 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191220220014-0732a990476f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -649,6 +674,7 @@ golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -706,6 +732,7 @@ golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= @@ -714,6 +741,7 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= @@ -927,8 +955,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/ee/backend/internal/db/datasaver/stats.go b/ee/backend/internal/db/datasaver/stats.go index d5bd74f83..7fa2fb9d0 100644 --- a/ee/backend/internal/db/datasaver/stats.go +++ b/ee/backend/internal/db/datasaver/stats.go @@ -10,7 +10,7 @@ import ( . "openreplay/backend/pkg/messages" ) -var ch *clickhouse.Connector +var ch clickhouse.Connector var finalizeTicker <-chan time.Time func (si *Saver) InitStats() { diff --git a/ee/backend/pkg/db/clickhouse/bulk.go b/ee/backend/pkg/db/clickhouse/bulk.go deleted file mode 100644 index 121cdbbf0..000000000 --- a/ee/backend/pkg/db/clickhouse/bulk.go +++ /dev/null @@ -1,45 +0,0 @@ -package clickhouse - -import ( - "errors" - "database/sql" -) - -type bulk struct { - db *sql.DB - query string - tx *sql.Tx - stmt *sql.Stmt -} - -func newBulk(db *sql.DB, query string) *bulk { - return &bulk{ - db: db, - query: query, - } -} - -func (b *bulk) prepare() error { - var err error - b.tx, err = b.db.Begin() - if err != nil { - return err - } - b.stmt, err = b.tx.Prepare(b.query) - if err != nil { - return err - } - return nil -} - -func (b *bulk) commit() error { - return b.tx.Commit() -} - -func (b *bulk) exec(args ...interface{}) error { - if b.stmt == nil { - return errors.New("Bulk is not prepared.") - } - _, err := b.stmt.Exec(args...) - return err -} diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index cc0d20497..1fd6e5d1e 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -1,138 +1,416 @@ package clickhouse import ( - "database/sql" - _ "github.com/ClickHouse/clickhouse-go" + "context" + "errors" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "log" + "openreplay/backend/pkg/db/types" + "openreplay/backend/pkg/hashid" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/url" + "strings" + "time" "openreplay/backend/pkg/license" ) -type Connector struct { - sessions *bulk - metadata *bulk // TODO: join sessions, sessions_metadata & sessions_ios - resources *bulk - pages *bulk - clicks *bulk - inputs *bulk - errors *bulk - performance *bulk - longtasks *bulk - db *sql.DB +var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"} +var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"} + +type Connector interface { + Prepare() error + Commit() error + FinaliseSessionsTable() error + InsertWebSession(session *types.Session) error + InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error + InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error + InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error + InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error + InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error + InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error + InsertLongtask(session *types.Session, msg *messages.LongTask) error } -func NewConnector(url string) *Connector { +type connectorImpl struct { + conn driver.Conn + batches map[string]driver.Batch +} + +func NewConnector(url string) Connector { license.CheckLicense() - - db, err := sql.Open("clickhouse", url) + url = strings.TrimPrefix(url, "tcp://") + url = strings.TrimSuffix(url, "/default") + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{url}, + Auth: clickhouse.Auth{ + Database: "default", + }, + MaxOpenConns: 20, + MaxIdleConns: 15, + ConnMaxLifetime: 3 * time.Minute, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + // Debug: true, + }) if err != nil { - log.Fatalln(err) + log.Fatal(err) } - return &Connector{ - db: db, - sessions: newBulk(db, ` - INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - // TODO: join sessions, sessions_metadata & sessions_ios - metadata: newBulk(db, ` - INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - resources: newBulk(db, ` - INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - pages: newBulk(db, ` - INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - clicks: newBulk(db, ` - INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - inputs: newBulk(db, ` - INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - errors: newBulk(db, ` - INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - performance: newBulk(db, ` - INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), - longtasks: newBulk(db, ` - INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `), + + c := &connectorImpl{ + conn: conn, + batches: make(map[string]driver.Batch, 9), } + return c } -func (conn *Connector) Prepare() error { - if err := conn.sessions.prepare(); err != nil { - return err +func (c *connectorImpl) newBatch(name, query string) error { + batch, err := c.conn.PrepareBatch(context.Background(), query) + if err != nil { + return fmt.Errorf("can't create new batch: %s", err) } - if err := conn.metadata.prepare(); err != nil { - return err + if _, ok := c.batches[name]; ok { + delete(c.batches, name) } - if err := conn.resources.prepare(); err != nil { - return err - } - if err := conn.pages.prepare(); err != nil { - return err - } - if err := conn.clicks.prepare(); err != nil { - return err - } - if err := conn.inputs.prepare(); err != nil { - return err - } - if err := conn.errors.prepare(); err != nil { - return err - } - if err := conn.performance.prepare(); err != nil { - return err - } - if err := conn.longtasks.prepare(); err != nil { - return err + c.batches[name] = batch + return nil +} + +var batches = map[string]string{ + "sessions": "INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "metadata": "INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "resources": "INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "pages": "INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "clicks": "INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "inputs": "INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "errors": "INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "performance": "INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "longtasks": "INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", +} + +func (c *connectorImpl) Prepare() error { + for table, query := range batches { + if err := c.newBatch(table, query); err != nil { + return fmt.Errorf("can't create %s batch: %s", table, err) + } } return nil } -func (conn *Connector) Commit() error { - if err := conn.sessions.commit(); err != nil { - return err - } - if err := conn.metadata.commit(); err != nil { - return err - } - if err := conn.resources.commit(); err != nil { - return err - } - if err := conn.pages.commit(); err != nil { - return err - } - if err := conn.clicks.commit(); err != nil { - return err - } - if err := conn.inputs.commit(); err != nil { - return err - } - if err := conn.errors.commit(); err != nil { - return err - } - if err := conn.performance.commit(); err != nil { - return err - } - if err := conn.longtasks.commit(); err != nil { - return err +func (c *connectorImpl) Commit() error { + for _, b := range c.batches { + if err := b.Send(); err != nil { + return fmt.Errorf("can't send batch: %s", err) + } } return nil } -func (conn *Connector) FinaliseSessionsTable() error { - _, err := conn.db.Exec("OPTIMIZE TABLE sessions FINAL") - return err +func (c *connectorImpl) FinaliseSessionsTable() error { + if err := c.conn.Exec(context.Background(), "OPTIMIZE TABLE sessions FINAL"); err != nil { + return fmt.Errorf("can't finalise sessions table: %s", err) + } + return nil +} + +func (c *connectorImpl) checkError(name string, err error) { + if err != clickhouse.ErrBatchAlreadySent { + if batchErr := c.newBatch(name, batches[name]); batchErr != nil { + log.Printf("can't create %s batch after failed append operation: %s", name, batchErr) + } + } +} + +func (c *connectorImpl) InsertWebSession(session *types.Session) error { + if session.Duration == nil { + return errors.New("trying to insert session with nil duration") + } + if err := c.batches["sessions"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(session.Timestamp), + uint32(*session.Duration), + uint16(session.PagesCount), + uint16(session.EventsCount), + uint16(session.ErrorsCount), + // Web unique columns + session.UserBrowser, + nullableString(session.UserBrowserVersion), + ); err != nil { + c.checkError("sessions", err) + return fmt.Errorf("can't append to sessions batch: %s", err) + } + if err := c.batches["metadata"].Append( + session.SessionID, + session.UserID, + session.UserAnonymousID, + session.Metadata1, + session.Metadata2, + session.Metadata3, + session.Metadata4, + session.Metadata5, + session.Metadata6, + session.Metadata7, + session.Metadata8, + session.Metadata9, + session.Metadata10, + datetime(session.Timestamp), + ); err != nil { + c.checkError("metadata", err) + return fmt.Errorf("can't append to metadata batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error { + var method interface{} = url.EnsureMethod(msg.Method) + if method == "" { + method = nil + } + if err := c.batches["resources"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + url.DiscardURLQuery(msg.URL), + msg.Type, + nullableUint16(uint16(msg.Duration)), + nullableUint16(uint16(msg.TTFB)), + nullableUint16(uint16(msg.HeaderSize)), + nullableUint32(uint32(msg.EncodedBodySize)), + nullableUint32(uint32(msg.DecodedBodySize)), + msg.Success, + ); err != nil { + c.checkError("resources", err) + return fmt.Errorf("can't append to resources batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error { + if err := c.batches["pages"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + url.DiscardURLQuery(msg.URL), + nullableUint16(uint16(msg.RequestStart)), + nullableUint16(uint16(msg.ResponseStart)), + nullableUint16(uint16(msg.ResponseEnd)), + nullableUint16(uint16(msg.DomContentLoadedEventStart)), + nullableUint16(uint16(msg.DomContentLoadedEventEnd)), + nullableUint16(uint16(msg.LoadEventStart)), + nullableUint16(uint16(msg.LoadEventEnd)), + nullableUint16(uint16(msg.FirstPaint)), + nullableUint16(uint16(msg.FirstContentfulPaint)), + nullableUint16(uint16(msg.SpeedIndex)), + nullableUint16(uint16(msg.VisuallyComplete)), + nullableUint16(uint16(msg.TimeToInteractive)), + ); err != nil { + c.checkError("pages", err) + return fmt.Errorf("can't append to pages batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["clicks"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + msg.Label, + nullableUint32(uint32(msg.HesitationTime)), + ); err != nil { + c.checkError("clicks", err) + return fmt.Errorf("can't append to clicks batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["inputs"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + msg.Label, + ); err != nil { + c.checkError("inputs", err) + return fmt.Errorf("can't append to inputs batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error { + if err := c.batches["errors"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + msg.Source, + nullableString(msg.Name), + msg.Message, + hashid.WebErrorID(session.ProjectID, msg), + ); err != nil { + c.checkError("errors", err) + return fmt.Errorf("can't append to errors batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error { + var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 + if err := c.batches["performance"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(timestamp), + uint8(msg.MinFPS), + uint8(msg.AvgFPS), + uint8(msg.MaxFPS), + uint8(msg.MinCPU), + uint8(msg.AvgCPU), + uint8(msg.MaxCPU), + msg.MinTotalJSHeapSize, + msg.AvgTotalJSHeapSize, + msg.MaxTotalJSHeapSize, + msg.MinUsedJSHeapSize, + msg.AvgUsedJSHeapSize, + msg.MaxUsedJSHeapSize, + ); err != nil { + c.checkError("performance", err) + return fmt.Errorf("can't append to performance batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertLongtask(session *types.Session, msg *messages.LongTask) error { + if err := c.batches["longtasks"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + CONTEXT_MAP[msg.Context], + CONTAINER_TYPE_MAP[msg.ContainerType], + msg.ContainerId, + msg.ContainerName, + msg.ContainerSrc, + ); err != nil { + c.checkError("longtasks", err) + return fmt.Errorf("can't append to longtasks batch: %s", err) + } + return nil +} + +func nullableUint16(v uint16) *uint16 { + var p *uint16 = nil + if v != 0 { + p = &v + } + return p +} + +func nullableUint32(v uint32) *uint32 { + var p *uint32 = nil + if v != 0 { + p = &v + } + return p +} + +func nullableString(v string) *string { + var p *string = nil + if v != "" { + p = &v + } + return p +} + +func datetime(timestamp uint64) time.Time { + t := time.Unix(int64(timestamp/1e3), 0) + // Temporal solution for not correct timestamps in performance messages + if t.Year() < 2022 || t.Year() > 2025 { + return time.Now() + } + return t } diff --git a/ee/backend/pkg/db/clickhouse/helpers.go b/ee/backend/pkg/db/clickhouse/helpers.go deleted file mode 100644 index 37e30518c..000000000 --- a/ee/backend/pkg/db/clickhouse/helpers.go +++ /dev/null @@ -1,34 +0,0 @@ -package clickhouse - -import ( - "time" -) - - -func nullableUint16(v uint16) *uint16 { - var p *uint16 = nil - if v != 0 { - p = &v - } - return p -} - -func nullableUint32(v uint32) *uint32 { - var p *uint32 = nil - if v != 0 { - p = &v - } - return p -} - -func nullableString(v string) *string { - var p *string = nil - if v != "" { - p = &v - } - return p -} - -func datetime(timestamp uint64) time.Time { - return time.Unix(int64(timestamp/1e3), 0) -} diff --git a/ee/backend/pkg/db/clickhouse/messages-web.go b/ee/backend/pkg/db/clickhouse/messages-web.go deleted file mode 100644 index adfa38655..000000000 --- a/ee/backend/pkg/db/clickhouse/messages-web.go +++ /dev/null @@ -1,243 +0,0 @@ -package clickhouse - -import ( - "errors" - - . "openreplay/backend/pkg/db/types" - "openreplay/backend/pkg/hashid" - . "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/url" -) - -func (conn *Connector) InsertWebSession(session *Session) error { - if session.Duration == nil { - return errors.New("Clickhouse: trying to insert session with ") - } - - if err := conn.sessions.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(session.Timestamp), - uint32(*session.Duration), - session.PagesCount, - session.EventsCount, - session.ErrorsCount, - // Web unique columns - session.UserBrowser, - nullableString(session.UserBrowserVersion), - ); err != nil { - return err - } - // TODO: join sessions, sessions_metadata & sessions_ios - return conn.metadata.exec( - session.SessionID, - session.UserID, - session.UserAnonymousID, - session.Metadata1, - session.Metadata2, - session.Metadata3, - session.Metadata4, - session.Metadata5, - session.Metadata6, - session.Metadata7, - session.Metadata8, - session.Metadata9, - session.Metadata10, - datetime(session.Timestamp), - ) -} - -func (conn *Connector) InsertWebResourceEvent(session *Session, msg *ResourceEvent) error { - // nullableString causes error "unexpected type *string" on Nullable Enum type - // (apparently, a clickhouse-go bug) https://github.com/ClickHouse/clickhouse-go/pull/204 - var method interface{} = url.EnsureMethod(msg.Method) - if method == "" { - method = nil - } - return conn.resources.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - url.DiscardURLQuery(msg.URL), - msg.Type, - nullableUint16(uint16(msg.Duration)), - nullableUint16(uint16(msg.TTFB)), - nullableUint16(uint16(msg.HeaderSize)), - nullableUint32(uint32(msg.EncodedBodySize)), - nullableUint32(uint32(msg.DecodedBodySize)), - msg.Success, - ) -} - -func (conn *Connector) InsertWebPageEvent(session *Session, msg *PageEvent) error { - return conn.pages.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - url.DiscardURLQuery(msg.URL), - nullableUint16(uint16(msg.RequestStart)), - nullableUint16(uint16(msg.ResponseStart)), - nullableUint16(uint16(msg.ResponseEnd)), - nullableUint16(uint16(msg.DomContentLoadedEventStart)), - nullableUint16(uint16(msg.DomContentLoadedEventEnd)), - nullableUint16(uint16(msg.LoadEventStart)), - nullableUint16(uint16(msg.LoadEventEnd)), - nullableUint16(uint16(msg.FirstPaint)), - nullableUint16(uint16(msg.FirstContentfulPaint)), - nullableUint16(uint16(msg.SpeedIndex)), - nullableUint16(uint16(msg.VisuallyComplete)), - nullableUint16(uint16(msg.TimeToInteractive)), - ) -} - -func (conn *Connector) InsertWebClickEvent(session *Session, msg *ClickEvent) error { - if msg.Label == "" { - return nil - } - return conn.clicks.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Label, - nullableUint32(uint32(msg.HesitationTime)), - ) -} - -func (conn *Connector) InsertWebInputEvent(session *Session, msg *InputEvent) error { - if msg.Label == "" { - return nil - } - return conn.inputs.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Label, - ) -} - -func (conn *Connector) InsertWebErrorEvent(session *Session, msg *ErrorEvent) error { - return conn.errors.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Source, - nullableString(msg.Name), - msg.Message, - hashid.WebErrorID(session.ProjectID, msg), - ) -} - -func (conn *Connector) InsertWebPerformanceTrackAggr(session *Session, msg *PerformanceTrackAggr) error { - var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 - return conn.performance.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(timestamp), - uint8(msg.MinFPS), - uint8(msg.AvgFPS), - uint8(msg.MaxFPS), - uint8(msg.MinCPU), - uint8(msg.AvgCPU), - uint8(msg.MaxCPU), - msg.MinTotalJSHeapSize, - msg.AvgTotalJSHeapSize, - msg.MaxTotalJSHeapSize, - msg.MinUsedJSHeapSize, - msg.AvgUsedJSHeapSize, - msg.MaxUsedJSHeapSize, - ) -} - -// TODO: make enum message type -var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"} -var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"} - -func (conn *Connector) InsertLongtask(session *Session, msg *LongTask) error { - return conn.longtasks.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - session.UserBrowser, - nullableString(session.UserBrowserVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - CONTEXT_MAP[msg.Context], - CONTAINER_TYPE_MAP[msg.ContainerType], - msg.ContainerId, - msg.ContainerName, - msg.ContainerSrc, - ) -}