Issues table for ClickHouse (#764)

* feat(backend): added batch for issues table to clickhouse connector
This commit is contained in:
Alexander 2022-10-11 10:25:45 +02:00 committed by GitHub
parent 8909f3fd2f
commit bdcc98492d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 7 deletions

View file

@ -16,6 +16,14 @@ func (mi *Saver) InsertMessage(msg messages.Message) error {
}
return nil
case *messages.IssueEvent:
session, err := mi.pg.GetSession(sessionID)
if err != nil {
log.Printf("can't get session info for CH: %s", err)
} else {
if err := mi.ch.InsertIssue(session, m); err != nil {
log.Printf("can't insert issue event into clickhouse: %s", err)
}
}
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others

View file

@ -2,16 +2,17 @@ package clickhouse
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"math"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
"strconv"
"strings"
"time"
@ -80,6 +81,7 @@ type Connector interface {
InsertRequest(session *types.Session, msg *messages.FetchEvent, savePayload bool) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQLEvent) error
InsertIssue(session *types.Session, msg *messages.IssueEvent) error
}
type connectorImpl struct {
@ -136,6 +138,8 @@ var batches = map[string]string{
"requests": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, url, request_body, response_body, status, method, duration, success, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"custom": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, payload, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"graphql": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, name, request_body, response_body, event_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"issuesEvents": "INSERT INTO experimental.events (session_id, project_id, message_id, datetime, issue_id, issue_type, event_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
"issues": "INSERT INTO experimental.issues (project_id, issue_id, type, context_string, context_keys, context_values) VALUES (?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
@ -162,6 +166,68 @@ func (c *connectorImpl) checkError(name string, err error) {
}
}
func (c *connectorImpl) InsertIssue(session *types.Session, msg *messages.IssueEvent) error {
issueID := hashid.IssueID(session.ProjectID, msg)
if err := c.batches["issuesEvents"].Append(
session.SessionID,
uint16(session.ProjectID),
msg.MessageID,
datetime(msg.Timestamp),
issueID,
msg.Type,
"ISSUE",
); err != nil {
c.checkError("issuesEvents", err)
return fmt.Errorf("can't append to issuesEvents batch: %s", err)
}
keys, values := contextParser(msg.Context)
if err := c.batches["issues"].Append(
uint16(session.ProjectID),
issueID,
msg.Type,
msg.ContextString,
keys,
values,
); err != nil {
c.checkError("issues", err)
return fmt.Errorf("can't append to issues batch: %s", err)
}
return nil
}
func contextParser(context string) ([]string, []*string) {
if context == "" || strings.TrimSpace(context) == "" {
return []string{}, []*string{}
}
contextMap := make(map[string]interface{})
if err := json.Unmarshal([]byte(context), &contextMap); err != nil {
log.Printf("can't parse context, err: %s", err)
return []string{}, []*string{}
}
keys, values := make([]string, 0, len(contextMap)), make([]*string, 0, len(contextMap))
for k, v := range contextMap {
keys = append(keys, k)
var value string
switch val := v.(type) {
case nil:
value = ""
case int:
value = strconv.Itoa(val)
case string:
value = val
default:
raw, err := json.Marshal(v)
if err != nil {
log.Println("can't marshal context value:", err)
continue
}
value = string(raw)
}
values = append(values, nullableString(value))
}
return keys, values
}
func (c *connectorImpl) InsertWebSession(session *types.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
@ -453,7 +519,3 @@ func datetime(timestamp uint64) time.Time {
}
return t
}
func getSqIdx(messageID uint64) uint {
return uint(messageID % math.MaxInt32)
}

View file

@ -8,7 +8,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"strconv"
)
const numberOfPartitions = 16
@ -90,7 +89,7 @@ func (s *sessionFinderImpl) worker() {
}
func (s *sessionFinderImpl) findSession(sessionID, timestamp, partition uint64) {
err := s.storage.UploadKey(strconv.FormatUint(sessionID, 10), 5)
err := s.storage.UploadSessionFiles(sessionID)
if err == nil {
log.Printf("found session: %d in partition: %d, original: %d",
sessionID, partition, sessionID%numberOfPartitions)