Tag and watch backend support (#1838)
* feat(ui/tracker): start tag n watch * fix(tracker): test coverage, fix some watcher api * fix(tracker): add intersectionobserver, adjust tests * feat(tracker): relay + apollo plugins * feat(ui): tags search * feat(ui): tags name edit * feat(ui): tags search icon * feat(ui): icons for tabs in player * feat(ui): save and find button * feat(tracker): save tags in session storage (just in case) * feat(ui): improve loading * feat(ui): fix icon names gen * feat(ui): fix typo * feat(backend): added TagAndWatch backend support * feat(backend): use msgIndex as a seqIndex * feat(backend): try to use truncSqlIdx for seq_index * feat(backend): added tags cache and tags filter to DB service * feat(backend): removed debug logs --------- Co-authored-by: nick-delirium <nikita@openreplay.com>
This commit is contained in:
parent
133cbc049d
commit
622b189d39
10 changed files with 161 additions and 3 deletions
|
|
@ -15,6 +15,7 @@ import (
|
|||
"openreplay/backend/pkg/projects"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/sessions"
|
||||
"openreplay/backend/pkg/tags"
|
||||
"openreplay/backend/pkg/terminator"
|
||||
)
|
||||
|
||||
|
|
@ -47,9 +48,10 @@ func main() {
|
|||
|
||||
projManager := projects.New(pgConn, redisClient)
|
||||
sessManager := sessions.New(pgConn, projManager, redisClient)
|
||||
tagsManager := tags.New(pgConn)
|
||||
|
||||
// Init data saver
|
||||
saver := datasaver.New(cfg, pg, sessManager)
|
||||
saver := datasaver.New(cfg, pg, sessManager, tagsManager)
|
||||
|
||||
// Message filter
|
||||
msgFilter := []int{
|
||||
|
|
@ -60,7 +62,7 @@ func main() {
|
|||
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction, messages.MsgMouseClick,
|
||||
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
|
||||
messages.MsgPageEvent, messages.MsgMouseThrashing, messages.MsgInputChange,
|
||||
messages.MsgUnbindNodes, messages.MsgCanvasNode,
|
||||
messages.MsgUnbindNodes, messages.MsgCanvasNode, messages.MsgTagTrigger,
|
||||
// Mobile messages
|
||||
messages.MsgIOSSessionStart, messages.MsgIOSSessionEnd, messages.MsgIOSUserID, messages.MsgIOSUserAnonymousID,
|
||||
messages.MsgIOSMetadata, messages.MsgIOSEvent, messages.MsgIOSNetworkCall,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package datasaver
|
|||
|
||||
import (
|
||||
"log"
|
||||
"openreplay/backend/pkg/tags"
|
||||
|
||||
"openreplay/backend/internal/config/db"
|
||||
"openreplay/backend/pkg/db/clickhouse"
|
||||
|
|
@ -24,13 +25,15 @@ type saverImpl struct {
|
|||
sessions sessions.Sessions
|
||||
ch clickhouse.Connector
|
||||
producer queue.Producer
|
||||
tags tags.Tags
|
||||
}
|
||||
|
||||
func New(cfg *db.Config, pg *postgres.Conn, session sessions.Sessions) Saver {
|
||||
func New(cfg *db.Config, pg *postgres.Conn, session sessions.Sessions, tags tags.Tags) Saver {
|
||||
s := &saverImpl{
|
||||
cfg: cfg,
|
||||
pg: pg,
|
||||
sessions: session,
|
||||
tags: tags,
|
||||
}
|
||||
s.init()
|
||||
return s
|
||||
|
|
@ -126,6 +129,11 @@ func (s *saverImpl) handleMessage(msg Message) error {
|
|||
case *Metadata:
|
||||
return s.sessions.UpdateMetadata(m.SessionID(), m.Key, m.Value)
|
||||
case *IssueEvent:
|
||||
if m.Type == "dead_click" || m.Type == "click_rage" {
|
||||
if s.tags.ShouldIgnoreTag(session.ProjectID, m.Context) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err = s.pg.InsertIssueEvent(session, m)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -194,6 +202,10 @@ func (s *saverImpl) handleMessage(msg Message) error {
|
|||
if err = s.pg.InsertCanvasNode(session, m); err != nil {
|
||||
return err
|
||||
}
|
||||
case *TagTrigger:
|
||||
if err = s.pg.InsertTagTrigger(session, m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -588,3 +588,31 @@ func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request
|
|||
}
|
||||
ResponseOK(w, startTime, r.URL.Path, 0)
|
||||
}
|
||||
|
||||
func (e *Router) getTags(w http.ResponseWriter, r *http.Request) {
|
||||
startTime := time.Now()
|
||||
bodySize := 0
|
||||
|
||||
// Check authorization
|
||||
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize)
|
||||
return
|
||||
}
|
||||
sessInfo, err := e.services.Sessions.Get(sessionData.ID)
|
||||
if err != nil {
|
||||
ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize)
|
||||
return
|
||||
}
|
||||
|
||||
// Get tags
|
||||
tags, err := e.services.Tags.Get(sessInfo.ProjectID)
|
||||
if err != nil {
|
||||
ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
|
||||
return
|
||||
}
|
||||
type UrlResponse struct {
|
||||
Tags interface{} `json:"tags"`
|
||||
}
|
||||
ResponseWithJSON(w, &UrlResponse{Tags: tags}, startTime, r.URL.Path, bodySize)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,6 +116,7 @@ func (e *Router) init() {
|
|||
getHandlers := map[string]func(http.ResponseWriter, *http.Request){
|
||||
"/v1/web/uxt/test/{id}": e.getUXTestInfo,
|
||||
"/v1/web/uxt/upload-url": e.getUXUploadUrl,
|
||||
"/v1/web/tags": e.getTags,
|
||||
}
|
||||
prefix := "/ingest"
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import (
|
|||
"openreplay/backend/pkg/projects"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/sessions"
|
||||
"openreplay/backend/pkg/tags"
|
||||
"openreplay/backend/pkg/token"
|
||||
"openreplay/backend/pkg/uxtesting"
|
||||
)
|
||||
|
|
@ -29,6 +30,7 @@ type ServicesBuilder struct {
|
|||
Tokenizer *token.Tokenizer
|
||||
ObjStorage objectstorage.ObjectStorage
|
||||
UXTesting uxtesting.UXTesting
|
||||
Tags tags.Tags
|
||||
}
|
||||
|
||||
func New(cfg *http.Config, producer types.Producer, pgconn pool.Pool, redis *redis.Client) (*ServicesBuilder, error) {
|
||||
|
|
@ -49,5 +51,6 @@ func New(cfg *http.Config, producer types.Producer, pgconn pool.Pool, redis *red
|
|||
Flaker: flakeid.NewFlaker(cfg.WorkerID),
|
||||
ObjStorage: objStore,
|
||||
UXTesting: uxtesting.New(pgconn),
|
||||
Tags: tags.New(pgconn),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ type BulkSet struct {
|
|||
webClickEvents Bulk
|
||||
webNetworkRequest Bulk
|
||||
webCanvasNodes Bulk
|
||||
webTagTriggers Bulk
|
||||
workerTask chan *bulksTask
|
||||
done chan struct{}
|
||||
finished chan struct{}
|
||||
|
|
@ -79,6 +80,8 @@ func (conn *BulkSet) Get(name string) Bulk {
|
|||
return conn.webNetworkRequest
|
||||
case "canvasNodes":
|
||||
return conn.webCanvasNodes
|
||||
case "tagTriggers":
|
||||
return conn.webTagTriggers
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
|
@ -207,6 +210,11 @@ func (conn *BulkSet) initBulks() {
|
|||
"(session_id, recording_id, timestamp)",
|
||||
"($%d, $%d, $%d)",
|
||||
3, 200)
|
||||
conn.webTagTriggers, err = NewBulk(conn.c,
|
||||
"events.tags",
|
||||
"(session_id, timestamp, seq_index, tag_id)",
|
||||
"($%d, $%d, $%d, $%d)",
|
||||
4, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webCanvasNodes bulk: %s", err)
|
||||
}
|
||||
|
|
@ -231,6 +239,7 @@ func (conn *BulkSet) Send() {
|
|||
newTask.bulks = append(newTask.bulks, conn.webClickEvents)
|
||||
newTask.bulks = append(newTask.bulks, conn.webNetworkRequest)
|
||||
newTask.bulks = append(newTask.bulks, conn.webCanvasNodes)
|
||||
newTask.bulks = append(newTask.bulks, conn.webTagTriggers)
|
||||
|
||||
conn.workerTask <- newTask
|
||||
|
||||
|
|
|
|||
|
|
@ -219,6 +219,13 @@ func (conn *Conn) InsertCanvasNode(sess *sessions.Session, m *messages.CanvasNod
|
|||
return nil
|
||||
}
|
||||
|
||||
// InsertTagTrigger buffers a TagTrigger event into the "tagTriggers" bulk
// (events.tags) for batched insertion. The message index is truncated via
// truncSqIdx so it fits the seq_index column.
//
// NOTE(review): an Append failure is logged and swallowed — the method
// always returns nil, so callers treat the insert as best-effort. Confirm
// this is intentional; sibling insert helpers appear to follow the same
// bulk-append pattern.
func (conn *Conn) InsertTagTrigger(sess *sessions.Session, m *messages.TagTrigger) error {
	if err := conn.bulks.Get("tagTriggers").Append(sess.SessionID, m.Timestamp, truncSqIdx(m.Index), m.TagId); err != nil {
		log.Printf("insert tag trigger %d to db, err: %s", m.TagId, err)
	}
	return nil
}
|
||||
|
||||
func (conn *Conn) InsertWebStatsPerformance(p *messages.PerformanceTrackAggr) error {
|
||||
sessionID := p.SessionID()
|
||||
timestamp := (p.TimestampEnd + p.TimestampStart) / 2
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ const MinClicksInARow = 3
|
|||
type ClickRageDetector struct {
|
||||
lastTimestamp uint64
|
||||
lastLabel string
|
||||
lastSelector string
|
||||
firstInARawTimestamp uint64
|
||||
firstInARawMessageId uint64
|
||||
countsInARow int
|
||||
|
|
@ -22,6 +23,7 @@ type ClickRageDetector struct {
|
|||
func (crd *ClickRageDetector) reset() {
|
||||
crd.lastTimestamp = 0
|
||||
crd.lastLabel = ""
|
||||
crd.lastSelector = ""
|
||||
crd.firstInARawTimestamp = 0
|
||||
crd.firstInARawMessageId = 0
|
||||
crd.countsInARow = 0
|
||||
|
|
@ -49,6 +51,7 @@ func (crd *ClickRageDetector) Build() Message {
|
|||
Timestamp: crd.firstInARawTimestamp,
|
||||
MessageID: crd.firstInARawMessageId,
|
||||
URL: crd.url,
|
||||
Context: crd.lastSelector, // hack to pass selector to db (tags filter)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -74,6 +77,7 @@ func (crd *ClickRageDetector) Handle(message Message, timestamp uint64) Message
|
|||
// Use current message as init values for new event
|
||||
crd.lastTimestamp = timestamp
|
||||
crd.lastLabel = msg.Label
|
||||
crd.lastSelector = msg.Selector
|
||||
crd.firstInARawTimestamp = timestamp
|
||||
crd.firstInARawMessageId = message.MsgID()
|
||||
crd.countsInARow = 1
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ func (d *DeadClickDetector) Build() Message {
|
|||
ContextString: d.lastMouseClick.Label,
|
||||
Timestamp: d.lastClickTimestamp,
|
||||
MessageID: d.lastMessageID,
|
||||
Context: d.lastMouseClick.Selector, // hack to pass selector to db (tags filter)
|
||||
}
|
||||
return event
|
||||
}
|
||||
|
|
|
|||
91
backend/pkg/tags/tags.go
Normal file
91
backend/pkg/tags/tags.go
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
package tags
|
||||
|
||||
import (
|
||||
"log"
|
||||
"openreplay/backend/pkg/cache"
|
||||
"openreplay/backend/pkg/db/postgres/pool"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Tag is a project-level tag definition: a CSS selector plus flags that
// control whether click-rage / dead-click issues matching the selector
// should be suppressed. JSON field names are abbreviated for the ingest API.
type Tag struct {
	ID              int    `json:"id"`
	Selector        string `json:"selector"`
	IgnoreClickRage bool   `json:"icr"`
	IgnoreDeadClick bool   `json:"idc"`
}
|
||||
|
||||
// Tags provides access to a project's tag definitions.
type Tags interface {
	// Get returns all non-deleted tags for the project.
	Get(projectID uint32) ([]Tag, error)
	// ShouldIgnoreTag reports whether an event with the given selector
	// should be dropped because a matching tag exists for the project.
	ShouldIgnoreTag(projectID uint32, selector string) bool
}
|
||||
|
||||
// tagsImpl implements Tags on top of Postgres with an in-memory,
// per-project cache of tag lists.
type tagsImpl struct {
	db    pool.Pool   // Postgres connection pool
	cache cache.Cache // projectID -> []Tag
}
|
||||
|
||||
func New(db pool.Pool) Tags {
|
||||
return &tagsImpl{
|
||||
db: db,
|
||||
cache: cache.New(time.Minute*5, time.Minute*10),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tagsImpl) Get(projectID uint32) ([]Tag, error) {
|
||||
rows, err := t.db.Query(`
|
||||
SELECT tag_id, selector, ignore_click_rage, ignore_dead_click
|
||||
FROM tags WHERE project_id = $1 AND deleted_at IS NULL`, projectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var (
|
||||
tags []Tag
|
||||
id int
|
||||
selector string
|
||||
ignoreClickRage bool
|
||||
ignoreDeadClick bool
|
||||
)
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&id, &selector, &ignoreClickRage, &ignoreDeadClick); err != nil {
|
||||
log.Printf("can't scan tag: %s", err)
|
||||
continue
|
||||
}
|
||||
tags = append(tags, Tag{
|
||||
ID: id,
|
||||
Selector: selector,
|
||||
IgnoreClickRage: ignoreClickRage,
|
||||
IgnoreDeadClick: ignoreDeadClick,
|
||||
})
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (t *tagsImpl) ShouldIgnoreTag(projectID uint32, selector string) bool {
|
||||
var (
|
||||
tags []Tag
|
||||
err error
|
||||
needToUpdateCache bool
|
||||
)
|
||||
tagsData, ok := t.cache.Get(projectID)
|
||||
if !ok {
|
||||
// Try to load from DB and update cache
|
||||
tagsData, err = t.Get(projectID)
|
||||
if err != nil {
|
||||
log.Printf("can't get tags info: %s", err)
|
||||
return false
|
||||
}
|
||||
needToUpdateCache = true
|
||||
}
|
||||
tags = tagsData.([]Tag)
|
||||
if needToUpdateCache {
|
||||
t.cache.Set(projectID, tags)
|
||||
}
|
||||
for _, tag := range tags {
|
||||
if tag.Selector == selector {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue