[DB] added last quickwit changes (#950)

* feat(backend): added last quickwit changes
This commit is contained in:
Alexander 2023-01-20 16:18:00 +01:00 committed by GitHub
parent 8c90ff8da9
commit bfe14db353
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 36 additions and 18 deletions

View file

@ -5,7 +5,6 @@ import (
"log"
types2 "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@ -49,14 +48,8 @@ func main() {
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
var producer types.Producer = nil
if cfg.UseQuickwit {
producer = queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
}
// Init modules
saver := datasaver.New(pg, producer)
saver := datasaver.New(pg, cfg)
saver.InitStats()
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,

View file

@ -18,6 +18,7 @@ type Config struct {
BatchQueueLimit int `env:"DB_BATCH_QUEUE_LIMIT,required"`
BatchSizeLimit int `env:"DB_BATCH_SIZE_LIMIT,required"`
UseQuickwit bool `env:"QUICKWIT_ENABLED,default=false"`
QuickwitTopic string `env:"QUICKWIT_TOPIC,default=saas-quickwit"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}

View file

@ -36,13 +36,10 @@ func (mi *Saver) InsertMessage(msg Message) error {
// Unique Web messages
case *PageEvent:
mi.sendToFTS(msg, sessionID)
return mi.pg.InsertWebPageEvent(sessionID, m)
case *NetworkRequest:
mi.sendToFTS(msg, sessionID)
return mi.pg.InsertWebNetworkRequest(sessionID, m)
case *GraphQL:
mi.sendToFTS(msg, sessionID)
return mi.pg.InsertWebGraphQL(sessionID, m)
case *JSException:
return mi.pg.InsertWebJSException(m)

View file

@ -1,6 +1,7 @@
package datasaver
import (
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/queue/types"
)
@ -10,6 +11,6 @@ type Saver struct {
producer types.Producer
}
func New(pg *cache.PGCache, producer types.Producer) *Saver {
return &Saver{pg: pg, producer: producer}
func New(pg *cache.PGCache, _ *db.Config) *Saver {
return &Saver{pg: pg, producer: nil}
}

View file

@ -60,7 +60,6 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error {
var host, path string
host, path, _, _ = url.GetURLParts(e.Url)
log.Println("insert web click:", host, path)
if err := conn.webClickEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil {
log.Printf("insert web click err: %s", err)
}

View file

@ -7,6 +7,8 @@ import (
)
type NetworkRequestFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
Method string `json:"method"`
URL string `json:"url"`
Request string `json:"request"`
@ -17,6 +19,8 @@ type NetworkRequestFTS struct {
}
type PageEventFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
MessageID uint64 `json:"message_id"`
Timestamp uint64 `json:"timestamp"`
URL string `json:"url"`
@ -37,13 +41,15 @@ type PageEventFTS struct {
}
type GraphQLFTS struct {
SessionID uint64 `json:"session_id"`
ProjectID uint32 `json:"project_id"`
OperationKind string `json:"operation_kind"`
OperationName string `json:"operation_name"`
Variables string `json:"variables"`
Response string `json:"response"`
}
func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) {
func (s *Saver) SendToFTS(msg messages.Message, projID uint32) {
// Skip, if FTS is disabled
if s.producer == nil {
return
@ -58,6 +64,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) {
// Common
case *messages.NetworkRequest:
event, err = json.Marshal(NetworkRequestFTS{
SessionID: msg.SessionID(),
ProjectID: projID,
Method: m.Method,
URL: m.URL,
Request: m.Request,
@ -68,6 +76,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) {
})
case *messages.PageEvent:
event, err = json.Marshal(PageEventFTS{
SessionID: msg.SessionID(),
ProjectID: projID,
MessageID: m.MessageID,
Timestamp: m.Timestamp,
URL: m.URL,
@ -88,6 +98,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) {
})
case *messages.GraphQL:
event, err = json.Marshal(GraphQLFTS{
SessionID: msg.SessionID(),
ProjectID: projID,
OperationKind: m.OperationKind,
OperationName: m.OperationName,
Variables: m.Variables,
@ -98,7 +110,7 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) {
log.Printf("can't marshal json for quickwit: %s", err)
} else {
if len(event) > 0 {
if err := s.producer.Produce("quickwit", sessionID, event); err != nil {
if err := s.producer.Produce(s.topic, msg.SessionID(), event); err != nil {
log.Printf("can't send event to quickwit: %s", err)
}
}

View file

@ -1,8 +1,10 @@
package datasaver
import (
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
@ -10,8 +12,14 @@ type Saver struct {
pg *cache.PGCache
ch clickhouse.Connector
producer types.Producer
topic string
}
func New(pg *cache.PGCache, producer types.Producer) *Saver {
return &Saver{pg: pg, producer: producer}
func New(pg *cache.PGCache, cfg *db.Config) *Saver {
var producer types.Producer = nil
if cfg.UseQuickwit {
producer = queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
}
return &Saver{pg: pg, producer: producer, topic: cfg.QuickwitTopic}
}

View file

@ -17,6 +17,13 @@ func (si *Saver) InitStats() {
}
func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error {
// Send data to quickwit
if sess, err := si.pg.Cache.GetSession(msg.SessionID()); err != nil {
si.SendToFTS(msg, 0)
} else {
si.SendToFTS(msg, sess.ProjectID)
}
switch m := msg.(type) {
// Web
case *messages.SessionEnd: