diff --git a/ee/backend/internal/db/datasaver/messages.go b/ee/backend/internal/db/datasaver/messages.go index ac71a3e91..87ca8d531 100644 --- a/ee/backend/internal/db/datasaver/messages.go +++ b/ee/backend/internal/db/datasaver/messages.go @@ -16,6 +16,14 @@ func (mi *Saver) InsertMessage(msg messages.Message) error { } return nil case *messages.IssueEvent: + session, err := mi.pg.GetSession(sessionID) + if err != nil { + log.Printf("can't get session info for CH: %s", err) + } else { + if err := mi.ch.InsertIssue(session, m); err != nil { + log.Printf("can't insert issue event into clickhouse: %s", err) + } + } return mi.pg.InsertIssueEvent(sessionID, m) //TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index e15ca13b2..709fab475 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -2,16 +2,17 @@ package clickhouse import ( "context" + "encoding/json" "errors" "fmt" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "log" - "math" "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/hashid" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/url" + "strconv" "strings" "time" @@ -80,6 +81,7 @@ type Connector interface { InsertRequest(session *types.Session, msg *messages.FetchEvent, savePayload bool) error InsertCustom(session *types.Session, msg *messages.CustomEvent) error InsertGraphQL(session *types.Session, msg *messages.GraphQLEvent) error + InsertIssue(session *types.Session, msg *messages.IssueEvent) error } type connectorImpl struct { @@ -136,6 +138,8 @@ var batches = map[string]string{ "requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", "graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + "issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string, context_keys, context_values) VALUES (?, ?, ?, ?, ?, ?)", } func (c *connectorImpl) Prepare() error { @@ -162,6 +166,68 @@ func (c *connectorImpl) checkError(name string, err error) { } } +func (c *connectorImpl) InsertIssue(session *types.Session, msg *messages.IssueEvent) error { + issueID := hashid.IssueID(session.ProjectID, msg) + if err := c.batches["issuesEvents"].Append( + session.SessionID, + uint16(session.ProjectID), + msg.MessageID, + datetime(msg.Timestamp), + issueID, + msg.Type, + "ISSUE", + ); err != nil { + c.checkError("issuesEvents", err) + return fmt.Errorf("can't append to issuesEvents batch: %s", err) + } + keys, values := contextParser(msg.Context) + if err := c.batches["issues"].Append( + uint16(session.ProjectID), + issueID, + msg.Type, + msg.ContextString, + keys, + values, + ); err != nil { + c.checkError("issues", err) + return fmt.Errorf("can't append to issues batch: %s", err) + } + return nil +} + +func contextParser(context string) ([]string, []*string) { + if context == "" || strings.TrimSpace(context) == "" { + return []string{}, []*string{} + } + contextMap := make(map[string]interface{}) + if err := json.Unmarshal([]byte(context), &contextMap); err != nil { + log.Printf("can't parse context, err: %s", err) + return []string{}, []*string{} + } + keys, values := make([]string, 0, len(contextMap)), make([]*string, 0, len(contextMap)) + for k, v := range contextMap { + keys = append(keys, k) + var value string + switch val := v.(type) { + case nil: + value = "" + case int: + value = strconv.Itoa(val) + case string: + value = val + default: + raw, err := json.Marshal(v) + if err != nil { + log.Println("can't marshal context value:", err) + continue + } + value = string(raw) + } + values = append(values, nullableString(value)) + } + return keys, values +} + func (c *connectorImpl) InsertWebSession(session *types.Session) error { if session.Duration == nil { return errors.New("trying to insert session with nil duration") @@ -453,7 +519,3 @@ func datetime(timestamp uint64) time.Time { } return t } - -func getSqIdx(messageID uint64) uint { - return uint(messageID % math.MaxInt32) -} diff --git a/ee/backend/pkg/failover/failover.go b/ee/backend/pkg/failover/failover.go index 5d3ef534a..1a771588a 100644 --- a/ee/backend/pkg/failover/failover.go +++ b/ee/backend/pkg/failover/failover.go @@ -8,7 +8,6 @@ import ( "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" - "strconv" ) const numberOfPartitions = 16 @@ -90,7 +89,7 @@ func (s *sessionFinderImpl) worker() { } func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) { - err := s.storage.UploadKey(strconv.FormatUint(sessionID, 10), 5) + err := s.storage.UploadSessionFiles(sessionID) if err == nil { log.Printf("found session: %d in partition: %d, original: %d", sessionID, partition, sessionID%numberOfPartitions)