Performance patch (#1508)
* feat(backend): removed legacy InputEvent builder from heuristics service * feat(backend): removed InputEvent handler from DB service * feat(backend): removed InputEvent from ClickHouse connector * feat(backend): added extra env variable for brotli compression algorithm
This commit is contained in:
parent
662e5299c1
commit
bb7c086c5b
10 changed files with 3 additions and 139 deletions
|
|
@ -56,7 +56,7 @@ func main() {
|
||||||
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
|
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
|
||||||
messages.MsgJSException, messages.MsgResourceTiming, messages.MsgCustomEvent, messages.MsgCustomIssue,
|
messages.MsgJSException, messages.MsgResourceTiming, messages.MsgCustomEvent, messages.MsgCustomIssue,
|
||||||
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction,
|
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction,
|
||||||
messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument, messages.MsgMouseClick,
|
messages.MsgCreateDocument, messages.MsgMouseClick,
|
||||||
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
|
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
|
||||||
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgMouseThrashing, messages.MsgInputChange,
|
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgMouseThrashing, messages.MsgInputChange,
|
||||||
messages.MsgUnbindNodes}
|
messages.MsgUnbindNodes}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ func main() {
|
||||||
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
|
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
|
||||||
handlersFabric := func() []handlers.MessageProcessor {
|
handlersFabric := func() []handlers.MessageProcessor {
|
||||||
return []handlers.MessageProcessor{
|
return []handlers.MessageProcessor{
|
||||||
custom.NewInputEventBuilder(),
|
|
||||||
custom.NewPageEventBuilder(),
|
custom.NewPageEventBuilder(),
|
||||||
web.NewDeadClickDetector(),
|
web.NewDeadClickDetector(),
|
||||||
&web.ClickRageDetector{},
|
&web.ClickRageDetector{},
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ type Config struct {
|
||||||
MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
|
MaxFileSize int64 `env:"MAX_FILE_SIZE,default=524288000"`
|
||||||
UseSort bool `env:"USE_SESSION_SORT,default=true"`
|
UseSort bool `env:"USE_SESSION_SORT,default=true"`
|
||||||
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
|
||||||
|
UseBrotli bool `env:"USE_BROTLI,default=false"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *Config {
|
func New() *Config {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package datasaver
|
package datasaver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"openreplay/backend/internal/config/db"
|
"openreplay/backend/internal/config/db"
|
||||||
|
|
@ -103,14 +102,6 @@ func (s *saverImpl) handleMessage(msg Message) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
||||||
case *InputEvent:
|
|
||||||
if err = s.pg.InsertWebInputEvent(session, m); err != nil {
|
|
||||||
if errors.Is(err, postgres.EmptyLabel) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.sessions.UpdateEventsStats(session.SessionID, 1, 0)
|
|
||||||
case *PageEvent:
|
case *PageEvent:
|
||||||
if err = s.pg.InsertWebPageEvent(session, m); err != nil {
|
if err = s.pg.InsertWebPageEvent(session, m); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -197,7 +197,7 @@ func (s *Storage) prepareSession(path string, tp FileType, task *Task) error {
|
||||||
|
|
||||||
func (s *Storage) packSession(task *Task, tp FileType) {
|
func (s *Storage) packSession(task *Task, tp FileType) {
|
||||||
// If encryption key is empty, pack session using better algorithm
|
// If encryption key is empty, pack session using better algorithm
|
||||||
if task.key == "" {
|
if task.key == "" && s.cfg.UseBrotli {
|
||||||
s.packSessionBetter(task, tp)
|
s.packSessionBetter(task, tp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ type BulkSet struct {
|
||||||
requests Bulk
|
requests Bulk
|
||||||
customEvents Bulk
|
customEvents Bulk
|
||||||
webPageEvents Bulk
|
webPageEvents Bulk
|
||||||
webInputEvents Bulk
|
|
||||||
webInputDurations Bulk
|
webInputDurations Bulk
|
||||||
webGraphQL Bulk
|
webGraphQL Bulk
|
||||||
webErrors Bulk
|
webErrors Bulk
|
||||||
|
|
@ -57,8 +56,6 @@ func (conn *BulkSet) Get(name string) Bulk {
|
||||||
return conn.customEvents
|
return conn.customEvents
|
||||||
case "webPageEvents":
|
case "webPageEvents":
|
||||||
return conn.webPageEvents
|
return conn.webPageEvents
|
||||||
case "webInputEvents":
|
|
||||||
return conn.webInputEvents
|
|
||||||
case "webInputDurations":
|
case "webInputDurations":
|
||||||
return conn.webInputDurations
|
return conn.webInputDurations
|
||||||
case "webGraphQL":
|
case "webGraphQL":
|
||||||
|
|
@ -122,14 +119,6 @@ func (conn *BulkSet) initBulks() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
||||||
}
|
}
|
||||||
conn.webInputEvents, err = NewBulk(conn.c,
|
|
||||||
"events.inputs",
|
|
||||||
"(session_id, message_id, timestamp, label)",
|
|
||||||
"($%d, $%d, $%d, NULLIF(LEFT($%d, 2000),''))",
|
|
||||||
4, 200)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
|
||||||
}
|
|
||||||
conn.webInputDurations, err = NewBulk(conn.c,
|
conn.webInputDurations, err = NewBulk(conn.c,
|
||||||
"events.inputs",
|
"events.inputs",
|
||||||
"(session_id, message_id, timestamp, label, hesitation, duration)",
|
"(session_id, message_id, timestamp, label, hesitation, duration)",
|
||||||
|
|
@ -220,7 +209,6 @@ func (conn *BulkSet) Send() {
|
||||||
newTask.bulks = append(newTask.bulks, conn.requests)
|
newTask.bulks = append(newTask.bulks, conn.requests)
|
||||||
newTask.bulks = append(newTask.bulks, conn.customEvents)
|
newTask.bulks = append(newTask.bulks, conn.customEvents)
|
||||||
newTask.bulks = append(newTask.bulks, conn.webPageEvents)
|
newTask.bulks = append(newTask.bulks, conn.webPageEvents)
|
||||||
newTask.bulks = append(newTask.bulks, conn.webInputEvents)
|
|
||||||
newTask.bulks = append(newTask.bulks, conn.webInputDurations)
|
newTask.bulks = append(newTask.bulks, conn.webInputDurations)
|
||||||
newTask.bulks = append(newTask.bulks, conn.webGraphQL)
|
newTask.bulks = append(newTask.bulks, conn.webGraphQL)
|
||||||
newTask.bulks = append(newTask.bulks, conn.webErrors)
|
newTask.bulks = append(newTask.bulks, conn.webErrors)
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -14,8 +13,6 @@ import (
|
||||||
"openreplay/backend/pkg/url"
|
"openreplay/backend/pkg/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
var EmptyLabel = errors.New("empty label")
|
|
||||||
|
|
||||||
func getAutocompleteType(baseType string, platform string) string {
|
func getAutocompleteType(baseType string, platform string) string {
|
||||||
if platform == "web" {
|
if platform == "web" {
|
||||||
return baseType
|
return baseType
|
||||||
|
|
@ -141,17 +138,6 @@ func (conn *Conn) InsertWebClickEvent(sess *sessions.Session, e *messages.MouseC
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) InsertWebInputEvent(sess *sessions.Session, e *messages.InputEvent) error {
|
|
||||||
if e.Label == "" {
|
|
||||||
return EmptyLabel
|
|
||||||
}
|
|
||||||
if err := conn.bulks.Get("webInputEvents").Append(sess.SessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label); err != nil {
|
|
||||||
log.Printf("insert web input event err: %s", err)
|
|
||||||
}
|
|
||||||
conn.InsertAutocompleteValue(sess.SessionID, sess.ProjectID, "INPUT", e.Label)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conn *Conn) InsertInputChangeEvent(sess *sessions.Session, e *messages.InputChange) error {
|
func (conn *Conn) InsertInputChangeEvent(sess *sessions.Session, e *messages.InputChange) error {
|
||||||
if e.Label == "" {
|
if e.Label == "" {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -1,77 +0,0 @@
|
||||||
package custom
|
|
||||||
|
|
||||||
import (
|
|
||||||
. "openreplay/backend/pkg/messages"
|
|
||||||
)
|
|
||||||
|
|
||||||
const InputEventTimeout = 1 * 60 * 1000
|
|
||||||
|
|
||||||
type inputLabels map[uint64]string
|
|
||||||
|
|
||||||
type inputEventBuilder struct {
|
|
||||||
inputEvent *InputEvent
|
|
||||||
inputLabels inputLabels
|
|
||||||
inputID uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewInputEventBuilder() *inputEventBuilder {
|
|
||||||
ieBuilder := &inputEventBuilder{}
|
|
||||||
ieBuilder.clearLabels()
|
|
||||||
return ieBuilder
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *inputEventBuilder) clearLabels() {
|
|
||||||
b.inputLabels = make(inputLabels)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *inputEventBuilder) Handle(message Message, timestamp uint64) Message {
|
|
||||||
var inputEvent Message = nil
|
|
||||||
switch msg := message.(type) {
|
|
||||||
case *SetInputTarget:
|
|
||||||
if b.inputID != msg.ID {
|
|
||||||
inputEvent = b.Build()
|
|
||||||
b.inputID = msg.ID
|
|
||||||
}
|
|
||||||
b.inputLabels[msg.ID] = msg.Label
|
|
||||||
return inputEvent
|
|
||||||
case *SetInputValue:
|
|
||||||
if b.inputID != msg.ID {
|
|
||||||
inputEvent = b.Build()
|
|
||||||
b.inputID = msg.ID
|
|
||||||
}
|
|
||||||
if b.inputEvent == nil {
|
|
||||||
b.inputEvent = &InputEvent{
|
|
||||||
MessageID: message.MsgID(),
|
|
||||||
Timestamp: timestamp,
|
|
||||||
Value: msg.Value,
|
|
||||||
ValueMasked: msg.Mask > 0,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
b.inputEvent.Value = msg.Value
|
|
||||||
b.inputEvent.ValueMasked = msg.Mask > 0
|
|
||||||
}
|
|
||||||
return inputEvent
|
|
||||||
case *CreateDocument:
|
|
||||||
inputEvent = b.Build()
|
|
||||||
b.clearLabels()
|
|
||||||
return inputEvent
|
|
||||||
case *MouseClick:
|
|
||||||
return b.Build()
|
|
||||||
}
|
|
||||||
|
|
||||||
if b.inputEvent != nil && b.inputEvent.Timestamp+InputEventTimeout < timestamp {
|
|
||||||
return b.Build()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *inputEventBuilder) Build() Message {
|
|
||||||
if b.inputEvent == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
inputEvent := b.inputEvent
|
|
||||||
inputEvent.Label = b.inputLabels[b.inputID] // might be empty string
|
|
||||||
|
|
||||||
b.inputEvent = nil
|
|
||||||
return inputEvent
|
|
||||||
}
|
|
||||||
|
|
@ -50,8 +50,6 @@ func (s *saverImpl) handleExtraMessage(msg messages.Message) error {
|
||||||
return s.ch.InsertWebPerformanceTrackAggr(session, m)
|
return s.ch.InsertWebPerformanceTrackAggr(session, m)
|
||||||
case *messages.MouseClick:
|
case *messages.MouseClick:
|
||||||
return s.ch.InsertWebClickEvent(session, m)
|
return s.ch.InsertWebClickEvent(session, m)
|
||||||
case *messages.InputEvent:
|
|
||||||
return s.ch.InsertWebInputEvent(session, m)
|
|
||||||
// Unique for Web
|
// Unique for Web
|
||||||
case *messages.PageEvent:
|
case *messages.PageEvent:
|
||||||
return s.ch.InsertWebPageEvent(session, m)
|
return s.ch.InsertWebPageEvent(session, m)
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ package clickhouse
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
|
||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
"log"
|
"log"
|
||||||
"openreplay/backend/pkg/db/types"
|
"openreplay/backend/pkg/db/types"
|
||||||
|
|
@ -26,7 +25,6 @@ type Connector interface {
|
||||||
InsertWebResourceEvent(session *sessions.Session, msg *messages.ResourceTiming) error
|
InsertWebResourceEvent(session *sessions.Session, msg *messages.ResourceTiming) error
|
||||||
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
|
InsertWebPageEvent(session *sessions.Session, msg *messages.PageEvent) error
|
||||||
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
|
InsertWebClickEvent(session *sessions.Session, msg *messages.MouseClick) error
|
||||||
InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error
|
|
||||||
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
|
InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error
|
||||||
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
|
InsertWebPerformanceTrackAggr(session *sessions.Session, msg *messages.PerformanceTrackAggr) error
|
||||||
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
|
InsertAutocomplete(session *sessions.Session, msgType, msgValue string) error
|
||||||
|
|
@ -377,26 +375,6 @@ func (c *connectorImpl) InsertWebClickEvent(session *sessions.Session, msg *mess
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *connectorImpl) InsertWebInputEvent(session *sessions.Session, msg *messages.InputEvent) error {
|
|
||||||
if msg.Label == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := c.batches["inputs"].Append(
|
|
||||||
session.SessionID,
|
|
||||||
uint16(session.ProjectID),
|
|
||||||
msg.MessageID,
|
|
||||||
datetime(msg.Timestamp),
|
|
||||||
msg.Label,
|
|
||||||
"INPUT",
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
); err != nil {
|
|
||||||
c.checkError("inputs", err)
|
|
||||||
return fmt.Errorf("can't append to inputs batch: %s", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
|
func (c *connectorImpl) InsertWebErrorEvent(session *sessions.Session, msg *types.ErrorEvent) error {
|
||||||
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
|
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
|
||||||
for k, v := range msg.Tags {
|
for k, v := range msg.Tags {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue