diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 8f2334f21..2ea57b459 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -45,15 +45,21 @@ func main() { // Create handler's aggregator builderMap := sessions.NewBuilderMap(handlersFabric) - // Init modules - saver := datasaver.New(pg) - saver.InitStats() - statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) - keepMessage := func(tp int) bool { return tp == messages.MsgMetadata || tp == messages.MsgIssueEvent || tp == messages.MsgSessionStart || tp == messages.MsgSessionEnd || tp == messages.MsgUserID || tp == messages.MsgUserAnonymousID || tp == messages.MsgCustomEvent || tp == messages.MsgClickEvent || tp == messages.MsgInputEvent || tp == messages.MsgPageEvent || tp == messages.MsgErrorEvent || tp == messages.MsgFetchEvent || tp == messages.MsgGraphQLEvent || tp == messages.MsgIntegrationEvent || tp == messages.MsgPerformanceTrackAggr || tp == messages.MsgResourceEvent || tp == messages.MsgLongTask || tp == messages.MsgJSException || tp == messages.MsgResourceTiming || tp == messages.MsgRawCustomEvent || tp == messages.MsgCustomIssue || tp == messages.MsgFetch || tp == messages.MsgGraphQL || tp == messages.MsgStateAction || tp == messages.MsgSetInputTarget || tp == messages.MsgSetInputValue || tp == messages.MsgCreateDocument || tp == messages.MsgMouseClick || tp == messages.MsgSetPageLocation || tp == messages.MsgPageLoadTiming || tp == messages.MsgPageRenderTiming } + var producer types.Producer = nil + if cfg.UseQuickwit { + producer = queue.NewProducer(cfg.MessageSizeLimit, true) + defer producer.Close(15000) + } + + // Init modules + saver := datasaver.New(pg, producer) + saver.InitStats() + statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + // Handler logic handler := func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { statsLogger.Collect(sessionID, meta) diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index 03c7bc096..715d9ff8e 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -17,6 +17,7 @@ type Config struct { CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=15s"` BatchQueueLimit int `env:"DB_BATCH_QUEUE_LIMIT,required"` BatchSizeLimit int `env:"DB_BATCH_SIZE_LIMIT,required"` + UseQuickwit bool `env:"QUICKWIT_ENABLED,default=false"` } func New() *Config { diff --git a/backend/internal/db/datasaver/fts.go b/backend/internal/db/datasaver/fts.go new file mode 100644 index 000000000..c0250c4d2 --- /dev/null +++ b/backend/internal/db/datasaver/fts.go @@ -0,0 +1,123 @@ +package datasaver + +import ( + "encoding/json" + "log" + "openreplay/backend/pkg/messages" +) + +type FetchEventFTS struct { + Method string `json:"method"` + URL string `json:"url"` + Request string `json:"request"` + Response string `json:"response"` + Status uint64 `json:"status"` + Timestamp uint64 `json:"timestamp"` + Duration uint64 `json:"duration"` +} + +type PageEventFTS struct { + MessageID uint64 `json:"message_id"` + Timestamp uint64 `json:"timestamp"` + URL string `json:"url"` + Referrer string `json:"referrer"` + Loaded bool `json:"loaded"` + RequestStart uint64 `json:"request_start"` + ResponseStart uint64 `json:"response_start"` + ResponseEnd uint64 `json:"response_end"` + DomContentLoadedEventStart uint64 `json:"dom_content_loaded_event_start"` + DomContentLoadedEventEnd uint64 `json:"dom_content_loaded_event_end"` + LoadEventStart uint64 `json:"load_event_start"` + LoadEventEnd uint64 `json:"load_event_end"` + FirstPaint uint64 `json:"first_paint"` + FirstContentfulPaint uint64 `json:"first_contentful_paint"` + SpeedIndex uint64 `json:"speed_index"` + VisuallyComplete uint64 `json:"visually_complete"` + TimeToInteractive uint64 `json:"time_to_interactive"` +} + +type GraphQLEventFTS struct { + OperationKind string `json:"operation_kind"` + OperationName string `json:"operation_name"` + Variables string `json:"variables"` + Response string `json:"response"` +} + +func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { + // Skip, if FTS is disabled + if s.producer == nil { + return + } + + var ( + event []byte + err error + ) + + switch m := msg.(type) { + // Common + case *messages.Fetch: + event, err = json.Marshal(FetchEventFTS{ + Method: m.Method, + URL: m.URL, + Request: m.Request, + Response: m.Response, + Status: m.Status, + Timestamp: m.Timestamp, + Duration: m.Duration, + }) + case *messages.FetchEvent: + event, err = json.Marshal(FetchEventFTS{ + Method: m.Method, + URL: m.URL, + Request: m.Request, + Response: m.Response, + Status: m.Status, + Timestamp: m.Timestamp, + Duration: m.Duration, + }) + case *messages.PageEvent: + event, err = json.Marshal(PageEventFTS{ + MessageID: m.MessageID, + Timestamp: m.Timestamp, + URL: m.URL, + Referrer: m.Referrer, + Loaded: m.Loaded, + RequestStart: m.RequestStart, + ResponseStart: m.ResponseStart, + ResponseEnd: m.ResponseEnd, + DomContentLoadedEventStart: m.DomContentLoadedEventStart, + DomContentLoadedEventEnd: m.DomContentLoadedEventEnd, + LoadEventStart: m.LoadEventStart, + LoadEventEnd: m.LoadEventEnd, + FirstPaint: m.FirstPaint, + FirstContentfulPaint: m.FirstContentfulPaint, + SpeedIndex: m.SpeedIndex, + VisuallyComplete: m.VisuallyComplete, + TimeToInteractive: m.TimeToInteractive, + }) + case *messages.GraphQL: + event, err = json.Marshal(GraphQLEventFTS{ + OperationKind: m.OperationKind, + OperationName: m.OperationName, + Variables: m.Variables, + Response: m.Response, + }) + case *messages.GraphQLEvent: + event, err = json.Marshal(GraphQLEventFTS{ + OperationKind: m.OperationKind, + OperationName: m.OperationName, + Variables: m.Variables, + Response: m.Response, + }) + } + if err != nil { + log.Printf("can't marshal json for quickwit: %s", err) + } else { + if len(event) > 0 { + if err := s.producer.Produce("quickwit", sessionID, event); err != nil { + log.Printf("can't send event to quickwit: %s", err) + } + } + } +} diff --git a/backend/internal/db/datasaver/messages.go b/backend/internal/db/datasaver/messages.go index 4197ffb77..702c2f210 100644 --- a/backend/internal/db/datasaver/messages.go +++ b/backend/internal/db/datasaver/messages.go @@ -35,12 +35,15 @@ func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error { // Unique Web messages case *PageEvent: + mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebPageEvent(sessionID, m) case *ErrorEvent: return mi.pg.InsertWebErrorEvent(sessionID, m) case *FetchEvent: + mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebFetchEvent(sessionID, m) case *GraphQLEvent: + mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebGraphQLEvent(sessionID, m) case *IntegrationEvent: return mi.pg.InsertWebErrorEvent(sessionID, &ErrorEvent{ diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index 4cd742718..d41756a4d 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -1,11 +1,15 @@ package datasaver -import "openreplay/backend/pkg/db/cache" +import ( + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/queue/types" +) type Saver struct { - pg *cache.PGCache + pg *cache.PGCache + producer types.Producer } -func New(pg *cache.PGCache) *Saver { - return &Saver{pg: pg} +func New(pg *cache.PGCache, producer types.Producer) *Saver { + return &Saver{pg: pg, producer: producer} }