feat(backend): added metrics to storage and sink services
This commit is contained in:
parent
a6db2cb602
commit
78cf538b6b
4 changed files with 64 additions and 9 deletions
|
|
@ -18,7 +18,7 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
metrics := monitoring.New("ender")
|
||||
metrics := monitoring.New("http")
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
pprof.StartProfilingServer()
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"log"
|
||||
"openreplay/backend/internal/sink/assetscache"
|
||||
"openreplay/backend/internal/sink/oswriter"
|
||||
"openreplay/backend/internal/storage"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"time"
|
||||
|
||||
"os"
|
||||
|
|
@ -20,6 +22,8 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
metrics := monitoring.New("sink")
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
cfg := sink.New()
|
||||
|
|
@ -36,6 +40,18 @@ func main() {
|
|||
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
|
||||
|
||||
counter := storage.NewLogCounter()
|
||||
totalMessages, err := metrics.RegisterCounter("messages_total")
|
||||
if err != nil {
|
||||
log.Printf("can't create messages_total metric: %s", err)
|
||||
}
|
||||
savedMessages, err := metrics.RegisterCounter("messages_saved")
|
||||
if err != nil {
|
||||
log.Printf("can't create messages_saved metric: %s", err)
|
||||
}
|
||||
messageSize, err := metrics.RegisterHistogram("messages_size")
|
||||
if err != nil {
|
||||
log.Printf("can't create messages_size metric: %s", err)
|
||||
}
|
||||
|
||||
consumer := queue.NewMessageConsumer(
|
||||
cfg.GroupSink,
|
||||
|
|
@ -47,6 +63,8 @@ func main() {
|
|||
// Process assets
|
||||
message = assetMessageHandler.ParseAssets(sessionID, message)
|
||||
|
||||
totalMessages.Add(context.Background(), 1)
|
||||
|
||||
// Filter message
|
||||
typeID := message.TypeID()
|
||||
if !IsReplayerType(typeID) {
|
||||
|
|
@ -74,6 +92,9 @@ func main() {
|
|||
if err := writer.Write(sessionID, data); err != nil {
|
||||
log.Printf("Writer error: %v\n", err)
|
||||
}
|
||||
|
||||
messageSize.Record(context.Background(), float64(len(data)))
|
||||
savedMessages.Add(context.Background(), 1)
|
||||
},
|
||||
false,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"log"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
|
|
@ -17,12 +18,14 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
metrics := monitoring.New("storage")
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
cfg := config.New()
|
||||
|
||||
s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
|
||||
srv, err := storage.New(cfg, s3)
|
||||
srv, err := storage.New(cfg, s3, metrics)
|
||||
if err != nil {
|
||||
log.Printf("can't init storage service: %s", err)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -2,10 +2,13 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"log"
|
||||
config "openreplay/backend/internal/config/storage"
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"openreplay/backend/pkg/storage"
|
||||
"os"
|
||||
"strconv"
|
||||
|
|
@ -13,26 +16,41 @@ import (
|
|||
)
|
||||
|
||||
type Storage struct {
|
||||
cfg *config.Config
|
||||
s3 *storage.S3
|
||||
startBytes []byte
|
||||
cfg *config.Config
|
||||
s3 *storage.S3
|
||||
startBytes []byte
|
||||
totalSessions syncfloat64.Counter
|
||||
sessionSize syncfloat64.Histogram
|
||||
archivingTime syncfloat64.Histogram
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
|
||||
func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) {
|
||||
switch {
|
||||
case cfg == nil:
|
||||
return nil, fmt.Errorf("config is empty")
|
||||
case s3 == nil:
|
||||
return nil, fmt.Errorf("s3 storage is empty")
|
||||
}
|
||||
// Create metrics
|
||||
totalSessions, err := metrics.RegisterCounter("sessions_total")
|
||||
if err != nil {
|
||||
log.Printf("can't create sessions_total metric: %s", err)
|
||||
}
|
||||
sessionSize, err := metrics.RegisterHistogram("sessions_size")
|
||||
if err != nil {
|
||||
log.Printf("can't create session_size metric: %s", err)
|
||||
}
|
||||
return &Storage{
|
||||
cfg: cfg,
|
||||
s3: s3,
|
||||
startBytes: make([]byte, cfg.FileSplitSize),
|
||||
cfg: cfg,
|
||||
s3: s3,
|
||||
startBytes: make([]byte, cfg.FileSplitSize),
|
||||
totalSessions: totalSessions,
|
||||
sessionSize: sessionSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Storage) UploadKey(key string, retryCount int) {
|
||||
start := time.Now()
|
||||
if retryCount <= 0 {
|
||||
return
|
||||
}
|
||||
|
|
@ -77,4 +95,17 @@ func (s *Storage) UploadKey(key string, retryCount int) {
|
|||
log.Fatalf("Storage: end upload failed. %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Save metrics
|
||||
var fileSize float64 = 0
|
||||
fileInfo, err := file.Stat()
|
||||
if err != nil {
|
||||
log.Printf("can't get file info: %s", err)
|
||||
} else {
|
||||
fileSize = float64(fileInfo.Size())
|
||||
}
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
|
||||
s.archivingTime.Record(ctx, float64(time.Now().Sub(start).Milliseconds()))
|
||||
s.sessionSize.Record(ctx, fileSize)
|
||||
s.totalSessions.Add(ctx, 1)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue