feat(backend): implemented iterator for new message protocol (with message size)

Alexander Zavorotynskiy 2022-08-05 19:49:18 +02:00
parent d06bd8769f
commit f3b431d64b
24 changed files with 2678 additions and 307 deletions
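The core change across the services below: queue consumers now receive a lazy messages.Iterator over the raw batch instead of individually pre-decoded messages.Message values. Each service walks the batch with Next(), checks Type(), and calls Message().Decode() only for the types it actually handles; everything else is skipped with a cheap seek, which the new size-prefixed protocol makes possible. A minimal sketch of the migrated handler shape, using the names introduced in this commit (the group, topic, and size limit are illustrative):

package main

import (
    "log"

    "openreplay/backend/pkg/messages"
    "openreplay/backend/pkg/queue"
    "openreplay/backend/pkg/queue/types"
)

func main() {
    consumer := queue.NewMessageConsumer(
        "example-group",     // hypothetical consumer group
        []string{"raw-web"}, // hypothetical topic
        func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
            for iter.Next() {
                if iter.Type() != messages.MsgSessionEnd {
                    continue // undecoded payloads are seeked over, not parsed
                }
                msg := iter.Message().Decode().(*messages.SessionEnd)
                log.Printf("session %d ended at %d", sessionID, msg.Timestamp)
            }
        },
        false,   // autoCommit
        1048576, // messageSizeLimit, illustrative
    )
    _ = consumer
}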

backend/cmd/assets/file Normal file
View file

@@ -0,0 +1 @@
GROUP_CACHE=from_file

View file

@@ -3,7 +3,7 @@ package main
import (
"context"
"log"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@@ -13,8 +13,8 @@ import (
"openreplay/backend/internal/assets/cacher"
config "openreplay/backend/internal/config/assets"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
func main() {
@@ -34,22 +34,25 @@ func main() {
consumer := queue.NewMessageConsumer(
cfg.GroupCache,
[]string{cfg.TopicCache},
func(sessionID uint64, message messages.Message, e *types.Meta) {
switch msg := message.(type) {
case *messages.AssetCache:
cacher.CacheURL(sessionID, msg.URL)
totalAssets.Add(context.Background(), 1)
case *messages.ErrorEvent:
if msg.Source != "js_exception" {
return
}
sourceList, err := assets.ExtractJSExceptionSources(&msg.Payload)
if err != nil {
log.Printf("Error on source extraction: %v", err)
return
}
for _, source := range sourceList {
cacher.CacheJSFile(source)
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgAssetCache {
msg := iter.Message().Decode().(*messages.AssetCache)
cacher.CacheURL(sessionID, msg.URL)
totalAssets.Add(context.Background(), 1)
} else if iter.Type() == messages.MsgErrorEvent {
msg := iter.Message().Decode().(*messages.ErrorEvent)
if msg.Source != "js_exception" {
continue
}
sourceList, err := assets.ExtractJSExceptionSources(&msg.Payload)
if err != nil {
log.Printf("Error on source extraction: %v", err)
continue
}
for _, source := range sourceList {
cacher.CacheJSFile(source)
}
}
}
},

View file

@@ -3,24 +3,23 @@ package main
import (
"errors"
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/sessions"
"time"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
func main() {
@@ -51,49 +50,59 @@ func main() {
saver.InitStats()
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
keepMessage := func(tp int) bool {
switch tp {
case messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgCustomEvent, messages.MsgClickEvent,
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgErrorEvent, messages.MsgFetchEvent,
messages.MsgGraphQLEvent, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgResourceEvent, messages.MsgLongTask, messages.MsgJSException, messages.MsgResourceTiming,
messages.MsgRawCustomEvent, messages.MsgCustomIssue, messages.MsgFetch, messages.MsgGraphQL,
messages.MsgStateAction, messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument,
messages.MsgMouseClick, messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming:
return true
}
return false
}
// Handler logic
handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {
handler := func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
// Just save session data into db without additional checks
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
for iter.Next() {
if !keepMessage(iter.Type()) {
continue
}
return
}
msg := iter.Message().Decode()
session, err := pg.GetSession(sessionID)
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
// Just save session data into db without additional checks
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
session, err := pg.GetSession(sessionID)
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
})
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
}
// Init consumer

View file

@@ -0,0 +1,92 @@
chalice:
  env:
    jwt_secret: SetARandomStringHere
clickhouse:
  enabled: false
fromVersion: v1.6.0
global:
  domainName: openreplay.local
  email:
    emailFrom: OpenReplay<do-not-reply@openreplay.com>
    emailHost: ""
    emailPassword: ""
    emailPort: "587"
    emailSslCert: ""
    emailSslKey: ""
    emailUseSsl: "false"
    emailUseTls: "true"
    emailUser: ""
  enterpriseEditionLicense: ""
  ingress:
    controller:
      config:
        enable-real-ip: true
        force-ssl-redirect: false
        max-worker-connections: 0
        proxy-body-size: 10m
        ssl-redirect: false
      extraArgs:
        default-ssl-certificate: app/openreplay-ssl
      ingressClass: openreplay
      ingressClassResource:
        name: openreplay
      service:
        externalTrafficPolicy: Local
  kafka:
    kafkaHost: kafka.db.svc.cluster.local
    kafkaPort: "9092"
    kafkaUseSsl: "false"
    zookeeperHost: databases-zookeeper.svc.cluster.local
    zookeeperNonTLSPort: 2181
  postgresql:
    postgresqlDatabase: postgres
    postgresqlHost: postgresql.db.svc.cluster.local
    postgresqlPassword: changeMePassword
    postgresqlPort: "5432"
    postgresqlUser: postgres
  redis:
    redisHost: redis-master.db.svc.cluster.local
    redisPort: "6379"
  s3:
    accessKey: changeMeMinioAccessKey
    assetsBucket: sessions-assets
    endpoint: http://minio.db.svc.cluster.local:9000
    recordingsBucket: mobs
    region: us-east-1
    secretKey: changeMeMinioPassword
    sourcemapsBucket: sourcemaps
ingress-nginx:
  controller:
    config:
      enable-real-ip: true
      force-ssl-redirect: false
      max-worker-connections: 0
      proxy-body-size: 10m
      ssl-redirect: false
    extraArgs:
      default-ssl-certificate: app/openreplay-ssl
    ingressClass: openreplay
    ingressClassResource:
      name: openreplay
    service:
      externalTrafficPolicy: Local
kafka:
  kafkaHost: kafka.db.svc.cluster.local
  kafkaPort: "9092"
  kafkaUseSsl: "false"
  zookeeperHost: databases-zookeeper.svc.cluster.local
  zookeeperNonTLSPort: 2181
minio:
  global:
    minio:
      accessKey: changeMeMinioAccessKey
      secretKey: changeMeMinioPassword
postgresql:
  postgresqlDatabase: postgres
  postgresqlHost: postgresql.db.svc.cluster.local
  postgresqlPassword: changeMePassword
  postgresqlPort: "5432"
  postgresqlUser: postgres
redis:
  redisHost: redis-master.db.svc.cluster.local
  redisPort: "6379"

View file

@@ -2,25 +2,23 @@ package main
import (
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/monitoring"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
//
func main() {
metrics := monitoring.New("ender")
@@ -45,18 +43,17 @@ func main() {
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
switch msg.(type) {
case *messages.SessionStart, *messages.SessionEnd:
// Skip several message types
return
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgSessionStart || iter.Type() == messages.MsgSessionEnd {
continue
}
if iter.Message().Meta().Timestamp == 0 {
log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, iter.Type())
}
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp)
}
// Test debug
if msg.Meta().Timestamp == 0 {
log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, msg.TypeID())
}
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, meta.Timestamp, msg.Meta().Timestamp)
},
false,
cfg.MessageSizeLimit,

View file

@@ -2,6 +2,12 @@ package main
import (
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/heuristics"
"openreplay/backend/pkg/handlers"
web2 "openreplay/backend/pkg/handlers/web"
@@ -9,12 +15,7 @@ import (
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
@@ -55,9 +56,11 @@ func main() {
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, iter.Message(), iter.Message().Meta().Index)
}
},
false,
cfg.MessageSizeLimit,

View file

@@ -2,22 +2,20 @@ package main
import (
"context"
"encoding/binary"
"log"
"openreplay/backend/internal/sink/assetscache"
"openreplay/backend/internal/sink/oswriter"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/monitoring"
"time"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/sink"
"openreplay/backend/internal/sink/assetscache"
"openreplay/backend/internal/sink/oswriter"
"openreplay/backend/internal/storage"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
@@ -58,51 +56,49 @@ func main() {
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
// Process assets
message = assetMessageHandler.ParseAssets(sessionID, message)
func(sessionID uint64, iter Iterator, meta *types.Meta) {
for iter.Next() {
// [METRICS] Increase the number of processed messages
totalMessages.Add(context.Background(), 1)
totalMessages.Add(context.Background(), 1)
// Filter message
typeID := message.TypeID()
// Send SessionEnd trigger to storage service
switch message.(type) {
case *SessionEnd:
if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(message)); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
// Send SessionEnd trigger to storage service
if iter.Type() == MsgSessionEnd {
if err := producer.Produce(cfg.TopicTrigger, sessionID, iter.Message().Encode()); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
}
continue
}
return
}
if !IsReplayerType(typeID) {
return
}
// If message timestamp is empty, use at least ts of session start
ts := message.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msg: %+v", sessionID, message)
} else {
// Log ts of last processed message
counter.Update(sessionID, time.UnixMilli(ts))
}
msg := iter.Message()
// Process assets
if iter.Type() == MsgSetNodeAttributeURLBased || iter.Type() == MsgSetCSSDataURLBased || iter.Type() == MsgCSSInsertRuleURLBased {
msg = assetMessageHandler.ParseAssets(sessionID, msg.Decode())
}
value := message.Encode()
var data []byte
if IsIOSType(typeID) {
data = value
} else {
data = make([]byte, len(value)+8)
copy(data[8:], value[:])
binary.LittleEndian.PutUint64(data[0:], message.Meta().Index)
}
if err := writer.Write(sessionID, data); err != nil {
log.Printf("Writer error: %v\n", err)
}
// Filter message
if !IsReplayerType(msg.TypeID()) {
continue
}
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
// If message timestamp is empty, use at least ts of session start
ts := msg.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msgType: %d", sessionID, iter.Type())
} else {
// Log ts of last processed message
counter.Update(sessionID, time.UnixMilli(ts))
}
// Write encoded message with index to session file
data := msg.EncodeWithIndex()
if err := writer.Write(sessionID, data); err != nil {
log.Printf("Writer error: %v\n", err)
}
// [METRICS] Increase the number of written to the files messages and the message size
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
}
},
false,
cfg.MessageSizeLimit,

View file

@@ -2,8 +2,7 @@ package main
import (
"log"
"openreplay/backend/pkg/failover"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"strconv"
@@ -12,9 +11,10 @@ import (
config "openreplay/backend/internal/config/storage"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/failover"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
s3storage "openreplay/backend/pkg/storage"
)
@@ -43,14 +43,16 @@ func main() {
[]string{
cfg.TopicTrigger,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
switch m := msg.(type) {
case *messages.SessionEnd:
if err := srv.UploadKey(strconv.FormatUint(sessionID, 10), 5); err != nil {
sessionFinder.Find(sessionID, m.Timestamp)
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgSessionEnd {
msg := iter.Message().Decode().(*messages.SessionEnd)
if err := srv.UploadKey(strconv.FormatUint(sessionID, 10), 5); err != nil {
sessionFinder.Find(sessionID, msg.Timestamp)
}
// Log timestamp of last processed session
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
}
// Log timestamp of last processed session
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
}
},
true,

View file

@@ -0,0 +1,55 @@
package player
import (
"bufio"
"fmt"
"log"
"os"
)
type request struct {
ts int64
method string
url string
headers map[string][]string
body []byte
}
type playerImpl struct {
//
}
func (p playerImpl) LoadRecord(filePath string) error {
if filePath == "" {
return fmt.Errorf("file name is empty")
}
file, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
if err != nil {
return fmt.Errorf("open file err: %s", err)
}
defer file.Close()
sc := bufio.NewScanner(file)
for sc.Scan() {
line := sc.Text()
log.Println(line)
}
if err := sc.Err(); err != nil {
return fmt.Errorf("scan file error: %v", err)
}
return nil
}
func (p playerImpl) PlayRecord(host string) error {
//TODO implement me
panic("implement me")
}
type Player interface {
LoadRecord(filePath string) error
PlayRecord(host string) error
}
func New() Player {
return &playerImpl{}
}

View file

@@ -0,0 +1,10 @@
package player
import "testing"
func TestPlayer(t *testing.T) {
player := New()
if err := player.LoadRecord("/Users/alexander/7048055123532800"); err != nil {
t.Logf("can't load session record: %s", err)
}
}

View file

@@ -0,0 +1,94 @@
package recorder
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
type recorderImpl struct {
sessions map[uint64]*sync.Mutex
sessionsDir string
}
func (r *recorderImpl) SaveRequest(sessionID uint64, req *http.Request, body []byte) error {
pwd, _ := os.Getwd()
log.Printf("new request, pwd: %s", pwd)
// Hold mutex for session
if _, ok := r.sessions[sessionID]; !ok {
r.sessions[sessionID] = &sync.Mutex{}
}
r.sessions[sessionID].Lock()
// Release mutex for session on exit
defer r.sessions[sessionID].Unlock()
// Open file
file, err := os.OpenFile(r.sessionsDir+strconv.FormatUint(sessionID, 10), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}
// Close file on exit
defer file.Close()
log.Printf("file name: %s", strconv.FormatUint(sessionID, 10))
// Save request info
/*
Record format:
- timestamp
- method
- url
- headers
- body
*/
if _, err := file.Write([]byte("<request>\n")); err != nil {
log.Printf("can't write data to file: %s", err)
}
if _, err := file.Write([]byte(fmt.Sprintf("<ts>%d</ts>\n", time.Now().UnixMilli()))); err != nil {
log.Printf("can't write timestamp to file: %s", err)
}
if _, err := file.Write([]byte(fmt.Sprintf("<method>%s</method>\n", req.Method))); err != nil {
log.Printf("can't write method to file: %s", err)
}
if _, err := file.Write([]byte(fmt.Sprintf("<url>%s</url>\n", req.URL.Path))); err != nil {
log.Printf("can't write url to file: %s", err)
}
reqHeaders, err := json.Marshal(req.Header)
if err == nil {
if _, err := file.Write([]byte(fmt.Sprintf("<headers>%s</headers>\n", string(reqHeaders)))); err != nil {
log.Printf("can't write headers to file: %s", err)
}
} else {
log.Printf("can't marshal request headers: %s", err)
}
if _, err := file.Write([]byte(fmt.Sprintf("<body>%s</body>\n", string(body)))); err != nil {
log.Printf("can't write body to file: %s", err)
}
if _, err := file.Write([]byte("</request>\n")); err != nil {
log.Printf("can't write data to file: %s", err)
}
// Sync file changes
if err := file.Sync(); err != nil {
log.Printf("can't sync file: %s", err)
}
return nil
}
type Recorder interface {
SaveRequest(sessionID uint64, req *http.Request, body []byte) error
}
func New(dir string) Recorder {
if dir == "" {
dir = "./"
}
return &recorderImpl{
sessions: make(map[uint64]*sync.Mutex),
sessionsDir: dir,
}
}
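For reference, a usage sketch of the Recorder above; the request and body are made up, and the commented block shows the exact record layout SaveRequest appends (the session ID is the one the player test below loads):

package recorder

import (
    "net/http"
    "strings"
)

func exampleSaveRequest() {
    rec := New("/tmp/records/")
    req, _ := http.NewRequest("POST", "http://localhost/api/login", strings.NewReader(""))
    _ = rec.SaveRequest(7048055123532800, req, []byte(`{"user":"demo"}`))
    // Appends to /tmp/records/7048055123532800:
    // <request>
    // <ts>1659721758000</ts>   <- current Unix milliseconds
    // <method>POST</method>
    // <url>/api/login</url>
    // <headers>{}</headers>
    // <body>{"user":"demo"}</body>
    // </request>
}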

View file

@@ -71,6 +71,7 @@ func (w *Writer) Write(key uint64, data []byte) error {
if err != nil {
return err
}
// TODO: add check for the number of recorded bytes to file
_, err = file.Write(data)
return err
}

View file

@@ -0,0 +1,32 @@
package clickhouse
import "fmt"
type batch struct {
size int
values [][]interface{}
oldValues [][]interface{}
}
func NewBatch(size int) *batch {
return &batch{
size: size,
values: make([][]interface{}, 0),
oldValues: make([][]interface{}, 0),
}
}
func (b *batch) Append(v ...interface{}) error {
if len(v) != b.size {
return fmt.Errorf("wrong values set size, got: %d, waited: %d", len(v), b.size)
}
b.values = append(b.values, v)
return nil
}
func (b *batch) Commit() error {
if len(b.oldValues) > 0 {
// TODO: retry rows kept from a previously failed commit
}
return nil
}
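A short usage sketch of the batch helper; Append's only failure mode is a row of the wrong width, and the oldValues retry path is still a stub in this commit:

package clickhouse

import "log"

func exampleBatchUsage() {
    b := NewBatch(3) // three values per row
    if err := b.Append("sess", uint32(1), "2022-08-05"); err != nil {
        log.Printf("append error: %s", err)
    }
    if err := b.Append("only", "two"); err != nil {
        log.Printf("append error: %s", err) // wrong values set size, got: 2, waited: 3
    }
    _ = b.Commit()
}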

View file

@@ -0,0 +1,401 @@
package clickhouse
import (
"context"
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
"strings"
"time"
"openreplay/backend/pkg/license"
)
var CONTEXT_MAP = map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"}
var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"}
type Connector interface {
Prepare() error
Commit() error
FinaliseSessionsTable() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertLongtask(session *types.Session, msg *messages.LongTask) error
}
type connectorImpl struct {
conn driver.Conn
batches map[string]driver.Batch
}
func NewConnector(url string) Connector {
license.CheckLicense()
url = strings.TrimPrefix(url, "tcp://")
url = strings.TrimSuffix(url, "/default")
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{url},
Auth: clickhouse.Auth{
Database: "default",
},
MaxOpenConns: 20,
MaxIdleConns: 15,
ConnMaxLifetime: 3 * time.Minute,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
// Debug: true,
})
if err != nil {
log.Fatal(err)
}
c := &connectorImpl{
conn: conn,
batches: make(map[string]driver.Batch, 9),
}
return c
}
func (c *connectorImpl) newBatch(name, query string) error {
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
return fmt.Errorf("can't create new batch: %s", err)
}
if _, ok := c.batches[name]; ok {
delete(c.batches, name)
}
c.batches[name] = batch
return nil
}
var batches = map[string]string{
// Sessions table
"sessions": "INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"metadata": "INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
// Events table
"resources": "INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"pages": "INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"clicks": "INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"inputs": "INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"errors": "INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"performance": "INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"longtasks": "INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
}
func (c *connectorImpl) Prepare() error {
for table, query := range batches {
if err := c.newBatch(table, query); err != nil {
return fmt.Errorf("can't create %s batch: %s", table, err)
}
}
return nil
}
func (c *connectorImpl) Commit() error {
for _, b := range c.batches {
if err := b.Send(); err != nil {
return fmt.Errorf("can't send batch: %s", err)
}
}
return nil
}
func (c *connectorImpl) FinaliseSessionsTable() error {
if err := c.conn.Exec(context.Background(), "OPTIMIZE TABLE sessions FINAL"); err != nil {
return fmt.Errorf("can't finalise sessions table: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebSession(session *types.Session) error {
if session.Duration == nil {
return errors.New("trying to insert session with nil duration")
}
if err := c.batches["sessions"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(session.Timestamp),
uint32(*session.Duration),
uint16(session.PagesCount),
uint16(session.EventsCount),
uint16(session.ErrorsCount),
// Web unique columns
session.UserBrowser,
nullableString(session.UserBrowserVersion),
); err != nil {
return fmt.Errorf("can't append to sessions batch: %s", err)
}
if err := c.batches["metadata"].Append(
session.SessionID,
session.UserID,
session.UserAnonymousID,
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
datetime(session.Timestamp),
); err != nil {
return fmt.Errorf("can't append to metadata batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error {
var method interface{} = url.EnsureMethod(msg.Method)
if method == "" {
method = nil
}
if err := c.batches["resources"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
msg.Type,
nullableUint16(uint16(msg.Duration)),
nullableUint16(uint16(msg.TTFB)),
nullableUint16(uint16(msg.HeaderSize)),
nullableUint32(uint32(msg.EncodedBodySize)),
nullableUint32(uint32(msg.DecodedBodySize)),
msg.Success,
); err != nil {
return fmt.Errorf("can't append to resources batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error {
if err := c.batches["pages"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion, nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
nullableUint16(uint16(msg.RequestStart)),
nullableUint16(uint16(msg.ResponseStart)),
nullableUint16(uint16(msg.ResponseEnd)),
nullableUint16(uint16(msg.DomContentLoadedEventStart)),
nullableUint16(uint16(msg.DomContentLoadedEventEnd)),
nullableUint16(uint16(msg.LoadEventStart)),
nullableUint16(uint16(msg.LoadEventEnd)),
nullableUint16(uint16(msg.FirstPaint)),
nullableUint16(uint16(msg.FirstContentfulPaint)),
nullableUint16(uint16(msg.SpeedIndex)),
nullableUint16(uint16(msg.VisuallyComplete)),
nullableUint16(uint16(msg.TimeToInteractive)),
); err != nil {
return fmt.Errorf("can't append to pages batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["clicks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
nullableUint32(uint32(msg.HesitationTime)),
); err != nil {
return fmt.Errorf("can't append to clicks batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error {
if msg.Label == "" {
return nil
}
if err := c.batches["inputs"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
); err != nil {
return fmt.Errorf("can't append to inputs batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error {
if err := c.batches["errors"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Source,
nullableString(msg.Name),
msg.Message,
hashid.WebErrorID(session.ProjectID, msg),
); err != nil {
return fmt.Errorf("can't append to errors batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
if err := c.batches["performance"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(timestamp),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinTotalJSHeapSize,
msg.AvgTotalJSHeapSize,
msg.MaxTotalJSHeapSize,
msg.MinUsedJSHeapSize,
msg.AvgUsedJSHeapSize,
msg.MaxUsedJSHeapSize,
); err != nil {
return fmt.Errorf("can't append to performance batch: %s", err)
}
return nil
}
func (c *connectorImpl) InsertLongtask(session *types.Session, msg *messages.LongTask) error {
if err := c.batches["longtasks"].Append(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
session.UserBrowser,
nullableString(session.UserBrowserVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
CONTEXT_MAP[msg.Context],
CONTAINER_TYPE_MAP[msg.ContainerType],
msg.ContainerId,
msg.ContainerName,
msg.ContainerSrc,
); err != nil {
return fmt.Errorf("can't append to longtasks batch: %s", err)
}
return nil
}
func nullableUint16(v uint16) *uint16 {
var p *uint16 = nil
if v != 0 {
p = &v
}
return p
}
func nullableUint32(v uint32) *uint32 {
var p *uint32 = nil
if v != 0 {
p = &v
}
return p
}
func nullableString(v string) *string {
var p *string = nil
if v != "" {
p = &v
}
return p
}
func datetime(timestamp uint64) time.Time {
t := time.Unix(int64(timestamp/1e3), 0)
// Temporary workaround for incorrect timestamps in performance messages
if t.Year() < 2022 || t.Year() > 2025 {
return time.Now()
}
return t
}
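A lifecycle sketch for the connector above; the address and session are placeholders, and in real use a failed Commit would be retried rather than just logged:

package clickhouse

import (
    "log"

    "openreplay/backend/pkg/db/types"
)

func exampleConnectorUsage(session *types.Session) {
    c := NewConnector("clickhouse:9000") // placeholder address
    if err := c.Prepare(); err != nil {  // one PrepareBatch per table
        log.Fatalf("prepare error: %s", err)
    }
    if err := c.InsertWebSession(session); err != nil { // needs non-nil session.Duration
        log.Printf("insert error: %s", err)
    }
    if err := c.Commit(); err != nil { // sends every buffered batch
        log.Printf("commit error: %s", err)
    }
}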

View file

@@ -0,0 +1,54 @@
package postgres
import (
"fmt"
"github.com/jackc/pgx/v4"
"testing"
)
type poolMock struct {
//
}
func (p poolMock) Query(sql string, args ...interface{}) (pgx.Rows, error) {
return nil, nil
}
func (p poolMock) QueryRow(sql string, args ...interface{}) pgx.Row {
return nil
}
func (p poolMock) Exec(sql string, arguments ...interface{}) error {
fmt.Println(sql)
fmt.Println(arguments...)
return nil
}
func (p poolMock) SendBatch(b *pgx.Batch) pgx.BatchResults {
return nil
}
func (p poolMock) Begin() (*_Tx, error) {
return nil, nil
}
func (p poolMock) Close() {
}
func NewPoolMock() Pool {
return &poolMock{}
}
func TestBulk(t *testing.T) {
conn := NewPoolMock()
bulk, err := NewBulk(conn, "autocomplete", "(value, type, project_id)", "($%d, $%d, $%d)", 3, 10)
if err != nil {
t.Errorf("can't create bulk: %s", err)
}
for i := 0; i < 10; i++ {
if err := bulk.Append(fmt.Sprintf("var1+%d", i), fmt.Sprintf("var2+%d", i),
i%2 == 0); err != nil {
t.Errorf("can't add new values to bulk: %s", err)
}
}
}

View file

@@ -2,169 +2,155 @@ package messages
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"io"
"log"
"strings"
)
// RawMessage is a message that has not been decoded yet
type RawMessage struct {
tp uint64
size uint64
data []byte
meta *message
type Iterator interface {
Next() bool // Advance to the next message; false at end of batch or on error
Type() int // Type of the current message
Message() Message // Current message, raw (lazily read) or decoded
}
func (m *RawMessage) Encode() []byte {
return m.data
type iteratorImpl struct {
data *bytes.Reader
index uint64
timestamp int64
version uint64
msgType uint64
msgSize uint64
canSkip bool
msg Message
}
func (m *RawMessage) TypeID() int {
return int(m.tp)
}
func (m *RawMessage) Meta() *message {
return m.meta
}
func WriteSize(size uint64, buf []byte, p int) {
var m uint64 = 255
for i := 0; i < 3; i++ {
buf[p+i] = byte(size & m)
size = size >> 8
func NewIterator(data []byte) Iterator {
return &iteratorImpl{
data: bytes.NewReader(data),
}
fmt.Println(buf)
}
func ReadSize(reader io.Reader) (uint64, error) {
buf := make([]byte, 3)
n, err := io.ReadFull(reader, buf)
func (i *iteratorImpl) Next() bool {
if i.canSkip {
log.Printf("skip message, type: %d, size: %d", i.msgType, i.msgSize)
if _, err := i.data.Seek(int64(i.msgSize), io.SeekCurrent); err != nil {
log.Printf("seek err: %s", err)
return false
}
}
i.canSkip = false
var err error
i.msgType, err = ReadUint(i.data)
if err != nil {
return 0, err
if err == io.EOF {
return false
}
log.Printf("can't read message type: %s", err)
return false
}
if n != 3 {
return 0, fmt.Errorf("read only %d of 3 size bytes", n)
log.Printf("message type: %d", i.msgType)
if i.version > 0 && messageHasSize(i.msgType) {
// Read message size if it is the new protocol version
i.msgSize, err = ReadSize(i.data)
if err != nil {
log.Printf("can't read message size: %s", err)
return false
}
log.Println("message size:", i.msgSize)
i.msg = &RawMessage{
tp: i.msgType,
size: i.msgSize,
meta: &message{},
reader: i.data,
skipped: &i.canSkip,
}
i.canSkip = true
} else {
i.msg, err = ReadMessage(i.msgType, i.data)
if err == io.EOF {
return false
} else if err != nil {
if strings.HasPrefix(err.Error(), "Unknown message code:") {
code := strings.TrimPrefix(err.Error(), "Unknown message code: ")
i.msg, err = DecodeExtraMessage(code, i.data)
if err != nil {
log.Printf("can't decode msg: %s", err)
return false
}
} else {
log.Printf("Batch Message decoding error on message with index %v, err: %s", i.index, err)
return false
}
}
i.msg = transformDeprecated(i.msg)
}
var size uint64
for i, b := range buf {
size += uint64(b) << (8 * i)
// Process meta information
isBatchMeta := false
switch i.msgType {
case MsgBatchMetadata:
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
return false
}
m := i.msg.Decode().(*BatchMetadata)
i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.timestamp = m.Timestamp
i.version = m.Version
isBatchMeta = true
log.Printf("new batch version: %d", i.version)
case MsgBatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
return false
}
m := i.msg.Decode().(*BatchMeta)
i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.timestamp = m.Timestamp
isBatchMeta = true
// continue readLoop
case MsgIOSBatchMeta:
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
return false
}
m := i.msg.Decode().(*IOSBatchMeta)
i.index = m.FirstIndex
i.timestamp = int64(m.Timestamp)
isBatchMeta = true
// continue readLoop
case MsgTimestamp:
m := i.msg.Decode().(*Timestamp)
i.timestamp = int64(m.Timestamp)
// No skipping here, to make it easy to encode back the same sequence of messages
// continue readLoop
case MsgSessionStart:
m := i.msg.Decode().(*SessionStart)
i.timestamp = int64(m.Timestamp)
case MsgSessionEnd:
m := i.msg.Decode().(*SessionEnd)
i.timestamp = int64(m.Timestamp)
}
return size, nil
i.msg.Meta().Index = i.index
i.msg.Meta().Timestamp = i.timestamp
if !isBatchMeta { // Without this, indexes would still be unique, though shifted by 1, since BatchMeta is not counted by the tracker
i.index++
}
return true
}
func (i *iteratorImpl) Type() int {
return int(i.msgType)
}
func (i *iteratorImpl) Message() Message {
return i.msg
}
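// messageHasSize reports whether a message body is preceded by the 3-byte
// size field; types 80-82 (presumably the batch-meta family, which has to be
// readable before the batch version is known) never carry it.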
func messageHasSize(msgType uint64) bool {
return !(msgType == 80 || msgType == 81 || msgType == 82)
}
func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
var (
index uint64
timestamp int64
version uint64
)
log.Println("new batch")
for {
// Read message type
msgType, err := ReadUint(reader)
if err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("can't read message type: %s", err)
}
log.Printf("message type: %d", msgType)
var msg Message
if version > 0 && messageHasSize(msgType) {
// Read message size if it's the new protocol version
msgSize, err := ReadSize(reader)
if err != nil {
return fmt.Errorf("can't read message size: %s", err)
}
// Read raw message (bytes)
log.Println("size:", msgSize)
buf := make([]byte, msgSize)
_, err = io.ReadFull(reader, buf)
if err != nil {
return fmt.Errorf("can't read raw body: %s", err)
}
// Create message object
msg = &RawMessage{
tp: msgType,
size: msgSize,
data: buf,
meta: &message{},
}
// Temp code
msg, err = ReadMessage(msgType, bytes.NewReader(buf))
if err != nil {
return err
}
} else {
msg, err = ReadMessage(msgType, reader)
if err == io.EOF {
return nil
} else if err != nil {
if strings.HasPrefix(err.Error(), "Unknown message code:") {
code := strings.TrimPrefix(err.Error(), "Unknown message code: ")
msg, err = DecodeExtraMessage(code, reader)
if err != nil {
return fmt.Errorf("can't decode msg: %s", err)
}
} else {
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
}
}
msg = transformDeprecated(msg)
}
isBatchMeta := false
switch m := msg.(type) {
case *BatchMetadata:
if index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
return errors.New("Batch Meta found at the end of the batch")
}
index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
timestamp = m.Timestamp
version = m.Version
isBatchMeta = true
log.Printf("new batch version: %d", version)
case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
if index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
return errors.New("Batch Meta found at the end of the batch")
}
index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
timestamp = m.Timestamp
isBatchMeta = true
// continue readLoop
case *IOSBatchMeta:
if index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
return errors.New("Batch Meta found at the end of the batch")
}
index = m.FirstIndex
timestamp = int64(m.Timestamp)
isBatchMeta = true
// continue readLoop
case *Timestamp:
timestamp = int64(m.Timestamp) // TODO(?): replace timestamp type to int64 everywhere (including encoding part in tracker)
// No skipping here for making it easy to encode back the same sequence of message
// continue readLoop
case *SessionStart:
timestamp = int64(m.Timestamp)
case *SessionEnd:
timestamp = int64(m.Timestamp)
}
msg.Meta().Index = index
msg.Meta().Timestamp = timestamp
messageHandler(msg)
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
index++
}
}
return errors.New("Error of the codeflow. (Should return on EOF)")
}

View file

@@ -1,6 +1,7 @@
package messages
import (
"encoding/binary"
"fmt"
"io"
)
@@ -20,6 +21,21 @@ func (msg *SessionSearch) Encode() []byte {
return buf[:p]
}
func (msg *SessionSearch) EncodeWithIndex() []byte {
encoded := msg.Encode()
if IsIOSType(msg.TypeID()) {
return encoded
}
data := make([]byte, len(encoded)+8)
copy(data[8:], encoded[:])
binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index)
return data
}
func (msg *SessionSearch) Decode() Message {
return msg
}
func (msg *SessionSearch) TypeID() int {
return 127
}
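EncodeWithIndex therefore frames every non-iOS message as an 8-byte little-endian meta index followed by the regular encoded payload (iOS messages are passed through unchanged). An illustrative inverse of that framing:

package messages

import "encoding/binary"

// splitIndexedMessage undoes EncodeWithIndex for non-iOS messages:
// the first 8 bytes are the little-endian index, the rest is the payload.
func splitIndexedMessage(data []byte) (index uint64, payload []byte) {
    return binary.LittleEndian.Uint64(data[:8]), data[8:]
}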

View file

@@ -16,6 +16,8 @@ func (m *message) SetMeta(origin *message) {
type Message interface {
Encode() []byte
EncodeWithIndex() []byte
Decode() Message
TypeID() int
Meta() *message
}

File diff suppressed because it is too large

View file

@@ -3,6 +3,7 @@ package messages
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
)
@@ -16,15 +17,6 @@ func ReadByte(reader io.Reader) (byte, error) {
return p[0], nil
}
// func SkipBytes(reader io.ReadSeeker) error {
// n, err := ReadUint(reader)
// if err != nil {
// return err
// }
// _, err = reader.Seek(n, io.SeekCurrent);
// return err
// }
func ReadData(reader io.Reader) ([]byte, error) {
n, err := ReadUint(reader)
if err != nil {
@@ -153,3 +145,28 @@ func WriteJson(v interface{}, buf []byte, p int) int {
}
return WriteData(data, buf, p)
}
func WriteSize(size uint64, buf []byte, p int) {
var m uint64 = 255
for i := 0; i < 3; i++ {
buf[p+i] = byte(size & m)
size = size >> 8
}
fmt.Println(buf)
}
func ReadSize(reader io.Reader) (uint64, error) {
buf := make([]byte, 3)
n, err := io.ReadFull(reader, buf)
if err != nil {
return 0, err
}
if n != 3 {
return 0, fmt.Errorf("read only %d of 3 size bytes", n)
}
var size uint64
for i, b := range buf {
size += uint64(b) << (8 * i)
}
return size, nil
}
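The size field is a fixed 3-byte little-endian integer, so a single message payload can be at most 2^24-1 bytes (about 16.7 MB). A round-trip sketch (note WriteSize still Printlns the buffer, a debug leftover in this commit):

package messages

import (
    "bytes"
    "fmt"
)

func exampleSizeRoundTrip() {
    buf := make([]byte, 3)
    WriteSize(500, buf, 0)                      // buf is now [244 1 0]
    size, err := ReadSize(bytes.NewReader(buf)) // little-endian: 244 + 1<<8
    fmt.Println(size, err)                      // 500 <nil>
}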

View file

@@ -0,0 +1,67 @@
package messages
import (
"bytes"
"encoding/binary"
"io"
"log"
)
// RawMessage is a message that has not been decoded yet. It lazily reads its payload from the shared batch reader on first Encode/Decode; until then the iterator can skip it via the shared skipped flag.
type RawMessage struct {
tp uint64
size uint64
data []byte
reader *bytes.Reader
meta *message
encoded bool
skipped *bool
}
func (m *RawMessage) Encode() []byte {
if m.encoded {
return m.data
}
m.data = make([]byte, m.size)
m.encoded = true
*m.skipped = false
n, err := io.ReadFull(m.reader, m.data)
if err != nil {
log.Printf("message encode err: %s", err)
return nil
}
log.Printf("encode: read %d of %d bytes", n, m.size)
return m.data
}
func (m *RawMessage) EncodeWithIndex() []byte {
if !m.encoded {
m.Encode()
}
if IsIOSType(int(m.tp)) {
return m.data
}
data := make([]byte, len(m.data)+8)
copy(data[8:], m.data[:])
binary.LittleEndian.PutUint64(data[0:], m.Meta().Index)
return data
}
func (m *RawMessage) Decode() Message {
if !m.encoded {
m.Encode()
}
msg, err := ReadMessage(m.tp, bytes.NewReader(m.data))
if err != nil {
log.Printf("decode err: %s", err)
}
return msg
}
func (m *RawMessage) TypeID() int {
return int(m.tp)
}
func (m *RawMessage) Meta() *message {
return m.meta
}

View file

@@ -1,19 +1,12 @@
package queue
import (
"bytes"
"log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
)
func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer {
func NewMessageConsumer(group string, topics []string, handler types.RawMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer {
return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) {
handler(sessionID, msg, meta)
}); err != nil {
log.Printf("Decode error: %v\n", err)
}
handler(sessionID, messages.NewIterator(value), meta)
}, autoCommit, messageSizeLimit)
}

View file

@@ -26,3 +26,4 @@ type Meta struct {
type MessageHandler func(uint64, []byte, *Meta)
type DecodedMessageHandler func(uint64, messages.Message, *Meta)
type RawMessageHandler func(uint64, messages.Iterator, *Meta)

View file

@@ -1,5 +1,12 @@
// Auto-generated, do not edit
package messages
const (
<% $messages.each do |msg| %>
Msg<%= msg.name %> = <%= msg.id %>
<% end %>
)
<% $messages.each do |msg| %>
type <%= msg.name %> struct {
message
@@ -16,6 +23,21 @@ func (msg *<%= msg.name %>) Encode() []byte {
return buf[:p]
}
func (msg *<%= msg.name %>) EncodeWithIndex() []byte {
encoded := msg.Encode()
if IsIOSType(msg.TypeID()) {
return encoded
}
data := make([]byte, len(encoded)+8)
copy(data[8:], encoded[:])
binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index)
return data
}
func (msg *<%= msg.name %>) Decode() Message {
return msg
}
func (msg *<%= msg.name %>) TypeID() int {
return <%= msg.id %>
}