Draft: New metrics module (#982)

* feat(backend): created new metrics module
Alexander committed 2023-02-20 16:37:55 +01:00 (via GitHub)
parent 2fae4549d6
commit fdd28dbc4a
33 changed files with 1021 additions and 602 deletions
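
Reviewer note: the new pkg/metrics module itself is not among the hunks shown here — only its call sites are (m := metrics.New(); m.Register(assetsMetrics.List())). From those call sites the registry presumably looks roughly like the sketch below. This is a minimal sketch assuming a Prometheus registry underneath; the diff does not reveal the backing client, and the scrape endpoint and port are invented for illustration.

package metrics

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics is a thin wrapper around a shared registry; each service
// registers its own list of collectors once at startup.
type Metrics interface {
    Register(collectors []prometheus.Collector)
}

type metricsImpl struct {
    registry *prometheus.Registry
}

// New creates the registry and exposes it for scraping
// (endpoint and port are assumptions, not from the diff).
func New() Metrics {
    registry := prometheus.NewRegistry()
    go func() {
        mux := http.NewServeMux()
        mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
        log.Println(http.ListenAndServe(":8888", mux))
    }()
    return &metricsImpl{registry: registry}
}

// Register adds a service's collectors to the shared registry,
// logging (rather than failing on) registration errors.
func (m *metricsImpl) Register(collectors []prometheus.Collector) {
    for _, c := range collectors {
        if err := m.registry.Register(c); err != nil {
            log.Printf("can't register collector: %s", err)
        }
    }
}

With that shape, every service's main() reduces to the two-line pattern visible throughout this commit: create the registry, then register the service's collector list.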

View file

@@ -1,9 +1,7 @@
package main
import (
"context"
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
@@ -13,12 +11,16 @@ import (
"openreplay/backend/internal/assets/cacher"
config "openreplay/backend/internal/config/assets"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics"
assetsMetrics "openreplay/backend/pkg/metrics/assets"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
func main() {
metrics := monitoring.New("assets")
m := metrics.New()
m.Register(assetsMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
@@ -26,18 +28,13 @@ func main() {
pprof.StartProfilingServer()
}
cacher := cacher.NewCacher(cfg, metrics)
totalAssets, err := metrics.RegisterCounter("assets_total")
if err != nil {
log.Printf("can't create assets_total metric: %s", err)
}
cacher := cacher.NewCacher(cfg)
msgHandler := func(msg messages.Message) {
switch m := msg.(type) {
case *messages.AssetCache:
cacher.CacheURL(m.SessionID(), m.URL)
totalAssets.Add(context.Background(), 1)
assetsMetrics.IncreaseProcessesSessions()
// TODO: connect to "raw" topic in order to listen for JSException
case *messages.JSException:
sourceList, err := assets.ExtractJSExceptionSources(&m.Payload)

View file

@@ -3,8 +3,6 @@ package main
import (
"errors"
"log"
types2 "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
@@ -14,16 +12,21 @@ import (
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
types2 "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
)
func main() {
metrics := monitoring.New("db")
m := metrics.New()
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := db.New()
@@ -33,7 +36,7 @@ func main() {
// Init database
pg := cache.NewPGCache(
postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit, metrics), cfg.ProjectExpirationTimeoutMs)
postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// HandlersFabric returns the list of message handlers we want applied to each incoming message.

View file

@@ -2,8 +2,6 @@ package main
import (
"log"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"strings"
@@ -12,16 +10,23 @@ import (
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
enderMetrics "openreplay/backend/pkg/metrics/ender"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
func main() {
metrics := monitoring.New("ender")
m := metrics.New()
m.Register(enderMetrics.List())
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := ender.New()
@@ -29,10 +34,10 @@ func main() {
pprof.StartProfilingServer()
}
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics), cfg.ProjectExpirationTimeoutMs)
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
sessions, err := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT, cfg.PartitionsNumber)
if err != nil {
log.Printf("can't init ender service: %s", err)
return

View file

@@ -2,23 +2,28 @@ package main
import (
"log"
"openreplay/backend/internal/config/http"
"openreplay/backend/internal/http/router"
"openreplay/backend/internal/http/server"
"openreplay/backend/internal/http/services"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
"openreplay/backend/internal/config/http"
"openreplay/backend/internal/http/router"
"openreplay/backend/internal/http/server"
"openreplay/backend/internal/http/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
httpMetrics "openreplay/backend/pkg/metrics/http"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
func main() {
metrics := monitoring.New("http")
m := metrics.New()
m.Register(httpMetrics.List())
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := http.New()
@@ -31,14 +36,14 @@ func main() {
defer producer.Close(15000)
// Connect to database
dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics), 1000*60*20)
dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), 1000*60*20)
defer dbConn.Close()
// Build all services
services := services.New(cfg, producer, dbConn)
// Init server's routes
router, err := router.NewRouter(cfg, services, metrics)
router, err := router.NewRouter(cfg, services)
if err != nil {
log.Fatalf("failed while creating engine: %s", err)
}

View file

@@ -2,24 +2,26 @@ package main
import (
"log"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/integrations/clientManager"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/pprof"
"time"
"os"
"os/signal"
"syscall"
"time"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/integrations/clientManager"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
)
func main() {
metrics := monitoring.New("integrations")
m := metrics.New()
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
@@ -27,7 +29,7 @@ func main() {
pprof.StartProfilingServer()
}
pg := postgres.NewConn(cfg.Postgres.String(), 0, 0, metrics)
pg := postgres.NewConn(cfg.Postgres.String(), 0, 0)
defer pg.Close()
tokenizer := token.NewTokenizer(cfg.TokenSecret)

View file

@@ -2,10 +2,8 @@ package main
import (
"bytes"
"context"
"encoding/binary"
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
@@ -16,13 +14,16 @@ import (
"openreplay/backend/internal/sink/sessionwriter"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics"
sinkMetrics "openreplay/backend/pkg/metrics/sink"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/url/assets"
)
func main() {
metrics := monitoring.New("sink")
m := metrics.New()
m.Register(sinkMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := sink.New()
@@ -39,22 +40,8 @@ func main() {
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(cfg.ProducerCloseTimeout)
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
assetMessageHandler := assetscache.New(cfg, rewriter, producer, metrics)
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
counter := storage.NewLogCounter()
// Session message metrics
totalMessages, err := metrics.RegisterCounter("messages_total")
if err != nil {
log.Printf("can't create messages_total metric: %s", err)
}
savedMessages, err := metrics.RegisterCounter("messages_saved")
if err != nil {
log.Printf("can't create messages_saved metric: %s", err)
}
messageSize, err := metrics.RegisterHistogram("messages_size")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
var (
sessionID uint64
@@ -74,11 +61,12 @@ func main() {
if domBuffer.Len() <= 0 && devBuffer.Len() <= 0 {
return
}
sinkMetrics.RecordWrittenBytes(float64(domBuffer.Len()), "dom")
sinkMetrics.RecordWrittenBytes(float64(devBuffer.Len()), "devtools")
// Write buffered batches to the session
if err := writer.Write(sessionID, domBuffer.Bytes(), devBuffer.Bytes()); err != nil {
log.Printf("writer error: %s", err)
return
}
// Prepare buffer for the next batch
@@ -88,8 +76,7 @@ func main() {
return
}
// [METRICS] Increase the number of processed messages
totalMessages.Add(context.Background(), 1)
sinkMetrics.IncreaseTotalMessages()
// Send SessionEnd trigger to storage service
if msg.TypeID() == messages.MsgSessionEnd {
@@ -187,9 +174,8 @@ func main() {
}
}
// [METRICS] Increase the number of messages written to files and record the message size
messageSize.Record(context.Background(), float64(len(msg.Encode())))
savedMessages.Add(context.Background(), 1)
sinkMetrics.IncreaseWrittenMessages()
sinkMetrics.RecordMessageSize(float64(len(msg.Encode())))
}
consumer := queue.NewConsumer(

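Reviewer note: the sink service now reports through package-level helpers in openreplay/backend/pkg/metrics/sink instead of locally registered counters. That package is not part of the shown hunks; here is a sketch consistent with the calls above (IncreaseTotalMessages, IncreaseWrittenMessages, RecordMessageSize, RecordWrittenBytes with a "dom"/"devtools" label), again assuming Prometheus, with metric names invented for illustration:

package sink

import "github.com/prometheus/client_golang/prometheus"

var (
    totalMessages = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "sink", Name: "messages_total",
        Help: "Number of messages read from the queue.",
    })
    writtenMessages = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "sink", Name: "messages_written",
        Help: "Number of messages written to session files.",
    })
    messageSize = prometheus.NewHistogram(prometheus.HistogramOpts{
        Namespace: "sink", Name: "message_size_bytes",
        Help: "Size of an encoded message, in bytes.",
    })
    writtenBytes = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "sink", Name: "written_bytes",
        Help: "Size of a flushed buffer, in bytes.",
    }, []string{"file_type"}) // "dom" or "devtools"
)

func IncreaseTotalMessages()         { totalMessages.Inc() }
func IncreaseWrittenMessages()       { writtenMessages.Inc() }
func RecordMessageSize(size float64) { messageSize.Observe(size) }

func RecordWrittenBytes(size float64, fileType string) {
    writtenBytes.WithLabelValues(fileType).Observe(size)
}

// List is what main() hands to metrics.Register; the asset-cache
// helpers used later in this commit (IncreaseTotalAssets and friends)
// would live in this same package and join this list.
func List() []prometheus.Collector {
    return []prometheus.Collector{totalMessages, writtenMessages, messageSize, writtenBytes}
}
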
View file

@@ -2,7 +2,6 @@ package main
import (
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
@@ -12,13 +11,17 @@ import (
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/failover"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics"
storageMetrics "openreplay/backend/pkg/metrics/storage"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
s3storage "openreplay/backend/pkg/storage"
cloud "openreplay/backend/pkg/storage"
)
func main() {
metrics := monitoring.New("storage")
m := metrics.New()
m.Register(storageMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
@@ -26,8 +29,8 @@ func main() {
pprof.StartProfilingServer()
}
s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3, metrics)
s3 := cloud.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3)
if err != nil {
log.Printf("can't init storage service: %s", err)
return

View file

@@ -1,16 +1,13 @@
package cacher
import (
"context"
"crypto/tls"
"fmt"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"io"
"io/ioutil"
"log"
"mime"
"net/http"
"openreplay/backend/pkg/monitoring"
metrics "openreplay/backend/pkg/metrics/assets"
"path/filepath"
"strings"
"time"
@@ -25,30 +22,22 @@ import (
const MAX_CACHE_DEPTH = 5
type cacher struct {
timeoutMap *timeoutMap // Concurrency implemented
s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently."
httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines."
rewriter *assets.Rewriter // Read only
Errors chan error
sizeLimit int
downloadedAssets syncfloat64.Counter
requestHeaders map[string]string
workers *WorkerPool
timeoutMap *timeoutMap // Concurrency implemented
s3 *storage.S3 // AWS Docs: "These clients are safe to use concurrently."
httpClient *http.Client // Docs: "Clients are safe for concurrent use by multiple goroutines."
rewriter *assets.Rewriter // Read only
Errors chan error
sizeLimit int
requestHeaders map[string]string
workers *WorkerPool
}
func (c *cacher) CanCache() bool {
return c.workers.CanAddTask()
}
func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
func NewCacher(cfg *config.Config) *cacher {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
if metrics == nil {
log.Fatalf("metrics are empty")
}
downloadedAssets, err := metrics.RegisterCounter("assets_downloaded")
if err != nil {
log.Printf("can't create downloaded_assets metric: %s", err)
}
c := &cacher{
timeoutMap: newTimeoutMap(),
s3: storage.NewS3(cfg.AWSRegion, cfg.S3BucketAssets),
@@ -59,11 +48,10 @@ func NewCacher(cfg *config.Config, metrics *monitoring.Metrics) *cacher {
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
rewriter: rewriter,
Errors: make(chan error),
sizeLimit: cfg.AssetsSizeLimit,
downloadedAssets: downloadedAssets,
requestHeaders: cfg.AssetsRequestHeaders,
rewriter: rewriter,
Errors: make(chan error),
sizeLimit: cfg.AssetsSizeLimit,
requestHeaders: cfg.AssetsRequestHeaders,
}
c.workers = NewPool(64, c.CacheFile)
return c
@@ -75,6 +63,7 @@ func (c *cacher) CacheFile(task *Task) {
func (c *cacher) cacheURL(t *Task) {
t.retries--
start := time.Now()
req, _ := http.NewRequest("GET", t.requestURL, nil)
if t.retries%2 == 0 {
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0")
@@ -87,6 +76,7 @@ func (c *cacher) cacheURL(t *Task) {
c.Errors <- errors.Wrap(err, t.urlContext)
return
}
metrics.RecordDownloadDuration(float64(time.Now().Sub(start).Milliseconds()), res.StatusCode)
defer res.Body.Close()
if res.StatusCode >= 400 {
printErr := true
@@ -122,12 +112,15 @@ func (c *cacher) cacheURL(t *Task) {
}
// TODO: implement in streams
start = time.Now()
err = c.s3.Upload(strings.NewReader(strData), t.cachePath, contentType, false)
if err != nil {
metrics.RecordUploadDuration(float64(time.Now().Sub(start).Milliseconds()), true)
c.Errors <- errors.Wrap(err, t.urlContext)
return
}
c.downloadedAssets.Add(context.Background(), 1)
metrics.RecordUploadDuration(float64(time.Now().Sub(start).Milliseconds()), false)
metrics.IncreaseSavedSessions()
if isCSS {
if t.depth > 0 {

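Reviewer note: cacheURL now times both the download and the S3 upload through openreplay/backend/pkg/metrics/assets. A sketch of those helpers as implied by the call sites above — the HTTP status code and the failure flag become label values; the metric names are assumptions:

package assets

import (
    "strconv"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    downloadDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "assets", Name: "download_duration_ms",
        Help: "Time spent downloading an asset, in milliseconds.",
    }, []string{"status"})
    uploadDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "assets", Name: "upload_duration_ms",
        Help: "Time spent uploading an asset to S3, in milliseconds.",
    }, []string{"failed"})
    savedSessions = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "assets", Name: "sessions_saved",
        Help: "Number of assets cached successfully.",
    })
    processedSessions = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "assets", Name: "sessions_processed",
        Help: "Number of AssetCache messages handled.",
    })
)

func RecordDownloadDuration(durMillis float64, code int) {
    downloadDuration.WithLabelValues(strconv.Itoa(code)).Observe(durMillis)
}

func RecordUploadDuration(durMillis float64, failed bool) {
    uploadDuration.WithLabelValues(strconv.FormatBool(failed)).Observe(durMillis)
}

func IncreaseSavedSessions()     { savedSessions.Inc() }
func IncreaseProcessesSessions() { processedSessions.Inc() }

func List() []prometheus.Collector {
    return []prometheus.Collector{downloadDuration, uploadDuration, savedSessions, processedSessions}
}
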
View file

@@ -22,28 +22,28 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
req := &StartIOSSessionRequest{}
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0)
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, 0)
return
}
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"), startTime, r.URL.Path, 0)
return
}
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"), startTime, r.URL.Path, 0)
} else {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging
}
return
}
@@ -53,18 +53,18 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
if err != nil { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"), startTime, r.URL.Path, 0)
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, 0)
return
}
sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err)
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0)
return
}
// TODO: if EXPIRED => send message for two sessions association
@@ -94,22 +94,24 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
BeaconSizeLimit: e.cfg.BeaconSizeLimit,
})
}, startTime, r.URL.Path, 0)
}
func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
return
}
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
}
func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil && err != token.EXPIRED {
ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
return
}
// Check timestamps here?
@@ -117,16 +119,17 @@ func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Reque
}
func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
log.Printf("received image request")
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil { // Should accept expired token?
ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
return
}
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0)
return
}
r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
@@ -134,21 +137,21 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request)
err = r.ParseMultipartForm(1e6) // ~1Mb
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
ResponseWithError(w, http.StatusUnsupportedMediaType, err)
ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0)
return
// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
} else if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging
return
}
if r.MultipartForm == nil {
ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"), startTime, r.URL.Path, 0)
return
}
if len(r.MultipartForm.Value["projectKey"]) == 0 {
ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing"), startTime, r.URL.Path, 0) // status for missing/wrong parameter?
return
}

View file

@@ -3,18 +3,17 @@ package router
import (
"encoding/json"
"errors"
"github.com/Masterminds/semver"
"go.opentelemetry.io/otel/attribute"
"io"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/http/uuid"
"openreplay/backend/pkg/flakeid"
"strconv"
"time"
"github.com/Masterminds/semver"
"openreplay/backend/internal/http/uuid"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/flakeid"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
@@ -28,13 +27,6 @@ func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) (
if err != nil {
return nil, err
}
reqSize := len(bodyBytes)
e.requestSize.Record(
r.Context(),
float64(reqSize),
[]attribute.KeyValue{attribute.String("method", r.URL.Path)}...,
)
return bodyBytes, nil
}
@@ -56,40 +48,43 @@ func getSessionTimestamp(req *StartSessionRequest, startTimeMili int64) (ts uint
func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize)
return
}
bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
bodySize = len(bodyBytes)
// Parse request body
req := &StartSessionRequest{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
// Handler's logic
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"), startTime, r.URL.Path, bodySize)
return
}
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
ResponseWithError(w, http.StatusNotFound, errors.New("project doesn't exist or capture limit has been reached"))
ResponseWithError(w, http.StatusNotFound,
errors.New("project doesn't exist or capture limit has been reached"), startTime, r.URL.Path, bodySize)
} else {
log.Printf("can't get project by key: %s", err)
ResponseWithError(w, http.StatusInternalServerError, errors.New("can't get project by key"))
ResponseWithError(w, http.StatusInternalServerError, errors.New("can't get project by key"), startTime, r.URL.Path, bodySize)
}
return
}
@@ -99,19 +94,19 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
if err != nil || req.Reset { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"), startTime, r.URL.Path, bodySize)
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, bodySize)
return
}
startTimeMili := startTime.UnixMilli()
sessionID, err := e.services.Flaker.Compose(uint64(startTimeMili))
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err)
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
// TODO: if EXPIRED => send message for two sessions association
@@ -163,29 +158,33 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
BeaconSizeLimit: e.getBeaconSize(tokenData.ID),
StartTimestamp: int64(flakeid.ExtractTimestamp(tokenData.ID)),
Delay: tokenData.Delay,
})
}, startTime, r.URL.Path, bodySize)
}
func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
// Check authorization
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize)
return
}
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize)
return
}
bodyBytes, err := e.readBody(w, r, e.getBeaconSize(sessionData.ID))
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
bodySize = len(bodyBytes)
// Send processed messages to queue as array of bytes
// TODO: check bytes for nonsense crap
@@ -194,39 +193,43 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
log.Printf("can't send processed messages to queue: %s", err)
}
w.WriteHeader(http.StatusOK)
ResponseOK(w, startTime, r.URL.Path, bodySize)
}
func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize)
return
}
bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
bodySize = len(bodyBytes)
// Parse request body
req := &NotStartedRequest{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
// Handler's logic
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("projectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("projectKey value required"), startTime, r.URL.Path, bodySize)
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"), startTime, r.URL.Path, bodySize)
return
}
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
@@ -248,5 +251,5 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
log.Printf("Unable to insert Unstarted Session: %v\n", err)
}
w.WriteHeader(http.StatusOK)
ResponseOK(w, startTime, r.URL.Path, bodySize)
}

View file

@@ -6,9 +6,11 @@ import (
"io/ioutil"
"log"
"net/http"
"time"
)
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
start := time.Now()
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
@@ -21,7 +23,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID
reader, err = gzip.NewReader(body)
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response
ResponseWithError(w, http.StatusInternalServerError, err, start, r.URL.Path, 0) // TODO: stage-dependent response
return
}
//log.Println("Gzip reader init", reader)
@@ -32,7 +34,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID
//log.Println("Reader after switch:", reader)
buf, err := ioutil.ReadAll(reader)
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err, start, r.URL.Path, 0) // TODO: send error here only on staging
return
}
e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send?

View file

@@ -4,21 +4,44 @@ import (
"encoding/json"
"log"
"net/http"
"time"
metrics "openreplay/backend/pkg/metrics/http"
)
func ResponseWithJSON(w http.ResponseWriter, res interface{}) {
func recordMetrics(requestStart time.Time, url string, code, bodySize int) {
if bodySize > 0 {
metrics.RecordRequestSize(float64(bodySize), url, code)
}
metrics.IncreaseTotalRequests()
metrics.RecordRequestDuration(float64(time.Now().Sub(requestStart).Milliseconds()), url, code)
}
func ResponseOK(w http.ResponseWriter, requestStart time.Time, url string, bodySize int) {
w.WriteHeader(http.StatusOK)
recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
func ResponseWithJSON(w http.ResponseWriter, res interface{}, requestStart time.Time, url string, bodySize int) {
body, err := json.Marshal(res)
if err != nil {
log.Println(err)
}
w.Header().Set("Content-Type", "application/json")
w.Write(body)
recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
func ResponseWithError(w http.ResponseWriter, code int, err error) {
type response struct {
Error string `json:"error"`
type response struct {
Error string `json:"error"`
}
func ResponseWithError(w http.ResponseWriter, code int, err error, requestStart time.Time, url string, bodySize int) {
body, err := json.Marshal(&response{err.Error()})
if err != nil {
log.Println(err)
}
w.WriteHeader(code)
ResponseWithJSON(w, &response{err.Error()})
w.Write(body)
recordMetrics(requestStart, url, code, bodySize)
}

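Reviewer note: recordMetrics above funnels every response through three helpers from openreplay/backend/pkg/metrics/http. A sketch of that package matching these calls — the int status code is converted into a label value; names and label choices are assumptions:

package http

import (
    "strconv"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    totalRequests = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "http", Name: "requests_total",
        Help: "Number of handled HTTP requests.",
    })
    requestSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "http", Name: "request_body_size_bytes",
        Help: "Size of a request body, in bytes.",
    }, []string{"url", "code"})
    requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "http", Name: "request_duration_ms",
        Help: "Time from request start to response, in milliseconds.",
    }, []string{"url", "code"})
)

func IncreaseTotalRequests() { totalRequests.Inc() }

func RecordRequestSize(size float64, url string, code int) {
    requestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size)
}

func RecordRequestDuration(durMillis float64, url string, code int) {
    requestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis)
}

func List() []prometheus.Collector {
    return []prometheus.Collector{totalRequests, requestSize, requestDuration}
}

Moving the recording into the response helpers (instead of the old middleware, removed in the next file) means duration and body size are tagged with the final status code of each request, which the middleware never saw.
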
View file

@@ -1,19 +1,16 @@
package router
import (
"context"
"fmt"
"github.com/gorilla/mux"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
http3 "openreplay/backend/internal/config/http"
http2 "openreplay/backend/internal/http/services"
"openreplay/backend/internal/http/util"
"openreplay/backend/pkg/monitoring"
"sync"
"time"
)
type BeaconSize struct {
@@ -25,21 +22,16 @@ type Router struct {
router *mux.Router
cfg *http3.Config
services *http2.ServicesBuilder
requestSize syncfloat64.Histogram
requestDuration syncfloat64.Histogram
totalRequests syncfloat64.Counter
mutex *sync.RWMutex
beaconSizeCache map[uint64]*BeaconSize // Cache for session's beaconSize
}
func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *monitoring.Metrics) (*Router, error) {
func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder) (*Router, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case services == nil:
return nil, fmt.Errorf("services is empty")
case metrics == nil:
return nil, fmt.Errorf("metrics is empty")
}
e := &Router{
cfg: cfg,
@@ -47,7 +39,6 @@ func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *moni
mutex: &sync.RWMutex{},
beaconSizeCache: make(map[uint64]*BeaconSize),
}
e.initMetrics(metrics)
e.init()
go e.clearBeaconSizes()
return e, nil
@@ -115,22 +106,6 @@ func (e *Router) init() {
e.router.Use(e.corsMiddleware)
}
func (e *Router) initMetrics(metrics *monitoring.Metrics) {
var err error
e.requestSize, err = metrics.RegisterHistogram("requests_body_size")
if err != nil {
log.Printf("can't create requests_body_size metric: %s", err)
}
e.requestDuration, err = metrics.RegisterHistogram("requests_duration")
if err != nil {
log.Printf("can't create requests_duration metric: %s", err)
}
e.totalRequests, err = metrics.RegisterCounter("requests_total")
if err != nil {
log.Printf("can't create requests_total metric: %s", err)
}
}
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
@@ -149,17 +124,8 @@ func (e *Router) corsMiddleware(next http.Handler) http.Handler {
log.Printf("Request: %v - %v ", r.Method, util.SafeString(r.URL.Path))
requestStart := time.Now()
// Serve request
next.ServeHTTP(w, r)
metricsContext, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
e.totalRequests.Add(metricsContext, 1)
e.requestDuration.Record(metricsContext,
float64(time.Now().Sub(requestStart).Milliseconds()),
[]attribute.KeyValue{attribute.String("method", r.URL.Path)}...,
)
})
}

View file

@@ -1,13 +1,11 @@
package sessionender
import (
"context"
"fmt"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics/ender"
)
// EndedSessionHandler handler for ended sessions
@@ -23,32 +21,16 @@ type session struct {
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeCtrl *timeController
activeSessions syncfloat64.UpDownCounter
totalSessions syncfloat64.Counter
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeCtrl *timeController
}
func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, error) {
if metrics == nil {
return nil, fmt.Errorf("metrics module is empty")
}
activeSessions, err := metrics.RegisterUpDownCounter("sessions_active")
if err != nil {
return nil, fmt.Errorf("can't register session.active metric: %s", err)
}
totalSessions, err := metrics.RegisterCounter("sessions_total")
if err != nil {
return nil, fmt.Errorf("can't register session.total metric: %s", err)
}
func New(timeout int64, parts int) (*SessionEnder, error) {
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
timeCtrl: NewTimeController(parts),
activeSessions: activeSessions,
totalSessions: totalSessions,
timeout: timeout,
sessions: make(map[uint64]*session),
timeCtrl: NewTimeController(parts),
}, nil
}
@@ -74,8 +56,8 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
lastUserTime: msgTimestamp, // last timestamp from user's machine
isEnded: false,
}
se.activeSessions.Add(context.Background(), 1)
se.totalSessions.Add(context.Background(), 1)
ender.IncreaseActiveSessions()
ender.IncreaseTotalSessions()
return
}
// Keep the highest user's timestamp for correct session duration value
@@ -100,7 +82,8 @@ func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
sess.isEnded = true
if handler(sessID, sess.lastUserTime) {
delete(se.sessions, sessID)
se.activeSessions.Add(context.Background(), -1)
ender.DecreaseActiveSessions()
ender.IncreaseClosedSessions()
removedSessions++
} else {
log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime)

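Reviewer note: the active-session count moves from an OpenTelemetry UpDownCounter to an Increase/Decrease helper pair, which maps naturally onto a gauge. A sketch of openreplay/backend/pkg/metrics/ender under the same Prometheus assumption (names invented):

package ender

import "github.com/prometheus/client_golang/prometheus"

var (
    activeSessions = prometheus.NewGauge(prometheus.GaugeOpts{
        Namespace: "ender", Name: "sessions_active",
        Help: "Number of sessions currently being tracked.",
    })
    totalSessions = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "ender", Name: "sessions_total",
        Help: "Number of sessions ever seen.",
    })
    closedSessions = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "ender", Name: "sessions_closed",
        Help: "Number of sessions ended and handed off.",
    })
)

func IncreaseActiveSessions() { activeSessions.Inc() }
func DecreaseActiveSessions() { activeSessions.Dec() }
func IncreaseTotalSessions()  { totalSessions.Inc() }
func IncreaseClosedSessions() { closedSessions.Inc() }

func List() []prometheus.Collector {
    return []prometheus.Collector{activeSessions, totalSessions, closedSessions}
}

Note that sessionender.New keeps its error return even though the metrics argument — previously the only failure source — is gone; callers such as ender's main.go still check it.
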
View file

@@ -1,20 +1,19 @@
package assetscache
import (
"context"
"crypto/md5"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"io"
"log"
"net/url"
"openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
metrics "openreplay/backend/pkg/metrics/sink"
"strings"
"sync"
"time"
"openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
type CachedAsset struct {
@@ -23,52 +22,21 @@
}
type AssetsCache struct {
mutex sync.RWMutex
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
cache map[string]*CachedAsset
blackList []string // use "example.com" to filter all domains or ".example.com" to filter only third-level domain
totalAssets syncfloat64.Counter
cachedAssets syncfloat64.Counter
skippedAssets syncfloat64.Counter
assetSize syncfloat64.Histogram
assetDuration syncfloat64.Histogram
mutex sync.RWMutex
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
cache map[string]*CachedAsset
blackList []string // use "example.com" to filter all domains or ".example.com" to filter only third-level domain
}
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, metrics *monitoring.Metrics) *AssetsCache {
// Assets metrics
totalAssets, err := metrics.RegisterCounter("assets_total")
if err != nil {
log.Printf("can't create assets_total metric: %s", err)
}
cachedAssets, err := metrics.RegisterCounter("assets_cached")
if err != nil {
log.Printf("can't create assets_cached metric: %s", err)
}
skippedAssets, err := metrics.RegisterCounter("assets_skipped")
if err != nil {
log.Printf("can't create assets_skipped metric: %s", err)
}
assetSize, err := metrics.RegisterHistogram("asset_size")
if err != nil {
log.Printf("can't create asset_size metric: %s", err)
}
assetDuration, err := metrics.RegisterHistogram("asset_duration")
if err != nil {
log.Printf("can't create asset_duration metric: %s", err)
}
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
assetsCache := &AssetsCache{
cfg: cfg,
rewriter: rewriter,
producer: producer,
cache: make(map[string]*CachedAsset, 64),
blackList: make([]string, 0),
totalAssets: totalAssets,
cachedAssets: cachedAssets,
skippedAssets: skippedAssets,
assetSize: assetSize,
assetDuration: assetDuration,
cfg: cfg,
rewriter: rewriter,
producer: producer,
cache: make(map[string]*CachedAsset, 64),
blackList: make([]string, 0),
}
// Parse black list for cache layer
if len(cfg.CacheBlackList) > 0 {
@@ -84,7 +52,7 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer, m
}
func (e *AssetsCache) cleaner() {
cleanTick := time.Tick(time.Minute * 30)
cleanTick := time.Tick(time.Minute * 3)
for {
select {
case <-cleanTick:
@@ -105,6 +73,7 @@ func (e *AssetsCache) clearCache() {
if int64(now.Sub(cache.ts).Minutes()) > e.cfg.CacheExpiration {
deleted++
delete(e.cache, id)
metrics.DecreaseCachedAssets()
}
}
log.Printf("cache cleaner: deleted %d/%d assets", deleted, cacheSize)
@@ -232,8 +201,7 @@ func parseHost(baseURL string) (string, error) {
}
func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string {
ctx := context.Background()
e.totalAssets.Add(ctx, 1)
metrics.IncreaseTotalAssets()
// Try to find asset in cache
h := md5.New()
// Cut first part of url (scheme + host)
@@ -255,7 +223,7 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
e.mutex.RUnlock()
if ok {
if int64(time.Now().Sub(cachedAsset.ts).Minutes()) < e.cfg.CacheExpiration {
e.skippedAssets.Add(ctx, 1)
metrics.IncreaseSkippedAssets()
return cachedAsset.msg
}
}
@@ -267,8 +235,8 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
start := time.Now()
res := e.getRewrittenCSS(sessionID, baseURL, css)
duration := time.Now().Sub(start).Milliseconds()
e.assetSize.Record(ctx, float64(len(res)))
e.assetDuration.Record(ctx, float64(duration))
metrics.RecordAssetSize(float64(len(res)))
metrics.RecordProcessAssetDuration(float64(duration))
// Save asset to cache if we spent more than threshold
if duration > e.cfg.CacheThreshold {
e.mutex.Lock()
@@ -277,7 +245,7 @@ func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) st
ts: time.Now(),
}
e.mutex.Unlock()
e.cachedAssets.Add(ctx, 1)
metrics.IncreaseCachedAssets()
}
// Return rewritten asset
return res

View file

@@ -2,20 +2,20 @@ package storage
import (
"bytes"
"context"
"fmt"
gzip "github.com/klauspost/pgzip"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/storage"
"os"
"strconv"
"strings"
"sync"
"time"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/pkg/messages"
metrics "openreplay/backend/pkg/metrics/storage"
"openreplay/backend/pkg/storage"
gzip "github.com/klauspost/pgzip"
)
type FileType string
@@ -25,6 +25,13 @@
DEV FileType = "/devtools.mob"
)
func (t FileType) String() string {
if t == DOM {
return "dom"
}
return "devtools"
}
type Task struct {
id string
doms *bytes.Buffer
@@ -36,92 +43,23 @@ type Storage struct {
cfg *config.Config
s3 *storage.S3
startBytes []byte
totalSessions syncfloat64.Counter
sessionDOMSize syncfloat64.Histogram
sessionDEVSize syncfloat64.Histogram
readingDOMTime syncfloat64.Histogram
readingDEVTime syncfloat64.Histogram
sortingDOMTime syncfloat64.Histogram
sortingDEVTime syncfloat64.Histogram
archivingDOMTime syncfloat64.Histogram
archivingDEVTime syncfloat64.Histogram
uploadingDOMTime syncfloat64.Histogram
uploadingDEVTime syncfloat64.Histogram
tasks chan *Task
ready chan struct{}
tasks chan *Task
ready chan struct{}
}
func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) {
func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case s3 == nil:
return nil, fmt.Errorf("s3 storage is empty")
}
// Create metrics
totalSessions, err := metrics.RegisterCounter("sessions_total")
if err != nil {
log.Printf("can't create sessions_total metric: %s", err)
}
sessionDOMSize, err := metrics.RegisterHistogram("sessions_size")
if err != nil {
log.Printf("can't create session_size metric: %s", err)
}
sessionDevtoolsSize, err := metrics.RegisterHistogram("sessions_dt_size")
if err != nil {
log.Printf("can't create sessions_dt_size metric: %s", err)
}
readingDOMTime, err := metrics.RegisterHistogram("reading_duration")
if err != nil {
log.Printf("can't create reading_duration metric: %s", err)
}
readingDEVTime, err := metrics.RegisterHistogram("reading_dt_duration")
if err != nil {
log.Printf("can't create reading_duration metric: %s", err)
}
sortingDOMTime, err := metrics.RegisterHistogram("sorting_duration")
if err != nil {
log.Printf("can't create reading_duration metric: %s", err)
}
sortingDEVTime, err := metrics.RegisterHistogram("sorting_dt_duration")
if err != nil {
log.Printf("can't create reading_duration metric: %s", err)
}
archivingDOMTime, err := metrics.RegisterHistogram("archiving_duration")
if err != nil {
log.Printf("can't create archiving_duration metric: %s", err)
}
archivingDEVTime, err := metrics.RegisterHistogram("archiving_dt_duration")
if err != nil {
log.Printf("can't create archiving_duration metric: %s", err)
}
uploadingDOMTime, err := metrics.RegisterHistogram("uploading_duration")
if err != nil {
log.Printf("can't create uploading_duration metric: %s", err)
}
uploadingDEVTime, err := metrics.RegisterHistogram("uploading_dt_duration")
if err != nil {
log.Printf("can't create uploading_duration metric: %s", err)
}
newStorage := &Storage{
cfg: cfg,
s3: s3,
startBytes: make([]byte, cfg.FileSplitSize),
totalSessions: totalSessions,
sessionDOMSize: sessionDOMSize,
sessionDEVSize: sessionDevtoolsSize,
readingDOMTime: readingDOMTime,
readingDEVTime: readingDEVTime,
sortingDOMTime: sortingDOMTime,
sortingDEVTime: sortingDEVTime,
archivingDOMTime: archivingDOMTime,
archivingDEVTime: archivingDEVTime,
uploadingDOMTime: uploadingDOMTime,
uploadingDEVTime: uploadingDEVTime,
tasks: make(chan *Task, 1),
ready: make(chan struct{}),
cfg: cfg,
s3: s3,
startBytes: make([]byte, cfg.FileSplitSize),
tasks: make(chan *Task, 1),
ready: make(chan struct{}),
}
go newStorage.worker()
return newStorage, nil
@@ -187,11 +125,7 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("can't sort session, err: %s", err)
}
if tp == DOM {
s.sortingDOMTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
} else {
s.sortingDEVTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()))
}
metrics.RecordSessionSortDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
return res, nil
}
@@ -215,26 +149,19 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
if err != nil {
return err
}
durRead := time.Now().Sub(startRead).Milliseconds()
// Send metrics
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
if tp == DOM {
s.sessionDOMSize.Record(ctx, float64(len(mob)))
s.readingDOMTime.Record(ctx, float64(durRead))
} else {
s.sessionDEVSize.Record(ctx, float64(len(mob)))
s.readingDEVTime.Record(ctx, float64(durRead))
}
metrics.RecordSessionSize(float64(len(mob)), tp.String())
metrics.RecordSessionReadDuration(float64(time.Now().Sub(startRead).Milliseconds()), tp.String())
// Encode and compress session
if tp == DEV {
startCompress := time.Now()
start := time.Now()
task.dev = s.compressSession(mob)
s.archivingDEVTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds()))
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
} else {
if len(mob) <= s.cfg.FileSplitSize {
startCompress := time.Now()
start := time.Now()
task.doms = s.compressSession(mob)
s.archivingDOMTime.Record(ctx, float64(time.Now().Sub(startCompress).Milliseconds()))
metrics.RecordSessionCompressDuration(float64(time.Now().Sub(start).Milliseconds()), tp.String())
return nil
}
wg := &sync.WaitGroup{}
@@ -253,7 +180,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
wg.Done()
}()
wg.Wait()
s.archivingDOMTime.Record(ctx, float64(firstPart+secondPart))
metrics.RecordSessionCompressDuration(float64(firstPart+secondPart), tp.String())
}
return nil
}
@@ -324,11 +251,9 @@ func (s *Storage) uploadSession(task *Task) {
wg.Done()
}()
wg.Wait()
// Record metrics
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
s.uploadingDOMTime.Record(ctx, float64(uploadDoms+uploadDome))
s.uploadingDEVTime.Record(ctx, float64(uploadDev))
s.totalSessions.Add(ctx, 1)
metrics.RecordSessionUploadDuration(float64(uploadDoms+uploadDome), DOM.String())
metrics.RecordSessionUploadDuration(float64(uploadDev), DEV.String())
metrics.IncreaseStorageTotalSessions()
}
func (s *Storage) worker() {

View file

@@ -1,14 +1,13 @@
package postgres
import (
"context"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/monitoring"
"strings"
"time"
"openreplay/backend/pkg/metrics/database"
"github.com/jackc/pgx/v4"
)
type batchItem struct {
@@ -78,21 +77,17 @@ func NewBatchesTask(size int) *batchesTask {
}
type BatchSet struct {
c Pool
batches map[uint64]*SessionBatch
batchQueueLimit int
batchSizeLimit int
batchSizeBytes syncfloat64.Histogram
batchSizeLines syncfloat64.Histogram
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
updates map[uint64]*sessionUpdates
workerTask chan *batchesTask
done chan struct{}
finished chan struct{}
c Pool
batches map[uint64]*SessionBatch
batchQueueLimit int
batchSizeLimit int
updates map[uint64]*sessionUpdates
workerTask chan *batchesTask
done chan struct{}
finished chan struct{}
}
func NewBatchSet(c Pool, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *BatchSet {
func NewBatchSet(c Pool, queueLimit, sizeLimit int) *BatchSet {
bs := &BatchSet{
c: c,
batches: make(map[uint64]*SessionBatch),
@@ -103,31 +98,10 @@ func NewBatchSet(c Pool, queueLimit, sizeLimit int, metrics *monitoring.Metrics)
finished: make(chan struct{}),
updates: make(map[uint64]*sessionUpdates),
}
bs.initMetrics(metrics)
go bs.worker()
return bs
}
func (conn *BatchSet) initMetrics(metrics *monitoring.Metrics) {
var err error
conn.batchSizeBytes, err = metrics.RegisterHistogram("batch_size_bytes")
if err != nil {
log.Printf("can't create batchSizeBytes metric: %s", err)
}
conn.batchSizeLines, err = metrics.RegisterHistogram("batch_size_lines")
if err != nil {
log.Printf("can't create batchSizeLines metric: %s", err)
}
conn.sqlRequestTime, err = metrics.RegisterHistogram("sql_request_time")
if err != nil {
log.Printf("can't create sqlRequestTime metric: %s", err)
}
conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number")
if err != nil {
log.Printf("can't create sqlRequestNumber metric: %s", err)
}
}
func (conn *BatchSet) getBatch(sessionID uint64) *SessionBatch {
sessionID = sessionID % 10
if _, ok := conn.batches[sessionID]; !ok {
@@ -194,11 +168,10 @@ func (conn *BatchSet) sendBatches(t *batchesTask) {
// Append session update sql request to the end of batch
batch.Prepare()
// Record batch size in bytes and number of lines
conn.batchSizeBytes.Record(context.Background(), float64(batch.Size()))
conn.batchSizeLines.Record(context.Background(), float64(batch.Len()))
database.RecordBatchSize(float64(batch.Size()))
database.RecordBatchElements(float64(batch.Len()))
start := time.Now()
isFailed := false
// Send batch to db and execute
br := conn.c.SendBatch(batch.batch)
@@ -209,15 +182,11 @@ func (conn *BatchSet) sendBatches(t *batchesTask) {
failedSql := batch.items[i]
query := strings.ReplaceAll(failedSql.query, "\n", " ")
log.Println("failed sql req:", query, failedSql.arguments)
isFailed = true
}
}
br.Close() // returns err
dur := time.Now().Sub(start).Milliseconds()
conn.sqlRequestTime.Record(context.Background(), float64(dur),
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
database.RecordBatchInsertDuration(float64(time.Now().Sub(start).Milliseconds()))
database.IncreaseTotalBatches()
}
}

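Reviewer note: the batch code above (and pool.go and bulk.go below) now reports through openreplay/backend/pkg/metrics/database. A sketch covering the calls visible in this commit — plain histograms and a counter for batches, plus a method/table-labelled pair for individual SQL requests; the bulk helpers (RecordBulkElements, RecordBulkInsertDuration) would follow the same labelled pattern with db ("pg") and table labels. Metric names are assumptions:

package database

import "github.com/prometheus/client_golang/prometheus"

var (
    batchSize = prometheus.NewHistogram(prometheus.HistogramOpts{
        Namespace: "db", Name: "batch_size_bytes",
        Help: "Size of a session batch, in bytes.",
    })
    batchElements = prometheus.NewHistogram(prometheus.HistogramOpts{
        Namespace: "db", Name: "batch_size_lines",
        Help: "Number of statements in a session batch.",
    })
    batchInsertDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Namespace: "db", Name: "batch_insert_duration_ms",
        Help: "Time to execute a batch, in milliseconds.",
    })
    totalBatches = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "db", Name: "batches_total",
        Help: "Number of executed batches.",
    })
    totalRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "db", Name: "requests_total",
        Help: "Number of SQL requests.",
    }, []string{"method", "table"})
    requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "db", Name: "request_duration_ms",
        Help: "Time to execute a SQL request, in milliseconds.",
    }, []string{"method", "table"})
)

func RecordBatchSize(size float64)                { batchSize.Observe(size) }
func RecordBatchElements(lines float64)           { batchElements.Observe(lines) }
func RecordBatchInsertDuration(durMillis float64) { batchInsertDuration.Observe(durMillis) }
func IncreaseTotalBatches()                       { totalBatches.Inc() }

func IncreaseTotalRequests(method, table string) {
    totalRequests.WithLabelValues(method, table).Inc()
}

func RecordRequestDuration(durMillis float64, method, table string) {
    requestDuration.WithLabelValues(method, table).Observe(durMillis)
}

func List() []prometheus.Collector {
    return []prometheus.Collector{batchSize, batchElements, batchInsertDuration,
        totalBatches, totalRequests, requestDuration}
}

One behavioural change worth flagging: the old code recorded a failed attribute on batch metrics, while RecordBatchInsertDuration as called above no longer distinguishes failed batches.
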
View file

@@ -2,13 +2,9 @@ package postgres
import (
"bytes"
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/metrics/database"
"time"
)
@@ -25,15 +21,13 @@
}
type bulkImpl struct {
conn Pool
table string
columns string
template string
setSize int
sizeLimit int
values []interface{}
bulkSize syncfloat64.Histogram
bulkDuration syncfloat64.Histogram
conn Pool
table string
columns string
template string
setSize int
sizeLimit int
values []interface{}
}
func (b *bulkImpl) Append(args ...interface{}) error {
@@ -79,18 +73,15 @@ func (b *bulkImpl) send() error {
return fmt.Errorf("send bulk err: %s", err)
}
// Save bulk metrics
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
b.bulkDuration.Record(ctx, float64(time.Now().Sub(start).Milliseconds()), attribute.String("table", b.table))
b.bulkSize.Record(ctx, float64(size), attribute.String("table", b.table))
database.RecordBulkElements(float64(size), "pg", b.table)
database.RecordBulkInsertDuration(float64(time.Now().Sub(start).Milliseconds()), "pg", b.table)
return nil
}
func NewBulk(conn Pool, metrics *monitoring.Metrics, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("db conn is empty")
case metrics == nil:
return nil, errors.New("metrics is empty")
case table == "":
return nil, errors.New("table is empty")
case columns == "":
@@ -102,23 +93,13 @@ func NewBulk(conn Pool, metrics *monitoring.Metrics, table, columns, template st
case sizeLimit <= 0:
return nil, errors.New("size limit is wrong")
}
messagesInBulk, err := metrics.RegisterHistogram("messages_in_bulk")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
bulkInsertDuration, err := metrics.RegisterHistogram("bulk_insert_duration")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
return &bulkImpl{
conn: conn,
table: table,
columns: columns,
template: template,
setSize: setSize,
sizeLimit: sizeLimit,
values: make([]interface{}, 0, setSize*sizeLimit),
bulkSize: messagesInBulk,
bulkDuration: bulkInsertDuration,
conn: conn,
table: table,
columns: columns,
template: template,
setSize: setSize,
sizeLimit: sizeLimit,
values: make([]interface{}, 0, setSize*sizeLimit),
}, nil
}

View file

@@ -2,7 +2,6 @@ package postgres
import (
"log"
"openreplay/backend/pkg/monitoring"
"time"
)
@@ -30,16 +29,14 @@ type BulkSet struct {
webCustomEvents Bulk
webClickEvents Bulk
webNetworkRequest Bulk
metrics *monitoring.Metrics
workerTask chan *bulksTask
done chan struct{}
finished chan struct{}
}
func NewBulkSet(c Pool, metrics *monitoring.Metrics) *BulkSet {
func NewBulkSet(c Pool) *BulkSet {
bs := &BulkSet{
c: c,
metrics: metrics,
workerTask: make(chan *bulksTask, 1),
done: make(chan struct{}),
finished: make(chan struct{}),
@@ -86,7 +83,7 @@ func (conn *BulkSet) Get(name string) Bulk {
func (conn *BulkSet) initBulks() {
var err error
conn.autocompletes, err = NewBulk(conn.c, conn.metrics,
conn.autocompletes, err = NewBulk(conn.c,
"autocomplete",
"(value, type, project_id)",
"($%d, $%d, $%d)",
@@ -94,7 +91,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create autocomplete bulk: %s", err)
}
conn.requests, err = NewBulk(conn.c, conn.metrics,
conn.requests, err = NewBulk(conn.c,
"events_common.requests",
"(session_id, timestamp, seq_index, url, duration, success)",
"($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)",
@@ -102,7 +99,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create requests bulk: %s", err)
}
conn.customEvents, err = NewBulk(conn.c, conn.metrics,
conn.customEvents, err = NewBulk(conn.c,
"events_common.customs",
"(session_id, timestamp, seq_index, name, payload)",
"($%d, $%d, $%d, LEFT($%d, 2000), $%d)",
@@ -110,7 +107,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create customEvents bulk: %s", err)
}
conn.webPageEvents, err = NewBulk(conn.c, conn.metrics,
conn.webPageEvents, err = NewBulk(conn.c,
"events.pages",
"(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+
"load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+
@@ -122,7 +119,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webInputEvents, err = NewBulk(conn.c, conn.metrics,
conn.webInputEvents, err = NewBulk(conn.c,
"events.inputs",
"(session_id, message_id, timestamp, value, label)",
"($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''))",
@ -130,7 +127,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webGraphQL, err = NewBulk(conn.c, conn.metrics,
conn.webGraphQL, err = NewBulk(conn.c,
"events.graphql",
"(session_id, timestamp, message_id, name, request_body, response_body)",
"($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)",
@@ -138,7 +135,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webErrors, err = NewBulk(conn.c, conn.metrics,
conn.webErrors, err = NewBulk(conn.c,
"errors",
"(error_id, project_id, source, name, message, payload)",
"($%d, $%d, $%d, $%d, $%d, $%d::jsonb)",
@@ -146,7 +143,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webErrors bulk: %s", err)
}
conn.webErrorEvents, err = NewBulk(conn.c, conn.metrics,
conn.webErrorEvents, err = NewBulk(conn.c,
"events.errors",
"(session_id, message_id, timestamp, error_id)",
"($%d, $%d, $%d, $%d)",
@@ -154,7 +151,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webErrorEvents bulk: %s", err)
}
conn.webErrorTags, err = NewBulk(conn.c, conn.metrics,
conn.webErrorTags, err = NewBulk(conn.c,
"public.errors_tags",
"(session_id, message_id, error_id, key, value)",
"($%d, $%d, $%d, $%d, $%d)",
@@ -162,7 +159,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webErrorEvents bulk: %s", err)
}
conn.webIssues, err = NewBulk(conn.c, conn.metrics,
conn.webIssues, err = NewBulk(conn.c,
"issues",
"(project_id, issue_id, type, context_string)",
"($%d, $%d, $%d, $%d)",
@@ -170,7 +167,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webIssues bulk: %s", err)
}
conn.webIssueEvents, err = NewBulk(conn.c, conn.metrics,
conn.webIssueEvents, err = NewBulk(conn.c,
"events_common.issues",
"(session_id, issue_id, timestamp, seq_index, payload)",
"($%d, $%d, $%d, $%d, CAST($%d AS jsonb))",
@@ -178,7 +175,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webIssueEvents bulk: %s", err)
}
conn.webCustomEvents, err = NewBulk(conn.c, conn.metrics,
conn.webCustomEvents, err = NewBulk(conn.c,
"events_common.customs",
"(session_id, seq_index, timestamp, name, payload, level)",
"($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)",
@@ -186,7 +183,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webCustomEvents bulk: %s", err)
}
conn.webClickEvents, err = NewBulk(conn.c, conn.metrics,
conn.webClickEvents, err = NewBulk(conn.c,
"events.clicks",
"(session_id, message_id, timestamp, label, selector, url, path)",
"($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000))",
@@ -194,7 +191,7 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webClickEvents bulk: %s", err)
}
conn.webNetworkRequest, err = NewBulk(conn.c, conn.metrics,
conn.webNetworkRequest, err = NewBulk(conn.c,
"events_common.requests",
"(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)",
"($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)",

View file

@@ -2,11 +2,10 @@ package postgres
import (
"context"
"github.com/jackc/pgx/v4/pgxpool"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"github.com/jackc/pgx/v4/pgxpool"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/monitoring"
)
type CH interface {
@ -15,36 +14,28 @@ type CH interface {
// Conn contains batches, bulks and cache for all sessions
type Conn struct {
c Pool
batches *BatchSet
bulks *BulkSet
batchSizeBytes syncfloat64.Histogram
batchSizeLines syncfloat64.Histogram
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
chConn CH
c Pool
batches *BatchSet
bulks *BulkSet
chConn CH
}
func (conn *Conn) SetClickHouse(ch CH) {
conn.chConn = ch
}
func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics) *Conn {
if metrics == nil {
log.Fatalf("metrics is nil")
}
func NewConn(url string, queueLimit, sizeLimit int) *Conn {
c, err := pgxpool.Connect(context.Background(), url)
if err != nil {
log.Fatalf("pgxpool.Connect err: %s", err)
}
conn := &Conn{}
conn.initMetrics(metrics)
conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter)
conn.c, err = NewPool(c)
if err != nil {
log.Fatalf("can't create new pool wrapper: %s", err)
}
conn.bulks = NewBulkSet(conn.c, metrics)
conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit, metrics)
conn.bulks = NewBulkSet(conn.c)
conn.batches = NewBatchSet(conn.c, queueLimit, sizeLimit)
return conn
}
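For reference, a minimal sketch of the simplified constructor call; the URL and the two limits below are illustrative values, not taken from this commit:
// Hypothetical wiring of the new signature (url, queueLimit, sizeLimit).
conn := NewConn("postgres://user:pass@localhost:5432/openreplay", 200, 10*1024*1024)
defer conn.Close()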
@ -55,26 +46,6 @@ func (conn *Conn) Close() error {
return nil
}
func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
var err error
conn.batchSizeBytes, err = metrics.RegisterHistogram("batch_size_bytes")
if err != nil {
log.Printf("can't create batchSizeBytes metric: %s", err)
}
conn.batchSizeLines, err = metrics.RegisterHistogram("batch_size_lines")
if err != nil {
log.Printf("can't create batchSizeLines metric: %s", err)
}
conn.sqlRequestTime, err = metrics.RegisterHistogram("sql_request_time")
if err != nil {
log.Printf("can't create sqlRequestTime metric: %s", err)
}
conn.sqlRequestCounter, err = metrics.RegisterCounter("sql_request_number")
if err != nil {
log.Printf("can't create sqlRequestNumber metric: %s", err)
}
}
func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
if len(value) == 0 {
return

View file

@ -3,12 +3,12 @@ package postgres
import (
"context"
"errors"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"strings"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"openreplay/backend/pkg/metrics/database"
)
// Pool is a pgx.Pool wrapper with metrics integration
@ -22,19 +22,15 @@ type Pool interface {
}
type poolImpl struct {
conn *pgxpool.Pool
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
conn *pgxpool.Pool
}
func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) {
start := time.Now()
res, err := p.conn.Query(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
database.IncreaseTotalRequests(method, table)
return res, err
}
@ -42,10 +38,8 @@ func (p *poolImpl) QueryRow(sql string, args ...interface{}) pgx.Row {
start := time.Now()
res := p.conn.QueryRow(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
database.IncreaseTotalRequests(method, table)
return res
}
@ -53,45 +47,37 @@ func (p *poolImpl) Exec(sql string, arguments ...interface{}) error {
start := time.Now()
_, err := p.conn.Exec(getTimeoutContext(), sql, arguments...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
database.IncreaseTotalRequests(method, table)
return err
}
func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults {
start := time.Now()
res := p.conn.SendBatch(getTimeoutContext(), b)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "sendBatch"))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "sendBatch"))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "sendBatch", "")
database.IncreaseTotalRequests("sendBatch", "")
return res
}
func (p *poolImpl) Begin() (*_Tx, error) {
start := time.Now()
tx, err := p.conn.Begin(context.Background())
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "begin"))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "begin"))
return &_Tx{tx, p.sqlRequestTime, p.sqlRequestCounter}, err
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "begin", "")
database.IncreaseTotalRequests("begin", "")
return &_Tx{tx}, err
}
func (p *poolImpl) Close() {
p.conn.Close()
}
func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlRequestCounter syncfloat64.Counter) (Pool, error) {
func NewPool(conn *pgxpool.Pool) (Pool, error) {
if conn == nil {
return nil, errors.New("conn is empty")
}
return &poolImpl{
conn: conn,
sqlRequestTime: sqlRequestTime,
sqlRequestCounter: sqlRequestCounter,
conn: conn,
}, nil
}
@ -99,38 +85,30 @@ func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlReques
type _Tx struct {
pgx.Tx
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
}
func (tx *_Tx) exec(sql string, args ...interface{}) error {
start := time.Now()
_, err := tx.Exec(context.Background(), sql, args...)
method, table := methodName(sql)
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), method, table)
database.IncreaseTotalRequests(method, table)
return err
}
func (tx *_Tx) rollback() error {
start := time.Now()
err := tx.Rollback(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "rollback"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "rollback"))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "rollback", "")
database.IncreaseTotalRequests("rollback", "")
return err
}
func (tx *_Tx) commit() error {
start := time.Now()
err := tx.Commit(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "commit"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "commit"))
database.RecordRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "commit", "")
database.IncreaseTotalRequests("commit", "")
return err
}
@ -169,7 +147,8 @@ func methodName(sql string) (string, string) {
case "update":
table = strings.TrimSpace(parts[1])
case "insert":
table = strings.TrimSpace(parts[2])
tableNameParts := strings.Split(strings.TrimSpace(parts[2]), "(")
table = tableNameParts[0]
}
return cmd, table
}
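The extra split handles INSERT statements where the column list is glued to the table name. A standalone sketch of just that branch (insertTable is a hypothetical helper, not part of this commit):
package example

import "strings"

// insertTable mimics the "insert" branch of methodName above.
func insertTable(sql string) string {
    parts := strings.Split(sql, " ")
    // "insert into events.clicks(session_id, ...)" leaves the column list
    // glued to parts[2]; splitting on "(" keeps only the table name,
    // which is what ends up as the metric label.
    tableNameParts := strings.Split(strings.TrimSpace(parts[2]), "(")
    return tableNameParts[0]
}

// insertTable(`insert into events.clicks(session_id) values ($1)`) == "events.clicks"
// insertTable(`insert into sessions (session_id) values ($1)`)     == "sessions"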

View file

@ -3,6 +3,7 @@ package messages
import (
"fmt"
"log"
"openreplay/backend/pkg/metrics/sink"
)
type sinkMessageIteratorImpl struct {
@ -53,6 +54,8 @@ func (i *sinkMessageIteratorImpl) sendBatchEnd() {
}
func (i *sinkMessageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
sink.RecordBatchSize(float64(len(batchData)))
sink.IncreaseTotalBatches()
// Create new message reader
reader := NewMessageReader(batchData)

View file

@ -74,12 +74,13 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
i.messageInfo.Index++
msg := reader.Message()
msgType := msg.TypeID()
// Preprocess "system" messages
if _, ok := i.preFilter[msg.TypeID()]; ok {
msg = msg.Decode()
if msg == nil {
log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
return
}
msg = transformDeprecated(msg)
@ -99,7 +100,7 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
if i.autoDecode {
msg = msg.Decode()
if msg == nil {
log.Printf("decode error, type: %d, info: %s", msg.TypeID(), i.batchInfo.Info())
log.Printf("decode error, type: %d, info: %s", msgType, i.batchInfo.Info())
return
}
}
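Caching msgType before Decode is a nil-safety fix rather than a cosmetic one; a minimal fragment of the failure mode (same Message/Decode semantics as in this package):
msgType := msg.TypeID() // read while msg is still the valid raw message
msg = msg.Decode()
if msg == nil {
    // Logging msg.TypeID() here would panic: msg is now a nil interface value.
    log.Printf("decode error, type: %d", msgType)
}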

View file

@ -0,0 +1,72 @@
package assets
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
"strconv"
)
var assetsProcessedSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "assets",
Name: "processed_total",
Help: "A counter displaying the total count of processed assets.",
},
)
func IncreaseProcessesSessions() {
assetsProcessedSessions.Inc()
}
var assetsSavedSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "assets",
Name: "saved_total",
Help: "A counter displaying the total number of cached assets.",
},
)
func IncreaseSavedSessions() {
assetsSavedSessions.Inc()
}
var assetsDownloadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "assets",
Name: "download_duration_seconds",
Help: "A histogram displaying the duration of downloading for each asset in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"response_code"},
)
func RecordDownloadDuration(durMillis float64, code int) {
assetsDownloadDuration.WithLabelValues(strconv.Itoa(code)).Observe(durMillis / 1000.0)
}
var assetsUploadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "assets",
Name: "upload_s3_duration_seconds",
Help: "A histogram displaying the duration of uploading to s3 for each asset in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"failed"},
)
func RecordUploadDuration(durMillis float64, isFailed bool) {
failed := "false"
if isFailed {
failed = "true"
}
assetsUploadDuration.WithLabelValues(failed).Observe(durMillis / 1000.0)
}
func List() []prometheus.Collector {
return []prometheus.Collector{
assetsProcessedSessions,
assetsSavedSessions,
assetsDownloadDuration,
assetsUploadDuration,
}
}
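A hedged sketch of a call site for the duration and counter helpers; the download step and its signature are hypothetical, only the metric functions come from this file:
package example

import (
    "time"

    "openreplay/backend/pkg/metrics/assets"
)

// cacheAsset times a (hypothetical) download and records the metrics.
func cacheAsset(url string, download func(string) (int, error)) {
    start := time.Now()
    code, err := download(url) // returns the HTTP status code
    assets.RecordDownloadDuration(float64(time.Since(start).Milliseconds()), code)
    if err == nil {
        assets.IncreaseSavedSessions()
    }
}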

View file

@ -0,0 +1,11 @@
package common
// DefaultDurationBuckets is a set of buckets from 5 milliseconds to 1000 seconds (16.6667 minutes)
var DefaultDurationBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000}
// DefaultSizeBuckets is a set of buckets from 1 byte to 1_000_000_000 bytes (~1 GB)
var DefaultSizeBuckets = []float64{1, 10, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 25000, 50000, 100_000, 250_000,
500_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}
// DefaultBuckets is a set of buckets from 1 to 1_000_000 elements
var DefaultBuckets = []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10_000, 50_000, 100_000, 1_000_000}
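Prometheus buckets are cumulative upper bounds (le), so an observation is counted in every bucket at or above its value; a small hypothetical histogram reusing the shared buckets:
package example

import (
    "github.com/prometheus/client_golang/prometheus"

    "openreplay/backend/pkg/metrics/common"
)

// A 0.12s observation lands in the le=0.25 bucket and all larger ones.
var stepDuration = prometheus.NewHistogram(
    prometheus.HistogramOpts{
        Namespace: "example",
        Name:      "step_duration_seconds",
        Help:      "Illustrative histogram using the shared duration buckets.",
        Buckets:   common.DefaultDurationBuckets,
    },
)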

View file

@ -0,0 +1,127 @@
package database
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
)
var dbBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "db",
Name: "batch_size_bytes",
Help: "A histogram displaying the batch size in bytes.",
Buckets: common.DefaultSizeBuckets,
},
)
func RecordBatchSize(size float64) {
dbBatchSize.Observe(size)
}
var dbBatchElements = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "db",
Name: "batch_size_elements",
Help: "A histogram displaying the number of SQL commands in each batch.",
Buckets: common.DefaultBuckets,
},
)
func RecordBatchElements(number float64) {
dbBatchElements.Observe(number)
}
var dbBatchInsertDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "db",
Name: "batch_insert_duration_seconds",
Help: "A histogram displaying the duration of batch inserts in seconds.",
Buckets: common.DefaultDurationBuckets,
},
)
func RecordBatchInsertDuration(durMillis float64) {
dbBatchInsertDuration.Observe(durMillis / 1000.0)
}
var dbBulkSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "db",
Name: "bulk_size_bytes",
Help: "A histogram displaying the bulk size in bytes.",
Buckets: common.DefaultSizeBuckets,
},
[]string{"db", "table"},
)
func RecordBulkSize(size float64, db, table string) {
dbBulkSize.WithLabelValues(db, table).Observe(size)
}
var dbBulkElements = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "db",
Name: "bulk_size_elements",
Help: "A histogram displaying the size of data set in each bulk.",
Buckets: common.DefaultBuckets,
},
[]string{"db", "table"},
)
func RecordBulkElements(size float64, db, table string) {
dbBulkElements.WithLabelValues(db, table).Observe(size)
}
var dbBulkInsertDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "db",
Name: "bulk_insert_duration_seconds",
Help: "A histogram displaying the duration of bulk inserts in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"db", "table"},
)
func RecordBulkInsertDuration(durMillis float64, db, table string) {
dbBulkInsertDuration.WithLabelValues(db, table).Observe(durMillis / 1000.0)
}
var dbRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "db",
Name: "request_duration_seconds",
Help: "A histogram displaying the duration of each sql request in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"method", "table"},
)
func RecordRequestDuration(durMillis float64, method, table string) {
dbRequestDuration.WithLabelValues(method, table).Observe(durMillis / 1000.0)
}
var dbTotalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "db",
Name: "requests_total",
Help: "A counter showing the total number of all SQL requests.",
},
[]string{"method", "table"},
)
func IncreaseTotalRequests(method, table string) {
dbTotalRequests.WithLabelValues(method, table).Inc()
}
func List() []prometheus.Collector {
return []prometheus.Collector{
dbBatchSize,
dbBatchElements,
dbBatchInsertDuration,
dbBulkSize,
dbBulkElements,
dbBulkInsertDuration,
dbRequestDuration,
dbTotalRequests,
}
}

View file

@ -0,0 +1,51 @@
package ender
import "github.com/prometheus/client_golang/prometheus"
var enderActiveSessions = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "ender",
Name: "sessions_active",
Help: "A gauge displaying the number of active (live) sessions.",
},
)
func IncreaseActiveSessions() {
enderActiveSessions.Inc()
}
func DecreaseActiveSessions() {
enderActiveSessions.Dec()
}
var enderClosedSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ender",
Name: "sessions_closed",
Help: "A counter displaying the number of closed sessions (sent SessionEnd).",
},
)
func IncreaseClosedSessions() {
enderClosedSessions.Inc()
}
var enderTotalSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ender",
Name: "sessions_total",
Help: "A counter displaying the number of all processed sessions.",
},
)
func IncreaseTotalSessions() {
enderTotalSessions.Inc()
}
func List() []prometheus.Collector {
return []prometheus.Collector{
enderActiveSessions,
enderClosedSessions,
enderTotalSessions,
}
}
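A sketch of how the gauge and the two counters are meant to pair over a session's lifetime; the hook functions are hypothetical:
package example

import "openreplay/backend/pkg/metrics/ender"

func onSessionStart() {
    ender.IncreaseTotalSessions()
    ender.IncreaseActiveSessions()
}

func onSessionEnd() {
    ender.DecreaseActiveSessions()
    ender.IncreaseClosedSessions()
}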

View file

@ -0,0 +1,55 @@
package http
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
"strconv"
)
var httpRequestSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "http",
Name: "request_size_bytes",
Help: "A histogram displaying the size of each HTTP request in bytes.",
Buckets: common.DefaultSizeBuckets,
},
[]string{"url", "response_code"},
)
func RecordRequestSize(size float64, url string, code int) {
httpRequestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size)
}
var httpRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "http",
Name: "request_duration_seconds",
Help: "A histogram displaying the duration of each HTTP request in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"url", "response_code"},
)
func RecordRequestDuration(durMillis float64, url string, code int) {
httpRequestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis / 1000.0)
}
var httpTotalRequests = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "http",
Name: "requests_total",
Help: "A counter displaying the number all HTTP requests.",
},
)
func IncreaseTotalRequests() {
httpTotalRequests.Inc()
}
func List() []prometheus.Collector {
return []prometheus.Collector{
httpRequestSize,
httpRequestDuration,
httpTotalRequests,
}
}
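A hedged middleware sketch showing where these helpers could be called from an HTTP service; the statusRecorder wrapper and instrument function are hypothetical, not part of this commit (note that labelling by raw URL path can blow up cardinality, so a route template would be safer):
package example

import (
    "net/http"
    "time"

    httpMetrics "openreplay/backend/pkg/metrics/http"
)

// statusRecorder captures the response code for the metric labels.
type statusRecorder struct {
    http.ResponseWriter
    code int
}

func (r *statusRecorder) WriteHeader(code int) {
    r.code = code
    r.ResponseWriter.WriteHeader(code)
}

func instrument(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        rec := &statusRecorder{ResponseWriter: w, code: http.StatusOK}
        start := time.Now()
        next.ServeHTTP(rec, r)
        httpMetrics.RecordRequestDuration(float64(time.Since(start).Milliseconds()), r.URL.Path, rec.code)
        httpMetrics.RecordRequestSize(float64(r.ContentLength), r.URL.Path, rec.code)
        httpMetrics.IncreaseTotalRequests()
    })
}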

View file

@ -0,0 +1,40 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
)
type MetricServer struct {
registry *prometheus.Registry
}
func New() *MetricServer {
registry := prometheus.NewRegistry()
// Add go runtime metrics and process collectors.
registry.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
// Expose /metrics HTTP endpoint using the created custom registry.
http.Handle(
"/metrics", promhttp.HandlerFor(
registry,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
}),
)
go func() {
log.Println(http.ListenAndServe(":8888", nil))
}()
return &MetricServer{
registry: registry,
}
}
func (s *MetricServer) Register(cs []prometheus.Collector) {
s.registry.MustRegister(cs...)
}
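Per-service wiring then reduces to creating the server and registering one collector list; a minimal example (the ender list is just one choice, and the port is the hard-coded :8888 above):
package main

import (
    "openreplay/backend/pkg/metrics"
    "openreplay/backend/pkg/metrics/ender"
)

func main() {
    m := metrics.New()       // starts serving /metrics on :8888
    m.Register(ender.List()) // attach a per-service collector list
    select {}                // block so the endpoint stays up (example only)
}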

View file

@ -0,0 +1,185 @@
package sink
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
)
var sinkMessageSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "sink",
Name: "message_size_bytes",
Help: "A histogram displaying the size of each message in bytes.",
Buckets: common.DefaultSizeBuckets,
},
)
func RecordMessageSize(size float64) {
sinkMessageSize.Observe(size)
}
var sinkWrittenMessages = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "sink",
Name: "messages_written",
Help: "A counter displaying the total number of all written messages.",
},
)
func IncreaseWrittenMessages() {
sinkWrittenMessages.Inc()
}
var sinkTotalMessages = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "sink",
Name: "messages_total",
Help: "A counter displaying the total number of all processed messages.",
},
)
func IncreaseTotalMessages() {
sinkTotalMessages.Inc()
}
var sinkBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "sink",
Name: "batch_size_bytes",
Help: "A histogram displaying the size of each batch in bytes.",
Buckets: common.DefaultSizeBuckets,
},
)
func RecordBatchSize(size float64) {
sinkBatchSize.Observe(size)
}
var sinkTotalBatches = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "sink",
Name: "batches_total",
Help: "A counter displaying the total number of all written batches.",
},
)
func IncreaseTotalBatches() {
sinkTotalBatches.Inc()
}
var sinkWrittenBytes = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "sink",
Name: "written_bytes",
Help: "A histogram displaying the size of buffer in bytes written to session file.",
Buckets: common.DefaultSizeBuckets,
},
[]string{"file_type"},
)
func RecordWrittenBytes(size float64, fileType string) {
if size == 0 {
return
}
sinkWrittenBytes.WithLabelValues(fileType).Observe(size)
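// The running total below is updated here as well, so call sites only need RecordWrittenBytes.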
IncreaseTotalWrittenBytes(size, fileType)
}
var sinkTotalWrittenBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "sink",
Name: "written_bytes_total",
Help: "A counter displaying the total number of bytes written to all session files.",
},
[]string{"file_type"},
)
func IncreaseTotalWrittenBytes(size float64, fileType string) {
if size == 0 {
return
}
sinkTotalWrittenBytes.WithLabelValues(fileType).Add(size)
}
var sinkCachedAssets = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "sink",
Name: "assets_cached",
Help: "A gauge displaying the current number of cached assets.",
},
)
func IncreaseCachedAssets() {
sinkCachedAssets.Inc()
}
func DecreaseCachedAssets() {
sinkCachedAssets.Dec()
}
var sinkSkippedAssets = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "sink",
Name: "assets_skipped",
Help: "A counter displaying the total number of all skipped assets.",
},
)
func IncreaseSkippedAssets() {
sinkSkippedAssets.Inc()
}
var sinkTotalAssets = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "sink",
Name: "assets_total",
Help: "A counter displaying the total number of all processed assets.",
},
)
func IncreaseTotalAssets() {
sinkTotalAssets.Inc()
}
var sinkAssetSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "sink",
Name: "asset_size_bytes",
Help: "A histogram displaying the size of each asset in bytes.",
Buckets: common.DefaultSizeBuckets,
},
)
func RecordAssetSize(size float64) {
sinkAssetSize.Observe(size)
}
var sinkProcessAssetDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "sink",
Name: "asset_process_duration_seconds",
Help: "A histogram displaying the duration of processing for each asset in seconds.",
Buckets: common.DefaultDurationBuckets,
},
)
func RecordProcessAssetDuration(durMillis float64) {
sinkProcessAssetDuration.Observe(durMillis / 1000.0)
}
func List() []prometheus.Collector {
return []prometheus.Collector{
sinkMessageSize,
sinkWrittenMessages,
sinkTotalMessages,
sinkBatchSize,
sinkTotalBatches,
sinkWrittenBytes,
sinkTotalWrittenBytes,
sinkCachedAssets,
sinkSkippedAssets,
sinkTotalAssets,
sinkAssetSize,
sinkProcessAssetDuration,
}
}

View file

@ -0,0 +1,114 @@
package storage
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
)
var storageSessionSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "session_size_bytes",
Help: "A histogram displaying the size of each session file in bytes prior to any manipulation.",
Buckets: common.DefaultSizeBuckets,
},
[]string{"file_type"},
)
func RecordSessionSize(fileSize float64, fileType string) {
storageSessionSize.WithLabelValues(fileType).Observe(fileSize)
}
var storageTotalSessions = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "storage",
Name: "sessions_total",
Help: "A counter displaying the total number of all processed sessions.",
},
)
func IncreaseStorageTotalSessions() {
storageTotalSessions.Inc()
}
var storageSessionReadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "read_duration_seconds",
Help: "A histogram displaying the duration of reading for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionReadDuration(durMillis float64, fileType string) {
storageSessionReadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionSortDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "sort_duration_seconds",
Help: "A histogram displaying the duration of sorting for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionSortDuration(durMillis float64, fileType string) {
storageSessionSortDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionEncodeDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "encode_duration_seconds",
Help: "A histogram displaying the duration of encoding for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionEncodeDuration(durMillis float64, fileType string) {
storageSessionEncodeDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionCompressDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "compress_duration_seconds",
Help: "A histogram displaying the duration of compressing for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionCompressDuration(durMillis float64, fileType string) {
storageSessionCompressDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
var storageSessionUploadDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "storage",
Name: "upload_duration_seconds",
Help: "A histogram displaying the duration of uploading to s3 for each session in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"file_type"},
)
func RecordSessionUploadDuration(durMillis float64, fileType string) {
storageSessionUploadDuration.WithLabelValues(fileType).Observe(durMillis / 1000.0)
}
func List() []prometheus.Collector {
return []prometheus.Collector{
storageSessionSize,
storageTotalSessions,
storageSessionReadDuration,
storageSessionSortDuration,
storageSessionEncodeDuration,
storageSessionCompressDuration,
storageSessionUploadDuration,
}
}
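All stage helpers share the same shape (milliseconds in, seconds observed, labelled by file type); a hypothetical wrapper that times one stage:
package example

import (
    "time"

    storageMetrics "openreplay/backend/pkg/metrics/storage"
)

// timeSort wraps a sorting step; file types like "dom" are assumed labels.
func timeSort(fileType string, sort func()) {
    start := time.Now()
    sort()
    storageMetrics.RecordSessionSortDuration(float64(time.Since(start).Milliseconds()), fileType)
}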

View file

@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"log"
"openreplay/backend/pkg/metrics/database"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
@ -16,19 +18,23 @@ type Bulk interface {
type bulkImpl struct {
conn driver.Conn
table string
query string
values [][]interface{}
}
func NewBulk(conn driver.Conn, query string) (Bulk, error) {
func NewBulk(conn driver.Conn, table, query string) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("clickhouse connection is empty")
case table == "":
return nil, errors.New("table is empty")
case query == "":
return nil, errors.New("query is empty")
}
return &bulkImpl{
conn: conn,
table: table,
query: query,
values: make([][]interface{}, 0),
}, nil
@ -40,6 +46,7 @@ func (b *bulkImpl) Append(args ...interface{}) error {
}
func (b *bulkImpl) Send() error {
start := time.Now()
batch, err := b.conn.PrepareBatch(context.Background(), b.query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
@ -50,6 +57,11 @@ func (b *bulkImpl) Send() error {
log.Printf("failed query: %s", b.query)
}
}
err = batch.Send()
// Save bulk metrics
database.RecordBulkElements(float64(len(b.values)), "ch", b.table)
database.RecordBulkInsertDuration(float64(time.Now().Sub(start).Milliseconds()), "ch", b.table)
// Prepare the values slice for new data
b.values = make([][]interface{}, 0)
return batch.Send()
return err
}
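With the table name now passed explicitly, the bulk metrics can be labelled per table; a rough usage sketch (the connection, table, and values are illustrative):
bulk, err := NewBulk(conn, "sessions", "INSERT INTO sessions (session_id, duration)")
if err != nil {
    log.Fatalf("can't create bulk: %s", err)
}
if err := bulk.Append(uint64(1), uint64(42)); err != nil {
    log.Printf("append failed: %s", err)
}
if err := bulk.Send(); err != nil { // records the element count and insert duration
    log.Printf("send failed: %s", err)
}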

View file

@ -3,18 +3,16 @@ package clickhouse
import (
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
"os"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"openreplay/backend/pkg/license"
)
@ -52,28 +50,14 @@ type connectorImpl struct {
finished chan struct{}
}
// Check env variables. If not present, return default value.
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func NewConnector(url string) Connector {
license.CheckLicense()
// Check username, password, database
userName := getEnv("CH_USERNAME", "default")
password := getEnv("CH_PASSWORD", "")
database := getEnv("CH_DATABASE", "default")
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/"+database)
url = strings.TrimSuffix(url, "/default")
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{url},
Auth: clickhouse.Auth{
Database: database,
Username: userName,
Password: password,
Database: "default",
},
MaxOpenConns: 20,
MaxIdleConns: 15,
@ -99,7 +83,7 @@ func NewConnector(url string) Connector {
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := NewBulk(c.conn, query)
batch, err := NewBulk(c.conn, name, query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}