diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 9901c8e32..0ead20855 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -6,6 +6,7 @@ import ( "openreplay/backend/internal/http/router" "openreplay/backend/internal/http/server" "openreplay/backend/internal/http/services" + "openreplay/backend/pkg/monitoring" "os" "os/signal" "syscall" @@ -17,6 +18,8 @@ import ( ) func main() { + metrics := monitoring.New("ender") + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) pprof.StartProfilingServer() @@ -35,7 +38,7 @@ func main() { services := services.New(cfg, producer, dbConn) // Init server's routes - router, err := router.NewRouter(cfg, services) + router, err := router.NewRouter(cfg, services, metrics) if err != nil { log.Fatalf("failed while creating engine: %s", err) } diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index cc6043c2d..fa464d530 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -1,8 +1,11 @@ package router import ( + "bytes" "encoding/json" "errors" + "go.opentelemetry.io/otel/attribute" + "io" "io/ioutil" "log" "math/rand" @@ -16,6 +19,23 @@ import ( "openreplay/backend/pkg/token" ) +func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) (io.ReadCloser, error) { + body := http.MaxBytesReader(w, r.Body, limit) + bodyBytes, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + body.Close() + + reqSize := len(bodyBytes) + e.requestSize.Record( + r.Context(), + float64(reqSize), + []attribute.KeyValue{attribute.String("method", r.URL.Path)}..., + ) + return ioutil.NopCloser(bytes.NewBuffer(bodyBytes)), nil +} + func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { startTime := time.Now() @@ -24,12 +44,18 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) return } - body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) - defer body.Close() + + reqBody, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + log.Printf("error while reading request body: %s", err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + return + } + defer reqBody.Close() // Parse request body req := &StartSessionRequest{} - if err := json.NewDecoder(body).Decode(req); err != nil { + if err := json.NewDecoder(reqBody).Decode(req); err != nil { ResponseWithError(w, http.StatusBadRequest, err) return } @@ -114,10 +140,16 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) return } - body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) - defer body.Close() - bytes, err := ioutil.ReadAll(body) + reqBody, err := e.readBody(w, r, e.cfg.BeaconSizeLimit) + if err != nil { + log.Printf("error while reading request body: %s", err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + return + } + defer reqBody.Close() + + bytes, err := ioutil.ReadAll(reqBody) if err != nil { ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging return @@ -139,12 +171,18 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) return } - body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) - defer body.Close() + + reqBody, err := e.readBody(w, r, e.cfg.JsonSizeLimit) + if err != nil { + log.Printf("error while reading request body: %s", err) + ResponseWithError(w, http.StatusRequestEntityTooLarge, err) + return + } + defer reqBody.Close() // Parse request body req := &NotStartedRequest{} - if err := json.NewDecoder(body).Decode(req); err != nil { + if err := json.NewDecoder(reqBody).Decode(req); err != nil { ResponseWithError(w, http.StatusBadRequest, err) return } @@ -160,7 +198,7 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { return } country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r) - err := e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{ + err = e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{ ProjectKey: *req.ProjectKey, TrackerVersion: req.TrackerVersion, DoNotTrack: req.DoNotTrack, diff --git a/backend/internal/http/router/router.go b/backend/internal/http/router/router.go index a0b6c9c03..cd7095fae 100644 --- a/backend/internal/http/router/router.go +++ b/backend/internal/http/router/router.go @@ -1,24 +1,42 @@ package router import ( + "context" + "fmt" "github.com/gorilla/mux" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "log" "net/http" http3 "openreplay/backend/internal/config/http" http2 "openreplay/backend/internal/http/services" + "openreplay/backend/pkg/monitoring" + "time" ) type Router struct { - router *mux.Router - cfg *http3.Config - services *http2.ServicesBuilder + router *mux.Router + cfg *http3.Config + services *http2.ServicesBuilder + requestSize syncfloat64.Histogram + requestDuration syncfloat64.Histogram + totalRequests syncfloat64.Counter } -func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder) (*Router, error) { +func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *monitoring.Metrics) (*Router, error) { + switch { + case cfg == nil: + return nil, fmt.Errorf("config is empty") + case services == nil: + return nil, fmt.Errorf("services is empty") + case metrics == nil: + return nil, fmt.Errorf("metrics is empty") + } e := &Router{ cfg: cfg, services: services, } + e.initMetrics(metrics) e.init() return e, nil } @@ -49,6 +67,22 @@ func (e *Router) init() { e.router.Use(e.corsMiddleware) } +func (e *Router) initMetrics(metrics *monitoring.Metrics) { + var err error + e.requestSize, err = metrics.RegisterHistogram("requests_body_size") + if err != nil { + log.Printf("can't create requests_body_size metric: %s", err) + } + e.requestDuration, err = metrics.RegisterHistogram("requests_duration") + if err != nil { + log.Printf("can't create requests_duration metric: %s", err) + } + e.totalRequests, err = metrics.RegisterCounter("requests_total") + if err != nil { + log.Printf("can't create requests_total metric: %s", err) + } +} + func (e *Router) root(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } @@ -66,8 +100,17 @@ func (e *Router) corsMiddleware(next http.Handler) http.Handler { } log.Printf("Request: %v - %v ", r.Method, r.URL.Path) + requestStart := time.Now() + // Serve request next.ServeHTTP(w, r) + + metricsContext, _ := context.WithTimeout(context.Background(), time.Millisecond*100) + e.totalRequests.Add(metricsContext, 1) + e.requestDuration.Record(metricsContext, + float64(time.Now().Sub(requestStart).Milliseconds()), + []attribute.KeyValue{attribute.String("method", r.URL.Path)}..., + ) }) } diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index bd0a40a35..dd20f9414 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -23,21 +23,27 @@ type SessionEnder struct { timeout int64 sessions map[uint64]*session // map[sessionID]session activeSessions syncfloat64.UpDownCounter + totalSessions syncfloat64.Counter } func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) { if metrics == nil { return nil, fmt.Errorf("metrics module is empty") } - activeSessions, err := metrics.RegisterUpDownCounter("active_sessions") + activeSessions, err := metrics.RegisterUpDownCounter("sessions_active") if err != nil { - return nil, fmt.Errorf("can't register active_session metric: %s", err) + return nil, fmt.Errorf("can't register session.active metric: %s", err) + } + totalSessions, err := metrics.RegisterCounter("sessions_total") + if err != nil { + return nil, fmt.Errorf("can't register session.total metric: %s", err) } return &SessionEnder{ timeout: timeout, sessions: make(map[uint64]*session), activeSessions: activeSessions, + totalSessions: totalSessions, }, nil } @@ -49,12 +55,15 @@ func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) { return } sess, ok := se.sessions[sessionID] + log.Println(sess, ok) if !ok { se.sessions[sessionID] = &session{ lastTimestamp: currTS, isEnded: false, } + log.Printf("added new session: %d", sessionID) se.activeSessions.Add(context.Background(), 1) + se.totalSessions.Add(context.Background(), 1) return } if currTS > sess.lastTimestamp { diff --git a/backend/pkg/monitoring/metrics.go b/backend/pkg/monitoring/metrics.go index b6d524895..d3cd807c6 100644 --- a/backend/pkg/monitoring/metrics.go +++ b/backend/pkg/monitoring/metrics.go @@ -75,16 +75,16 @@ Counter is a synchronous instrument that measures additive non-decreasing values - disk reads */ -func (m *Metrics) RegisterCounter(name string) error { +func (m *Metrics) RegisterCounter(name string) (syncfloat64.Counter, error) { if _, ok := m.counters[name]; ok { - return fmt.Errorf("counter %s already exists", name) + return nil, fmt.Errorf("counter %s already exists", name) } counter, err := m.meter.SyncFloat64().Counter(name) if err != nil { - return fmt.Errorf("failed to initialize counter: %v", err) + return nil, fmt.Errorf("failed to initialize counter: %v", err) } m.counters[name] = counter - return nil + return counter, nil } func (m *Metrics) GetCounter(name string) syncfloat64.Counter { @@ -116,8 +116,10 @@ func (m *Metrics) GetUpDownCounter(name string) syncfloat64.UpDownCounter { } /* - - */ +Histogram is a synchronous instrument that produces a histogram from recorded values, for example: +- request latency +- request size +*/ func (m *Metrics) RegisterHistogram(name string) (syncfloat64.Histogram, error) { if _, ok := m.histograms[name]; ok {