feat(backend): insert error tags+ (#768)
* feat(backend): transform legacy messages * refactor(backend/postgres):remove seqIndex transformation * feat(backend/db): parse & insert errors_tags * chore(backend): remove building folder after build * fix(backend/db):remove message types from filter that do not go through kafka * fix(backend/assets):use JSException instead of ErrorEvent * refactor(backend/db):ErrorEvent is no longer a message * feat(backend/db):insert error tags in clickhouse * refactor(backend):remove unused LongTask code
This commit is contained in:
parent
8343274cd3
commit
974afa454b
26 changed files with 2769 additions and 2874 deletions
|
|
@ -38,6 +38,8 @@ function build_api(){
|
|||
}
|
||||
[[ $2 != "" ]] && {
|
||||
build_service $2
|
||||
cd ../backend
|
||||
rm -rf ../_backend
|
||||
return
|
||||
}
|
||||
for image in $(ls cmd);
|
||||
|
|
|
|||
|
|
@ -35,10 +35,8 @@ func main() {
|
|||
case *messages.AssetCache:
|
||||
cacher.CacheURL(m.SessionID(), m.URL)
|
||||
totalAssets.Add(context.Background(), 1)
|
||||
case *messages.ErrorEvent:
|
||||
if m.Source != "js_exception" {
|
||||
return
|
||||
}
|
||||
// TODO: connect to "raw" topic in order to listen for JSException
|
||||
case *messages.JSException:
|
||||
sourceList, err := assets.ExtractJSExceptionSources(&m.Payload)
|
||||
if err != nil {
|
||||
log.Printf("Error on source extraction: %v", err)
|
||||
|
|
@ -53,7 +51,7 @@ func main() {
|
|||
msgConsumer := queue.NewConsumer(
|
||||
cfg.GroupCache,
|
||||
[]string{cfg.TopicCache},
|
||||
messages.NewMessageIterator(msgHandler, []int{messages.MsgAssetCache, messages.MsgErrorEvent}, true),
|
||||
messages.NewMessageIterator(msgHandler, []int{messages.MsgAssetCache, messages.MsgJSException}, true),
|
||||
true,
|
||||
cfg.MessageSizeLimit,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -57,10 +57,9 @@ func main() {
|
|||
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
|
||||
|
||||
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
|
||||
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgCustomEvent, messages.MsgClickEvent,
|
||||
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgErrorEvent, messages.MsgFetchEvent,
|
||||
messages.MsgGraphQLEvent, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
|
||||
messages.MsgResourceEvent, messages.MsgLongTask, messages.MsgJSException, messages.MsgResourceTiming,
|
||||
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgClickEvent,
|
||||
messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
|
||||
messages.MsgJSException, messages.MsgResourceTiming,
|
||||
messages.MsgRawCustomEvent, messages.MsgCustomIssue, messages.MsgFetch, messages.MsgGraphQL,
|
||||
messages.MsgStateAction, messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument,
|
||||
messages.MsgMouseClick, messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming}
|
||||
|
|
|
|||
|
|
@ -38,23 +38,16 @@ func (mi *Saver) InsertMessage(msg Message) error {
|
|||
case *PageEvent:
|
||||
mi.sendToFTS(msg, sessionID)
|
||||
return mi.pg.InsertWebPageEvent(sessionID, m)
|
||||
case *ErrorEvent:
|
||||
return mi.pg.InsertWebErrorEvent(sessionID, m)
|
||||
case *FetchEvent:
|
||||
mi.sendToFTS(msg, sessionID)
|
||||
return mi.pg.InsertWebFetchEvent(sessionID, m)
|
||||
case *GraphQLEvent:
|
||||
mi.sendToFTS(msg, sessionID)
|
||||
return mi.pg.InsertWebGraphQLEvent(sessionID, m)
|
||||
case *JSException:
|
||||
return mi.pg.InsertWebJSException(m)
|
||||
case *IntegrationEvent:
|
||||
return mi.pg.InsertWebErrorEvent(sessionID, &ErrorEvent{
|
||||
MessageID: m.Meta().Index,
|
||||
Timestamp: m.Timestamp,
|
||||
Source: m.Source,
|
||||
Name: m.Name,
|
||||
Message: m.Message,
|
||||
Payload: m.Payload,
|
||||
})
|
||||
return mi.pg.InsertWebIntegrationEvent(m)
|
||||
|
||||
// IOS
|
||||
case *IOSSessionStart:
|
||||
|
|
|
|||
|
|
@ -16,8 +16,6 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
|
|||
return si.pg.InsertWebStatsPerformance(session.SessionID, m)
|
||||
case *ResourceEvent:
|
||||
return si.pg.InsertWebStatsResourceEvent(session.SessionID, m)
|
||||
case *LongTask:
|
||||
return si.pg.InsertWebStatsLongtask(session.SessionID, m)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
6
backend/pkg/db/cache/messages-web.go
vendored
6
backend/pkg/db/cache/messages-web.go
vendored
|
|
@ -71,6 +71,12 @@ func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error {
|
|||
return c.HandleSessionEnd(sessionID)
|
||||
}
|
||||
|
||||
func (c *PGCache) InsertWebJSException(e *JSException) error {
|
||||
return c.InsertWebErrorEvent(e.SessionID(), WrapJSException(e))
|
||||
}
|
||||
func (c *PGCache) InsertWebIntegrationEvent(e *IntegrationEvent) error {
|
||||
return c.InsertWebErrorEvent(e.SessionID(), WrapIntegrationEvent(e))
|
||||
}
|
||||
func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
|
||||
session, err := c.GetSession(sessionID)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -103,14 +103,14 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
|
|||
}
|
||||
|
||||
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {
|
||||
if err := conn.requests.Append(sessionID, timestamp, getSqIdx(index), url, duration, success); err != nil {
|
||||
if err := conn.requests.Append(sessionID, timestamp, index, url, duration, success); err != nil {
|
||||
return fmt.Errorf("insert request in bulk err: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error {
|
||||
if err := conn.customEvents.Append(sessionID, timestamp, getSqIdx(index), name, payload); err != nil {
|
||||
if err := conn.customEvents.Append(sessionID, timestamp, index, name, payload); err != nil {
|
||||
return fmt.Errorf("insert custom event in bulk err: %s", err)
|
||||
}
|
||||
return nil
|
||||
|
|
@ -184,7 +184,7 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
|
|||
$1, $2, $3, $4, CAST($5 AS jsonb)
|
||||
)`,
|
||||
sessionID, issueID, e.Timestamp,
|
||||
getSqIdx(e.MessageID),
|
||||
e.MessageID,
|
||||
payload,
|
||||
); err != nil {
|
||||
return err
|
||||
|
|
@ -204,7 +204,7 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
|
|||
VALUES
|
||||
($1, $2, $3, left($4, 2700), $5, 'error')
|
||||
`,
|
||||
sessionID, getSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload,
|
||||
sessionID, e.MessageID, e.Timestamp, e.ContextString, e.Payload,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,6 @@ import (
|
|||
"openreplay/backend/pkg/url"
|
||||
)
|
||||
|
||||
func (conn *Conn) InsertWebStatsLongtask(sessionID uint64, l *LongTask) error {
|
||||
return nil // Do we even use them?
|
||||
// conn.exec(``);
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrackAggr) error {
|
||||
timestamp := (p.TimestampEnd + p.TimestampStart) / 2
|
||||
|
||||
|
|
|
|||
|
|
@ -2,18 +2,12 @@ package postgres
|
|||
|
||||
import (
|
||||
"log"
|
||||
"math"
|
||||
|
||||
"openreplay/backend/pkg/hashid"
|
||||
"openreplay/backend/pkg/db/types"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/url"
|
||||
)
|
||||
|
||||
// TODO: change messages and replace everywhere to e.Index
|
||||
func getSqIdx(messageID uint64) uint {
|
||||
return uint(messageID % math.MaxInt32)
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *CustomEvent) error {
|
||||
err := conn.InsertCustomEvent(sessionID, e.Timestamp,
|
||||
e.MessageID,
|
||||
|
|
@ -93,7 +87,7 @@ func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *Inp
|
|||
return nil
|
||||
}
|
||||
|
||||
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) (err error) {
|
||||
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *types.ErrorEvent) (err error) {
|
||||
tx, err := conn.c.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -105,7 +99,7 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
|
|||
}
|
||||
}
|
||||
}()
|
||||
errorID := hashid.WebErrorID(projectID, e)
|
||||
errorID := e.ID(projectID)
|
||||
|
||||
if err = tx.exec(`
|
||||
INSERT INTO errors
|
||||
|
|
@ -135,6 +129,18 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
|
|||
return err
|
||||
}
|
||||
err = tx.commit()
|
||||
|
||||
// Insert tags
|
||||
sqlRequest := `
|
||||
INSERT INTO public.errors_tags (
|
||||
session_id, message_id, error_id, key, value
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
) ON CONFLICT DO NOTHING`
|
||||
for key, value := range e.Tags {
|
||||
conn.batchQueue(sessionID, sqlRequest, sessionID, e.MessageID, errorID, key, value)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -163,7 +169,7 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, projectID uint32, savePa
|
|||
$12, $13
|
||||
) ON CONFLICT DO NOTHING`
|
||||
conn.batchQueue(sessionID, sqlRequest,
|
||||
sessionID, e.Timestamp, getSqIdx(e.MessageID),
|
||||
sessionID, e.Timestamp, e.MessageID,
|
||||
e.URL, host, path, query,
|
||||
request, response, e.Status, url.EnsureMethod(e.Method),
|
||||
e.Duration, e.Status < 400,
|
||||
|
|
|
|||
89
backend/pkg/db/types/error-event.go
Normal file
89
backend/pkg/db/types/error-event.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"hash/fnv"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
type ErrorEvent struct {
|
||||
MessageID uint64
|
||||
Timestamp uint64
|
||||
Source string
|
||||
Name string
|
||||
Message string
|
||||
Payload string
|
||||
Tags map[string]*string
|
||||
}
|
||||
|
||||
func unquote(s string) string {
|
||||
if s[0] == '"' {
|
||||
return s[1 : len(s)-1]
|
||||
}
|
||||
return s
|
||||
}
|
||||
func parseTags(tagsJSON string) (tags map[string]*string, err error) {
|
||||
if tagsJSON[0] == '[' {
|
||||
var tagsArr []json.RawMessage
|
||||
if err = json.Unmarshal([]byte(tagsJSON), &tagsArr); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
tags = make(map[string]*string)
|
||||
for _, keyBts := range tagsArr {
|
||||
tags[unquote(string(keyBts))] = nil
|
||||
}
|
||||
} else if tagsJSON[0] == '{' {
|
||||
var tagsObj map[string]json.RawMessage
|
||||
if err = json.Unmarshal([]byte(tagsJSON), &tagsObj); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
tags = make(map[string]*string)
|
||||
for key, valBts := range tagsObj {
|
||||
val := unquote(string(valBts))
|
||||
tags[key] = &val
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func WrapJSException(m *JSException) *ErrorEvent {
|
||||
meta, err := parseTags(m.Metadata)
|
||||
if err != nil {
|
||||
log.Printf("Error on parsing Exception metadata: %v", err)
|
||||
}
|
||||
return &ErrorEvent{
|
||||
MessageID: m.Meta().Index,
|
||||
Timestamp: uint64(m.Meta().Timestamp),
|
||||
Source: "js_exception",
|
||||
Name: m.Name,
|
||||
Message: m.Message,
|
||||
Payload: m.Payload,
|
||||
Tags: meta,
|
||||
}
|
||||
}
|
||||
|
||||
func WrapIntegrationEvent(m *IntegrationEvent) *ErrorEvent {
|
||||
return &ErrorEvent{
|
||||
MessageID: m.Meta().Index, // This will be always 0 here since it's coming from backend TODO: find another way to index
|
||||
Timestamp: m.Timestamp,
|
||||
Source: m.Source,
|
||||
Name: m.Name,
|
||||
Message: m.Message,
|
||||
Payload: m.Payload,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ErrorEvent) ID(projectID uint32) string {
|
||||
hash := fnv.New128a()
|
||||
hash.Write([]byte(e.Source))
|
||||
hash.Write([]byte(e.Name))
|
||||
hash.Write([]byte(e.Message))
|
||||
hash.Write([]byte(e.Payload))
|
||||
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
|
||||
}
|
||||
|
|
@ -56,15 +56,6 @@ func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64
|
|||
Selector: msg.Selector,
|
||||
}
|
||||
}
|
||||
case *JSException:
|
||||
return &ErrorEvent{
|
||||
MessageID: messageID,
|
||||
Timestamp: timestamp,
|
||||
Source: "js_exception",
|
||||
Name: msg.Name,
|
||||
Message: msg.Message,
|
||||
Payload: msg.Payload,
|
||||
}
|
||||
case *ResourceTiming:
|
||||
return &ResourceEvent{
|
||||
MessageID: messageID,
|
||||
|
|
|
|||
|
|
@ -23,12 +23,3 @@ func IOSCrashID(projectID uint32, crash *messages.IOSCrash) string {
|
|||
hash.Write([]byte(crash.Stacktrace))
|
||||
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
|
||||
}
|
||||
|
||||
func WebErrorID(projectID uint32, errorEvent *messages.ErrorEvent) string {
|
||||
hash := fnv.New128a()
|
||||
hash.Write([]byte(errorEvent.Source))
|
||||
hash.Write([]byte(errorEvent.Name))
|
||||
hash.Write([]byte(errorEvent.Message))
|
||||
hash.Write([]byte(errorEvent.Payload))
|
||||
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
package messages
|
||||
|
||||
func IsReplayerType(id int) bool {
|
||||
return 80 != id && 81 != id && 82 != id && 1 != id && 3 != id && 17 != id && 23 != id && 24 != id && 25 != id && 26 != id && 27 != id && 28 != id && 29 != id && 30 != id && 31 != id && 32 != id && 33 != id && 34 != id && 35 != id && 36 != id && 42 != id && 43 != id && 50 != id && 51 != id && 52 != id && 53 != id && 56 != id && 62 != id && 63 != id && 64 != id && 66 != id && 78 != id && 127 != id && 107 != id && 91 != id && 92 != id && 94 != id && 95 != id && 97 != id && 98 != id && 99 != id && 101 != id && 104 != id && 110 != id && 111 != id
|
||||
return 80 != id && 81 != id && 82 != id && 1 != id && 3 != id && 17 != id && 23 != id && 24 != id && 25 != id && 26 != id && 27 != id && 28 != id && 29 != id && 30 != id && 31 != id && 32 != id && 33 != id && 35 != id && 36 != id && 42 != id && 43 != id && 50 != id && 51 != id && 52 != id && 53 != id && 56 != id && 62 != id && 63 != id && 64 != id && 66 != id && 78 != id && 127 != id && 107 != id && 91 != id && 92 != id && 94 != id && 95 != id && 97 != id && 98 != id && 99 != id && 101 != id && 104 != id && 110 != id && 111 != id
|
||||
}
|
||||
|
||||
func IsIOSType(id int) bool {
|
||||
|
|
@ -11,4 +11,4 @@ func IsIOSType(id int) bool {
|
|||
|
||||
func IsDOMType(id int) bool {
|
||||
return 0 == id || 4 == id || 5 == id || 6 == id || 7 == id || 8 == id || 9 == id || 10 == id || 11 == id || 12 == id || 13 == id || 14 == id || 15 == id || 16 == id || 18 == id || 19 == id || 20 == id || 37 == id || 38 == id || 49 == id || 54 == id || 55 == id || 59 == id || 60 == id || 61 == id || 67 == id || 69 == id || 70 == id || 71 == id || 72 == id || 73 == id || 74 == id || 75 == id || 76 == id || 77 == id || 90 == id || 93 == id || 96 == id || 100 == id || 102 == id || 103 == id || 105 == id
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,6 @@ func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
|
|||
}
|
||||
return
|
||||
}
|
||||
msg = transformDeprecated(msg)
|
||||
}
|
||||
|
||||
// Preprocess "system" messages
|
||||
|
|
|
|||
|
|
@ -1,6 +1,14 @@
|
|||
package messages
|
||||
|
||||
func transformDeprecated(msg Message) Message {
|
||||
// transform legacy message here if needed
|
||||
switch m := msg.(type) {
|
||||
case *JSExceptionDeprecated:
|
||||
return &JSException{
|
||||
Name: m.Name,
|
||||
Message: m.Message,
|
||||
Payload: m.Payload,
|
||||
Metadata: "{}",
|
||||
}
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -56,6 +56,7 @@ func (m *RawMessage) Decode() Message {
|
|||
log.Printf("decode err: %s", err)
|
||||
return nil
|
||||
}
|
||||
msg = transformDeprecated(msg)
|
||||
msg.Meta().SetMeta(m.meta)
|
||||
return msg
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -3,19 +3,19 @@ package datasaver
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"openreplay/backend/pkg/messages"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
func (mi *Saver) InsertMessage(msg messages.Message) error {
|
||||
func (mi *Saver) InsertMessage(msg Message) error {
|
||||
sessionID := msg.SessionID()
|
||||
switch m := msg.(type) {
|
||||
// Common
|
||||
case *messages.Metadata:
|
||||
case *Metadata:
|
||||
if err := mi.pg.InsertMetadata(sessionID, m); err != nil {
|
||||
return fmt.Errorf("insert metadata err: %s", err)
|
||||
}
|
||||
return nil
|
||||
case *messages.IssueEvent:
|
||||
case *IssueEvent:
|
||||
session, err := mi.pg.GetSession(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("can't get session info for CH: %s", err)
|
||||
|
|
@ -28,15 +28,15 @@ func (mi *Saver) InsertMessage(msg messages.Message) error {
|
|||
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
|
||||
|
||||
// Web
|
||||
case *messages.SessionStart:
|
||||
case *SessionStart:
|
||||
return mi.pg.HandleWebSessionStart(sessionID, m)
|
||||
case *messages.SessionEnd:
|
||||
case *SessionEnd:
|
||||
return mi.pg.HandleWebSessionEnd(sessionID, m)
|
||||
case *messages.UserID:
|
||||
case *UserID:
|
||||
return mi.pg.InsertWebUserID(sessionID, m)
|
||||
case *messages.UserAnonymousID:
|
||||
case *UserAnonymousID:
|
||||
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
|
||||
case *messages.CustomEvent:
|
||||
case *CustomEvent:
|
||||
session, err := mi.pg.GetSession(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("can't get session info for CH: %s", err)
|
||||
|
|
@ -46,17 +46,19 @@ func (mi *Saver) InsertMessage(msg messages.Message) error {
|
|||
}
|
||||
}
|
||||
return mi.pg.InsertWebCustomEvent(sessionID, m)
|
||||
case *messages.ClickEvent:
|
||||
case *ClickEvent:
|
||||
return mi.pg.InsertWebClickEvent(sessionID, m)
|
||||
case *messages.InputEvent:
|
||||
case *InputEvent:
|
||||
return mi.pg.InsertWebInputEvent(sessionID, m)
|
||||
|
||||
// Unique Web messages
|
||||
case *messages.PageEvent:
|
||||
case *PageEvent:
|
||||
return mi.pg.InsertWebPageEvent(sessionID, m)
|
||||
case *messages.ErrorEvent:
|
||||
return mi.pg.InsertWebErrorEvent(sessionID, m)
|
||||
case *messages.FetchEvent:
|
||||
case *JSException:
|
||||
return mi.pg.InsertWebJSException(m)
|
||||
case *IntegrationEvent:
|
||||
return mi.pg.InsertWebIntegrationEvent(m)
|
||||
case *FetchEvent:
|
||||
session, err := mi.pg.GetSession(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("can't get session info for CH: %s", err)
|
||||
|
|
@ -71,7 +73,7 @@ func (mi *Saver) InsertMessage(msg messages.Message) error {
|
|||
}
|
||||
}
|
||||
return mi.pg.InsertWebFetchEvent(sessionID, m)
|
||||
case *messages.GraphQLEvent:
|
||||
case *GraphQLEvent:
|
||||
session, err := mi.pg.GetSession(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("can't get session info for CH: %s", err)
|
||||
|
|
@ -81,39 +83,30 @@ func (mi *Saver) InsertMessage(msg messages.Message) error {
|
|||
}
|
||||
}
|
||||
return mi.pg.InsertWebGraphQLEvent(sessionID, m)
|
||||
case *messages.IntegrationEvent:
|
||||
return mi.pg.InsertWebErrorEvent(sessionID, &messages.ErrorEvent{
|
||||
MessageID: m.Meta().Index,
|
||||
Timestamp: m.Timestamp,
|
||||
Source: m.Source,
|
||||
Name: m.Name,
|
||||
Message: m.Message,
|
||||
Payload: m.Payload,
|
||||
})
|
||||
case *messages.SetPageLocation:
|
||||
case *SetPageLocation:
|
||||
return mi.pg.InsertSessionReferrer(sessionID, m.Referrer)
|
||||
|
||||
// IOS
|
||||
case *messages.IOSSessionStart:
|
||||
case *IOSSessionStart:
|
||||
return mi.pg.InsertIOSSessionStart(sessionID, m)
|
||||
case *messages.IOSSessionEnd:
|
||||
case *IOSSessionEnd:
|
||||
return mi.pg.InsertIOSSessionEnd(sessionID, m)
|
||||
case *messages.IOSUserID:
|
||||
case *IOSUserID:
|
||||
return mi.pg.InsertIOSUserID(sessionID, m)
|
||||
case *messages.IOSUserAnonymousID:
|
||||
case *IOSUserAnonymousID:
|
||||
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
|
||||
case *messages.IOSCustomEvent:
|
||||
case *IOSCustomEvent:
|
||||
return mi.pg.InsertIOSCustomEvent(sessionID, m)
|
||||
case *messages.IOSClickEvent:
|
||||
case *IOSClickEvent:
|
||||
return mi.pg.InsertIOSClickEvent(sessionID, m)
|
||||
case *messages.IOSInputEvent:
|
||||
case *IOSInputEvent:
|
||||
return mi.pg.InsertIOSInputEvent(sessionID, m)
|
||||
// Unique IOS messages
|
||||
case *messages.IOSNetworkCall:
|
||||
case *IOSNetworkCall:
|
||||
return mi.pg.InsertIOSNetworkCall(sessionID, m)
|
||||
case *messages.IOSScreenEnter:
|
||||
case *IOSScreenEnter:
|
||||
return mi.pg.InsertIOSScreenEnter(sessionID, m)
|
||||
case *messages.IOSCrash:
|
||||
case *IOSCrash:
|
||||
return mi.pg.InsertIOSCrash(sessionID, m)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,8 +32,10 @@ func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error
|
|||
return si.ch.InsertWebPageEvent(session, m)
|
||||
case *messages.ResourceEvent:
|
||||
return si.ch.InsertWebResourceEvent(session, m)
|
||||
case *messages.ErrorEvent:
|
||||
return si.ch.InsertWebErrorEvent(session, m)
|
||||
case *messages.JSException:
|
||||
return si.ch.InsertWebErrorEvent(session, types.WrapJSException(m))
|
||||
case *messages.IntegrationEvent:
|
||||
return si.ch.InsertWebErrorEvent(session, types.WrapIntegrationEvent(m))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
55
ee/backend/pkg/db/clickhouse/bulk.go
Normal file
55
ee/backend/pkg/db/clickhouse/bulk.go
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
)
|
||||
|
||||
type Bulk interface {
|
||||
Append(args ...interface{}) error
|
||||
Send() error
|
||||
}
|
||||
|
||||
type bulkImpl struct {
|
||||
conn driver.Conn
|
||||
query string
|
||||
values [][]interface{}
|
||||
}
|
||||
|
||||
func NewBulk(conn driver.Conn, query string) (Bulk, error) {
|
||||
switch {
|
||||
case conn == nil:
|
||||
return nil, errors.New("clickhouse connection is empty")
|
||||
case query == "":
|
||||
return nil, errors.New("query is empty")
|
||||
}
|
||||
return &bulkImpl{
|
||||
conn: conn,
|
||||
query: query,
|
||||
values: make([][]interface{}, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *bulkImpl) Append(args ...interface{}) error {
|
||||
b.values = append(b.values, args)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *bulkImpl) Send() error {
|
||||
batch, err := b.conn.PrepareBatch(context.Background(), b.query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create new batch: %s", err)
|
||||
}
|
||||
for _, set := range b.values {
|
||||
if err := batch.Append(set...); err != nil {
|
||||
log.Printf("can't append value set to batch, err: %s", err)
|
||||
log.Printf("failed query: %s", b.query)
|
||||
}
|
||||
}
|
||||
b.values = make([][]interface{}, 0)
|
||||
return batch.Send()
|
||||
}
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
|
|
@ -17,54 +16,6 @@ import (
|
|||
"openreplay/backend/pkg/license"
|
||||
)
|
||||
|
||||
type Bulk interface {
|
||||
Append(args ...interface{}) error
|
||||
Send() error
|
||||
}
|
||||
|
||||
type bulkImpl struct {
|
||||
conn driver.Conn
|
||||
query string
|
||||
values [][]interface{}
|
||||
}
|
||||
|
||||
func NewBulk(conn driver.Conn, query string) (Bulk, error) {
|
||||
switch {
|
||||
case conn == nil:
|
||||
return nil, errors.New("clickhouse connection is empty")
|
||||
case query == "":
|
||||
return nil, errors.New("query is empty")
|
||||
}
|
||||
return &bulkImpl{
|
||||
conn: conn,
|
||||
query: query,
|
||||
values: make([][]interface{}, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *bulkImpl) Append(args ...interface{}) error {
|
||||
b.values = append(b.values, args)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *bulkImpl) Send() error {
|
||||
batch, err := b.conn.PrepareBatch(context.Background(), b.query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create new batch: %s", err)
|
||||
}
|
||||
for _, set := range b.values {
|
||||
if err := batch.Append(set...); err != nil {
|
||||
log.Printf("can't append value set to batch, err: %s", err)
|
||||
log.Printf("failed query: %s", b.query)
|
||||
}
|
||||
}
|
||||
b.values = make([][]interface{}, 0)
|
||||
return batch.Send()
|
||||
}
|
||||
|
||||
var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"}
|
||||
var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"}
|
||||
|
||||
type Connector interface {
|
||||
Prepare() error
|
||||
Commit() error
|
||||
|
|
@ -73,7 +24,7 @@ type Connector interface {
|
|||
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
|
||||
InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error
|
||||
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
|
||||
InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error
|
||||
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
|
||||
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
|
||||
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
|
||||
InsertRequest(session *types.Session, msg *messages.FetchEvent, savePayload bool) error
|
||||
|
|
@ -131,7 +82,7 @@ var batches = map[string]string{
|
|||
"pages": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint_time, speed_index, visually_complete, time_to_interactive, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
"clicks": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, hesitation_time, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
"inputs": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, label, event_type) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
"errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
"errors": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, source, name, message, error_id, event_type, error_tags_keys, error_tags_values) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
"performance": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
"requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
"custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
|
|
@ -325,7 +276,13 @@ func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *message
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error {
|
||||
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error {
|
||||
keys, values := make([]string, 0, len(msg.Tags)), make([]*string, 0, len(msg.Tags))
|
||||
for k, v := range msg.Tags {
|
||||
keys = append(keys, k)
|
||||
values = append(values, v)
|
||||
}
|
||||
|
||||
if err := c.batches["errors"].Append(
|
||||
session.SessionID,
|
||||
uint16(session.ProjectID),
|
||||
|
|
@ -334,8 +291,10 @@ func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *message
|
|||
msg.Source,
|
||||
nullableString(msg.Name),
|
||||
msg.Message,
|
||||
hashid.WebErrorID(session.ProjectID, msg),
|
||||
msg.ID(session.ProjectID),
|
||||
"ERROR",
|
||||
keys,
|
||||
values,
|
||||
); err != nil {
|
||||
c.checkError("errors", err)
|
||||
return fmt.Errorf("can't append to errors batch: %s", err)
|
||||
|
|
@ -448,36 +407,3 @@ func (c *connectorImpl) InsertGraphQL(session *types.Session, msg *messages.Grap
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func nullableUint16(v uint16) *uint16 {
|
||||
var p *uint16 = nil
|
||||
if v != 0 {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func nullableUint32(v uint32) *uint32 {
|
||||
var p *uint32 = nil
|
||||
if v != 0 {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func nullableString(v string) *string {
|
||||
var p *string = nil
|
||||
if v != "" {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func datetime(timestamp uint64) time.Time {
|
||||
t := time.Unix(int64(timestamp/1e3), 0)
|
||||
// Temporal solution for not correct timestamps in performance messages
|
||||
if t.Year() < 2022 || t.Year() > 2025 {
|
||||
return time.Now()
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
|
|
|||
38
ee/backend/pkg/db/clickhouse/insert_type.go
Normal file
38
ee/backend/pkg/db/clickhouse/insert_type.go
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
package clickhouse
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func nullableUint16(v uint16) *uint16 {
|
||||
var p *uint16 = nil
|
||||
if v != 0 {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func nullableUint32(v uint32) *uint32 {
|
||||
var p *uint32 = nil
|
||||
if v != 0 {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func nullableString(v string) *string {
|
||||
var p *string = nil
|
||||
if v != "" {
|
||||
p = &v
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func datetime(timestamp uint64) time.Time {
|
||||
t := time.Unix(int64(timestamp/1e3), 0)
|
||||
// Temporal solution for not correct timestamps in performance messages
|
||||
if t.Year() < 2022 || t.Year() > 2025 {
|
||||
return time.Now()
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
|
@ -341,18 +341,6 @@ class ClickEvent(Message):
|
|||
self.selector = selector
|
||||
|
||||
|
||||
class ErrorEvent(Message):
|
||||
__id__ = 34
|
||||
|
||||
def __init__(self, message_id, timestamp, source, name, message, payload):
|
||||
self.message_id = message_id
|
||||
self.timestamp = timestamp
|
||||
self.source = source
|
||||
self.name = name
|
||||
self.message = message
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class ResourceEvent(Message):
|
||||
__id__ = 35
|
||||
|
||||
|
|
|
|||
|
|
@ -341,16 +341,6 @@ class MessageCodec(Codec):
|
|||
selector=self.read_string(reader)
|
||||
)
|
||||
|
||||
if message_id == 34:
|
||||
return ErrorEvent(
|
||||
message_id=self.read_uint(reader),
|
||||
timestamp=self.read_uint(reader),
|
||||
source=self.read_string(reader),
|
||||
name=self.read_string(reader),
|
||||
message=self.read_string(reader),
|
||||
payload=self.read_string(reader)
|
||||
)
|
||||
|
||||
if message_id == 35:
|
||||
return ResourceEvent(
|
||||
message_id=self.read_uint(reader),
|
||||
|
|
|
|||
|
|
@ -207,14 +207,14 @@ message 33, 'ClickEvent', :tracker => false, :replayer => false do
|
|||
string 'Label'
|
||||
string 'Selector'
|
||||
end
|
||||
message 34, 'ErrorEvent', :tracker => false, :replayer => false do
|
||||
uint 'MessageID'
|
||||
uint 'Timestamp'
|
||||
string 'Source'
|
||||
string 'Name'
|
||||
string 'Message'
|
||||
string 'Payload'
|
||||
end
|
||||
# message 34, 'ErrorEvent', :tracker => false, :replayer => false do
|
||||
# uint 'MessageID'
|
||||
# uint 'Timestamp'
|
||||
# string 'Source'
|
||||
# string 'Name'
|
||||
# string 'Message'
|
||||
# string 'Payload'
|
||||
# end
|
||||
message 35, 'ResourceEvent', :tracker => false, :replayer => false do
|
||||
uint 'MessageID'
|
||||
uint 'Timestamp'
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue