[DB] moved click events to bulks (#892)
* feat(backend): moved click events to bulks * feat(backend): insert click event with host + path
This commit is contained in:
parent
205fdcba42
commit
f1d852abb4
5 changed files with 93 additions and 73 deletions
|
|
@ -41,6 +41,8 @@ type Conn struct {
|
|||
webIssues Bulk
|
||||
webIssueEvents Bulk
|
||||
webCustomEvents Bulk
|
||||
webClickEvents Bulk
|
||||
webNetworkRequest Bulk
|
||||
sessionUpdates map[uint64]*sessionUpdates
|
||||
batchQueueLimit int
|
||||
batchSizeLimit int
|
||||
|
|
@ -111,25 +113,25 @@ func (conn *Conn) initBulks() {
|
|||
"autocomplete",
|
||||
"(value, type, project_id)",
|
||||
"($%d, $%d, $%d)",
|
||||
3, 100)
|
||||
3, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create autocomplete bulk")
|
||||
log.Fatalf("can't create autocomplete bulk: %s", err)
|
||||
}
|
||||
conn.requests, err = NewBulk(conn.c,
|
||||
"events_common.requests",
|
||||
"(session_id, timestamp, seq_index, url, duration, success)",
|
||||
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
|
||||
6, 100)
|
||||
6, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create requests bulk")
|
||||
log.Fatalf("can't create requests bulk: %s", err)
|
||||
}
|
||||
conn.customEvents, err = NewBulk(conn.c,
|
||||
"events_common.customs",
|
||||
"(session_id, timestamp, seq_index, name, payload)",
|
||||
"($%d, $%d, $%d, left($%d, 2700), $%d)",
|
||||
5, 100)
|
||||
5, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create customEvents bulk")
|
||||
log.Fatalf("can't create customEvents bulk: %s", err)
|
||||
}
|
||||
conn.webPageEvents, err = NewBulk(conn.c,
|
||||
"events.pages",
|
||||
|
|
@ -138,73 +140,89 @@ func (conn *Conn) initBulks() {
|
|||
"time_to_interactive, response_time, dom_building_time)",
|
||||
"($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+
|
||||
" NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))",
|
||||
18, 100)
|
||||
18, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webPageEvents bulk")
|
||||
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
||||
}
|
||||
conn.webInputEvents, err = NewBulk(conn.c,
|
||||
"events.inputs",
|
||||
"(session_id, message_id, timestamp, value, label)",
|
||||
"($%d, $%d, $%d, $%d, NULLIF($%d,''))",
|
||||
5, 100)
|
||||
5, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webPageEvents bulk")
|
||||
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
||||
}
|
||||
conn.webGraphQL, err = NewBulk(conn.c,
|
||||
"events.graphql",
|
||||
"(session_id, timestamp, message_id, name, request_body, response_body)",
|
||||
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
|
||||
6, 100)
|
||||
6, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webPageEvents bulk")
|
||||
log.Fatalf("can't create webPageEvents bulk: %s", err)
|
||||
}
|
||||
conn.webErrors, err = NewBulk(conn.c,
|
||||
"errors",
|
||||
"(error_id, project_id, source, name, message, payload)",
|
||||
"($%d, $%d, $%d, $%d, $%d, $%d::jsonb)",
|
||||
6, 100)
|
||||
6, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webErrors bulk")
|
||||
log.Fatalf("can't create webErrors bulk: %s", err)
|
||||
}
|
||||
conn.webErrorEvents, err = NewBulk(conn.c,
|
||||
"events.errors",
|
||||
"(session_id, message_id, timestamp, error_id)",
|
||||
"($%d, $%d, $%d, $%d)",
|
||||
4, 100)
|
||||
4, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webErrorEvents bulk")
|
||||
log.Fatalf("can't create webErrorEvents bulk: %s", err)
|
||||
}
|
||||
conn.webErrorTags, err = NewBulk(conn.c,
|
||||
"public.errors_tags",
|
||||
"(session_id, message_id, error_id, key, value)",
|
||||
"($%d, $%d, $%d, $%d, $%d)",
|
||||
5, 100)
|
||||
5, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webErrorEvents bulk")
|
||||
log.Fatalf("can't create webErrorEvents bulk: %s", err)
|
||||
}
|
||||
conn.webIssues, err = NewBulk(conn.c,
|
||||
"issues",
|
||||
"(project_id, issue_id, type, context_string)",
|
||||
"($%d, $%d, $%d, $%d)",
|
||||
4, 100)
|
||||
4, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webIssues bulk")
|
||||
log.Fatalf("can't create webIssues bulk: %s", err)
|
||||
}
|
||||
conn.webIssueEvents, err = NewBulk(conn.c,
|
||||
"events_common.issues",
|
||||
"(session_id, issue_id, timestamp, seq_index, payload)",
|
||||
"($%d, $%d, $%d, $%d, CAST($%d AS jsonb))",
|
||||
5, 100)
|
||||
5, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webIssueEvents bulk")
|
||||
log.Fatalf("can't create webIssueEvents bulk: %s", err)
|
||||
}
|
||||
conn.webCustomEvents, err = NewBulk(conn.c,
|
||||
"events_common.customs",
|
||||
"(session_id, seq_index, timestamp, name, payload, level)",
|
||||
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
|
||||
6, 100)
|
||||
6, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webCustomEvents bulk")
|
||||
log.Fatalf("can't create webCustomEvents bulk: %s", err)
|
||||
}
|
||||
conn.webClickEvents, err = NewBulk(conn.c,
|
||||
"events.clicks",
|
||||
"(session_id, message_id, timestamp, label, selector, url, path)",
|
||||
"($%d, $%d, $%d, NULLIF($%d, ''), $%d, $%d, $%d)",
|
||||
7, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webClickEvents bulk: %s", err)
|
||||
}
|
||||
conn.webNetworkRequest, err = NewBulk(conn.c,
|
||||
"events_common.requests",
|
||||
"(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)",
|
||||
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d, $%d, $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)",
|
||||
13, 200)
|
||||
if err != nil {
|
||||
log.Fatalf("can't create webNetworkRequest bulk: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -296,6 +314,12 @@ func (conn *Conn) sendBulks() {
|
|||
if err := conn.webCustomEvents.Send(); err != nil {
|
||||
log.Printf("webCustomEvents bulk send err: %s", err)
|
||||
}
|
||||
if err := conn.webClickEvents.Send(); err != nil {
|
||||
log.Printf("webClickEvents bulk send err: %s", err)
|
||||
}
|
||||
if err := conn.webNetworkRequest.Send(); err != nil {
|
||||
log.Printf("webNetworkRequest bulk send err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) CommitBatches() {
|
||||
|
|
|
|||
|
|
@ -58,16 +58,12 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page
|
|||
}
|
||||
|
||||
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error {
|
||||
sqlRequest := `
|
||||
INSERT INTO events.clicks
|
||||
(session_id, message_id, timestamp, label, selector, url)
|
||||
(SELECT
|
||||
$1, $2, $3, NULLIF($4, ''), $5, host || path
|
||||
FROM events.pages
|
||||
WHERE session_id = $1 AND timestamp <= $3 ORDER BY timestamp DESC LIMIT 1
|
||||
)
|
||||
`
|
||||
conn.batchQueue(sessionID, sqlRequest, sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector)
|
||||
var host, path string
|
||||
host, path, _, _ = url.GetURLParts(e.Url)
|
||||
log.Println("insert web click:", host, path)
|
||||
if err := conn.webClickEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil {
|
||||
log.Printf("insert web click err: %s", err)
|
||||
}
|
||||
// Accumulate session updates and exec inside batch with another sql commands
|
||||
conn.updateSessionEvents(sessionID, 1, 0)
|
||||
// Add new value set to autocomplete bulk
|
||||
|
|
@ -119,29 +115,8 @@ func (conn *Conn) InsertWebNetworkRequest(sessionID uint64, projectID uint32, sa
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sqlRequest := `
|
||||
INSERT INTO events_common.requests (
|
||||
session_id, timestamp, seq_index,
|
||||
url, host, path, query,
|
||||
request_body, response_body, status_code, method,
|
||||
duration, success
|
||||
) VALUES (
|
||||
$1, $2, $3,
|
||||
left($4, 2700), $5, $6, $7,
|
||||
$8, $9, $10::smallint, NULLIF($11, '')::http_method,
|
||||
$12, $13
|
||||
) ON CONFLICT DO NOTHING`
|
||||
conn.batchQueue(sessionID, sqlRequest,
|
||||
sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index),
|
||||
e.URL, host, path, query,
|
||||
request, response, e.Status, url.EnsureMethod(e.Method),
|
||||
e.Duration, e.Status < 400,
|
||||
)
|
||||
|
||||
// Record approximate message size
|
||||
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+
|
||||
len(e.Request)+len(e.Response)+len(url.EnsureMethod(e.Method))+8*5+1)
|
||||
conn.webNetworkRequest.Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query,
|
||||
request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,20 +19,6 @@ func (f *NetworkIssueDetector) Build() Message {
|
|||
|
||||
func (f *NetworkIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
|
||||
switch msg := message.(type) {
|
||||
// case *ResourceTiming:
|
||||
// success := msg.Duration != 0 // The only available way here
|
||||
// if !success {
|
||||
// issueType := "missing_resource"
|
||||
// if msg.Initiator == "fetch" || msg.Initiator == "xmlhttprequest" {
|
||||
// issueType = "bad_request"
|
||||
// }
|
||||
// return &IssueEvent{
|
||||
// Type: issueType,
|
||||
// MessageID: messageID,
|
||||
// Timestamp: msg.Timestamp,
|
||||
// ContextString: msg.URL,
|
||||
// }
|
||||
// }
|
||||
case *NetworkRequest:
|
||||
if msg.Status >= 400 {
|
||||
return &IssueEvent{
|
||||
|
|
|
|||
22
backend/pkg/messages/cache.go
Normal file
22
backend/pkg/messages/cache.go
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
package messages
|
||||
|
||||
type pageLocations struct {
|
||||
urls map[uint64]string
|
||||
}
|
||||
|
||||
func NewPageLocations() *pageLocations {
|
||||
return &pageLocations{urls: make(map[uint64]string)}
|
||||
}
|
||||
|
||||
func (p *pageLocations) Set(sessID uint64, url string) {
|
||||
p.urls[sessID] = url
|
||||
}
|
||||
|
||||
func (p *pageLocations) Get(sessID uint64) string {
|
||||
url := p.urls[sessID]
|
||||
return url
|
||||
}
|
||||
|
||||
func (p *pageLocations) Delete(sessID uint64) {
|
||||
delete(p.urls, sessID)
|
||||
}
|
||||
|
|
@ -24,10 +24,15 @@ type messageIteratorImpl struct {
|
|||
broken bool
|
||||
messageInfo *message
|
||||
batchInfo *BatchInfo
|
||||
urls *pageLocations
|
||||
}
|
||||
|
||||
func NewMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
|
||||
iter := &messageIteratorImpl{handler: messageHandler, autoDecode: autoDecode}
|
||||
iter := &messageIteratorImpl{
|
||||
handler: messageHandler,
|
||||
autoDecode: autoDecode,
|
||||
urls: NewPageLocations(),
|
||||
}
|
||||
if len(messageFilter) != 0 {
|
||||
filter := make(map[int]struct{}, len(messageFilter))
|
||||
for _, msgType := range messageFilter {
|
||||
|
|
@ -125,7 +130,7 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
|||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMetadata")
|
||||
}
|
||||
i.messageInfo.Url = m.Url
|
||||
i.messageInfo.Url = m.Location
|
||||
i.version = m.Version
|
||||
i.batchInfo.version = m.Version
|
||||
|
||||
|
|
@ -138,6 +143,10 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
|||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("BatchMeta")
|
||||
}
|
||||
// Try to get saved session's page url
|
||||
if savedURL := i.urls.Get(i.messageInfo.batch.sessionID); savedURL != "" {
|
||||
i.messageInfo.Url = savedURL
|
||||
}
|
||||
|
||||
case *Timestamp:
|
||||
i.messageInfo.Timestamp = int64(m.Timestamp)
|
||||
|
|
@ -158,9 +167,13 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error {
|
|||
if m.Timestamp == 0 {
|
||||
i.zeroTsLog("SessionEnd")
|
||||
}
|
||||
// Delete session from urls cache layer
|
||||
i.urls.Delete(i.messageInfo.batch.sessionID)
|
||||
|
||||
case *SetPageLocation:
|
||||
i.messageInfo.Url = m.URL
|
||||
// Save session page url in cache for using in next batches
|
||||
i.urls.Set(i.messageInfo.batch.sessionID, m.URL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue