diff --git a/backend/cmd/assets/file b/backend/cmd/assets/file new file mode 100644 index 000000000..f0018a2e8 --- /dev/null +++ b/backend/cmd/assets/file @@ -0,0 +1 @@ +GROUP_CACHE=from_file \ No newline at end of file diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go index 86eb4865f..629224da7 100644 --- a/backend/cmd/assets/main.go +++ b/backend/cmd/assets/main.go @@ -3,7 +3,7 @@ package main import ( "context" "log" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/queue/types" "os" "os/signal" "syscall" @@ -13,8 +13,8 @@ import ( "openreplay/backend/internal/assets/cacher" config "openreplay/backend/internal/config/assets" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" ) func main() { @@ -34,22 +34,25 @@ func main() { consumer := queue.NewMessageConsumer( cfg.GroupCache, []string{cfg.TopicCache}, - func(sessionID uint64, message messages.Message, e *types.Meta) { - switch msg := message.(type) { - case *messages.AssetCache: - cacher.CacheURL(sessionID, msg.URL) - totalAssets.Add(context.Background(), 1) - case *messages.ErrorEvent: - if msg.Source != "js_exception" { - return - } - sourceList, err := assets.ExtractJSExceptionSources(&msg.Payload) - if err != nil { - log.Printf("Error on source extraction: %v", err) - return - } - for _, source := range sourceList { - cacher.CacheJSFile(source) + func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { + for iter.Next() { + if iter.Type() == messages.MsgAssetCache { + msg := iter.Message().Decode().(*messages.AssetCache) + cacher.CacheURL(sessionID, msg.URL) + totalAssets.Add(context.Background(), 1) + } else if iter.Type() == messages.MsgErrorEvent { + msg := iter.Message().Decode().(*messages.ErrorEvent) + if msg.Source != "js_exception" { + continue + } + sourceList, err := assets.ExtractJSExceptionSources(&msg.Payload) + if err != nil { + log.Printf("Error on source extraction: %v", err) + continue + } + for _, source := range sourceList { + cacher.CacheJSFile(source) + } } } }, diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 1712b8a3f..8f2334f21 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -3,24 +3,23 @@ package main import ( "errors" "log" - "openreplay/backend/internal/config/db" - "openreplay/backend/internal/db/datasaver" - "openreplay/backend/pkg/handlers" - custom2 "openreplay/backend/pkg/handlers/custom" - "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/sessions" - "time" - + "openreplay/backend/pkg/queue/types" "os" "os/signal" "syscall" + "time" + "openreplay/backend/internal/config/db" + "openreplay/backend/internal/db/datasaver" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/handlers" + custom2 "openreplay/backend/pkg/handlers/custom" logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/sessions" ) func main() { @@ -51,49 +50,59 @@ func main() { saver.InitStats() statsLogger := logger.NewQueueStats(cfg.LoggerTimeout) + keepMessage := func(tp int) bool { + return tp == messages.MsgMetadata || tp == messages.MsgIssueEvent || tp == messages.MsgSessionStart || tp == messages.MsgSessionEnd || tp == messages.MsgUserID || tp == messages.MsgUserAnonymousID || tp == messages.MsgCustomEvent || tp == messages.MsgClickEvent || tp == 
messages.MsgInputEvent || tp == messages.MsgPageEvent || tp == messages.MsgErrorEvent || tp == messages.MsgFetchEvent || tp == messages.MsgGraphQLEvent || tp == messages.MsgIntegrationEvent || tp == messages.MsgPerformanceTrackAggr || tp == messages.MsgResourceEvent || tp == messages.MsgLongTask || tp == messages.MsgJSException || tp == messages.MsgResourceTiming || tp == messages.MsgRawCustomEvent || tp == messages.MsgCustomIssue || tp == messages.MsgFetch || tp == messages.MsgGraphQL || tp == messages.MsgStateAction || tp == messages.MsgSetInputTarget || tp == messages.MsgSetInputValue || tp == messages.MsgCreateDocument || tp == messages.MsgMouseClick || tp == messages.MsgSetPageLocation || tp == messages.MsgPageLoadTiming || tp == messages.MsgPageRenderTiming + } + // Handler logic - handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) { + handler := func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { statsLogger.Collect(sessionID, meta) - // Just save session data into db without additional checks - if err := saver.InsertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) + for iter.Next() { + if !keepMessage(iter.Type()) { + continue } - return - } + msg := iter.Message().Decode() - session, err := pg.GetSession(sessionID) - if session == nil { - if err != nil && !errors.Is(err, cache.NilSessionInCacheError) { - log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) - } - return - } - - // Save statistics to db - err = saver.InsertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) - } - - // Handle heuristics and save to temporary queue in memory - builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) - - // Process saved heuristics messages as usual messages above in the code - builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { - // TODO: DRY code (carefully with the return statement logic) + // Just save session data into db without additional checks if err := saver.InsertMessage(sessionID, msg); err != nil { if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) } return } - if err := saver.InsertStats(session, msg); err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + session, err := pg.GetSession(sessionID) + if session == nil { + if err != nil && !errors.Is(err, cache.NilSessionInCacheError) { + log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) + } + return } - }) + + // Save statistics to db + err = saver.InsertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) + } + + // Handle heuristics and save to temporary queue in memory + builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) + + // Process saved heuristics messages as usual messages above in the code + builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { + if err := saver.InsertMessage(sessionID, msg); err != nil { + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + return + } + 
+ if err := saver.InsertStats(session, msg); err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + }) + } } // Init consumer diff --git a/backend/cmd/db/values.yaml b/backend/cmd/db/values.yaml new file mode 100644 index 000000000..2c0f0e7f3 --- /dev/null +++ b/backend/cmd/db/values.yaml @@ -0,0 +1,92 @@ +chalice: + env: + jwt_secret: SetARandomStringHere +clickhouse: + enabled: false +fromVersion: v1.6.0 +global: + domainName: openreplay.local + email: + emailFrom: OpenReplay + emailHost: "" + emailPassword: "" + emailPort: "587" + emailSslCert: "" + emailSslKey: "" + emailUseSsl: "false" + emailUseTls: "true" + emailUser: "" + enterpriseEditionLicense: "" + ingress: + controller: + config: + enable-real-ip: true + force-ssl-redirect: false + max-worker-connections: 0 + proxy-body-size: 10m + ssl-redirect: false + extraArgs: + default-ssl-certificate: app/openreplay-ssl + ingressClass: openreplay + ingressClassResource: + name: openreplay + service: + externalTrafficPolicy: Local + kafka: + kafkaHost: kafka.db.svc.cluster.local + kafkaPort: "9092" + kafkaUseSsl: "false" + zookeeperHost: databases-zookeeper.svc.cluster.local + zookeeperNonTLSPort: 2181 + postgresql: + postgresqlDatabase: postgres + postgresqlHost: postgresql.db.svc.cluster.local + postgresqlPassword: changeMePassword + postgresqlPort: "5432" + postgresqlUser: postgres + redis: + redisHost: redis-master.db.svc.cluster.local + redisPort: "6379" + s3: + accessKey: changeMeMinioAccessKey + assetsBucket: sessions-assets + endpoint: http://minio.db.svc.cluster.local:9000 + recordingsBucket: mobs + region: us-east-1 + secretKey: changeMeMinioPassword + sourcemapsBucket: sourcemaps +ingress-nginx: + controller: + config: + enable-real-ip: true + force-ssl-redirect: false + max-worker-connections: 0 + proxy-body-size: 10m + ssl-redirect: false + extraArgs: + default-ssl-certificate: app/openreplay-ssl + ingressClass: openreplay + ingressClassResource: + name: openreplay + service: + externalTrafficPolicy: Local +kafka: + kafkaHost: kafka.db.svc.cluster.local + kafkaPort: "9092" + kafkaUseSsl: "false" + zookeeperHost: databases-zookeeper.svc.cluster.local + zookeeperNonTLSPort: 2181 +minio: + global: + minio: + accessKey: changeMeMinioAccessKey + secretKey: changeMeMinioPassword +postgresql: + postgresqlDatabase: postgres + postgresqlHost: postgresql.db.svc.cluster.local + postgresqlPassword: changeMePassword + postgresqlPort: "5432" + postgresqlUser: postgres +redis: + redisHost: redis-master.db.svc.cluster.local + redisPort: "6379" diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 1fd2f4e64..524af0894 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -2,25 +2,23 @@ package main import ( "log" + "openreplay/backend/pkg/queue/types" + "os" + "os/signal" + "syscall" + "time" + "openreplay/backend/internal/config/ender" "openreplay/backend/internal/sessionender" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/monitoring" - "time" - - "os" - "os/signal" - "syscall" - "openreplay/backend/pkg/intervals" logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" ) -// func main() { metrics := monitoring.New("ender") @@ -45,18 +43,17 @@ func main() { []string{ cfg.TopicRawWeb, }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - switch msg.(type) { - case 
*messages.SessionStart, *messages.SessionEnd: - // Skip several message types - return + func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { + for iter.Next() { + if iter.Type() == messages.MsgSessionStart || iter.Type() == messages.MsgSessionEnd { + continue + } + if iter.Message().Meta().Timestamp == 0 { + log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, iter.Type()) + } + statsLogger.Collect(sessionID, meta) + sessions.UpdateSession(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp) } - // Test debug - if msg.Meta().Timestamp == 0 { - log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, msg.TypeID()) - } - statsLogger.Collect(sessionID, meta) - sessions.UpdateSession(sessionID, meta.Timestamp, msg.Meta().Timestamp) }, false, cfg.MessageSizeLimit, diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 2163c648b..bae5b7a40 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -2,6 +2,12 @@ package main import ( "log" + "openreplay/backend/pkg/queue/types" + "os" + "os/signal" + "syscall" + "time" + "openreplay/backend/internal/config/heuristics" "openreplay/backend/pkg/handlers" web2 "openreplay/backend/pkg/handlers/web" @@ -9,12 +15,7 @@ import ( logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/sessions" - "os" - "os/signal" - "syscall" - "time" ) func main() { @@ -55,9 +56,11 @@ func main() { []string{ cfg.TopicRawWeb, }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - statsLogger.Collect(sessionID, meta) - builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) + func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { + for iter.Next() { + statsLogger.Collect(sessionID, meta) + builderMap.HandleMessage(sessionID, iter.Message(), iter.Message().Meta().Index) + } }, false, cfg.MessageSizeLimit, diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 9dcaa704d..6d52b494c 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -2,22 +2,20 @@ package main import ( "context" - "encoding/binary" "log" - "openreplay/backend/internal/sink/assetscache" - "openreplay/backend/internal/sink/oswriter" - "openreplay/backend/internal/storage" - "openreplay/backend/pkg/monitoring" - "time" - + "openreplay/backend/pkg/queue/types" "os" "os/signal" "syscall" + "time" "openreplay/backend/internal/config/sink" + "openreplay/backend/internal/sink/assetscache" + "openreplay/backend/internal/sink/oswriter" + "openreplay/backend/internal/storage" . 
"openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/url/assets" ) @@ -58,51 +56,49 @@ func main() { []string{ cfg.TopicRawWeb, }, - func(sessionID uint64, message Message, _ *types.Meta) { - // Process assets - message = assetMessageHandler.ParseAssets(sessionID, message) + func(sessionID uint64, iter Iterator, meta *types.Meta) { + for iter.Next() { + // [METRICS] Increase the number of processed messages + totalMessages.Add(context.Background(), 1) - totalMessages.Add(context.Background(), 1) - - // Filter message - typeID := message.TypeID() - - // Send SessionEnd trigger to storage service - switch message.(type) { - case *SessionEnd: - if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(message)); err != nil { - log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID) + // Send SessionEnd trigger to storage service + if iter.Type() == MsgSessionEnd { + if err := producer.Produce(cfg.TopicTrigger, sessionID, iter.Message().Encode()); err != nil { + log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID) + } + continue } - return - } - if !IsReplayerType(typeID) { - return - } - // If message timestamp is empty, use at least ts of session start - ts := message.Meta().Timestamp - if ts == 0 { - log.Printf("zero ts; sessID: %d, msg: %+v", sessionID, message) - } else { - // Log ts of last processed message - counter.Update(sessionID, time.UnixMilli(ts)) - } + msg := iter.Message() + // Process assets + if iter.Type() == MsgSetNodeAttributeURLBased || iter.Type() == MsgSetCSSDataURLBased || iter.Type() == MsgCSSInsertRuleURLBased { + msg = assetMessageHandler.ParseAssets(sessionID, msg.Decode()) + } - value := message.Encode() - var data []byte - if IsIOSType(typeID) { - data = value - } else { - data = make([]byte, len(value)+8) - copy(data[8:], value[:]) - binary.LittleEndian.PutUint64(data[0:], message.Meta().Index) - } - if err := writer.Write(sessionID, data); err != nil { - log.Printf("Writer error: %v\n", err) - } + // Filter message + if !IsReplayerType(msg.TypeID()) { + return + } - messageSize.Record(context.Background(), float64(len(data))) - savedMessages.Add(context.Background(), 1) + // If message timestamp is empty, use at least ts of session start + ts := msg.Meta().Timestamp + if ts == 0 { + log.Printf("zero ts; sessID: %d, msgType: %d", sessionID, iter.Type()) + } else { + // Log ts of last processed message + counter.Update(sessionID, time.UnixMilli(ts)) + } + + // Write encoded message with index to session file + data := msg.EncodeWithIndex() + if err := writer.Write(sessionID, data); err != nil { + log.Printf("Writer error: %v\n", err) + } + + // [METRICS] Increase the number of written to the files messages and the message size + messageSize.Record(context.Background(), float64(len(data))) + savedMessages.Add(context.Background(), 1) + } }, false, cfg.MessageSizeLimit, diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index fcd3ec252..99236495e 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -2,8 +2,7 @@ package main import ( "log" - "openreplay/backend/pkg/failover" - "openreplay/backend/pkg/monitoring" + "openreplay/backend/pkg/queue/types" "os" "os/signal" "strconv" @@ -12,9 +11,10 @@ import ( config "openreplay/backend/internal/config/storage" "openreplay/backend/internal/storage" + "openreplay/backend/pkg/failover" 
"openreplay/backend/pkg/messages" + "openreplay/backend/pkg/monitoring" "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" s3storage "openreplay/backend/pkg/storage" ) @@ -43,14 +43,16 @@ func main() { []string{ cfg.TopicTrigger, }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - switch m := msg.(type) { - case *messages.SessionEnd: - if err := srv.UploadKey(strconv.FormatUint(sessionID, 10), 5); err != nil { - sessionFinder.Find(sessionID, m.Timestamp) + func(sessionID uint64, iter messages.Iterator, meta *types.Meta) { + for iter.Next() { + if iter.Type() == messages.MsgSessionEnd { + msg := iter.Message().Decode().(*messages.SessionEnd) + if err := srv.UploadKey(strconv.FormatUint(sessionID, 10), 5); err != nil { + sessionFinder.Find(sessionID, msg.Timestamp) + } + // Log timestamp of last processed session + counter.Update(sessionID, time.UnixMilli(meta.Timestamp)) } - // Log timestamp of last processed session - counter.Update(sessionID, time.UnixMilli(meta.Timestamp)) } }, true, diff --git a/backend/internal/http/player/player.go b/backend/internal/http/player/player.go new file mode 100644 index 000000000..9c4838941 --- /dev/null +++ b/backend/internal/http/player/player.go @@ -0,0 +1,55 @@ +package player + +import ( + "bufio" + "fmt" + "log" + "os" +) + +type request struct { + ts int64 + method string + url string + headers map[string][]string + body []byte +} + +type playerImpl struct { + // +} + +func (p playerImpl) LoadRecord(filePath string) error { + if filePath == "" { + return fmt.Errorf("file name is empty") + } + file, err := os.OpenFile(filePath, os.O_RDONLY, 0644) + if err != nil { + return fmt.Errorf("open file err: %s", err) + } + defer file.Close() + + sc := bufio.NewScanner(file) + for sc.Scan() { + line := sc.Text() + log.Println(line) + } + if err := sc.Err(); err != nil { + return fmt.Errorf("scan file error: %v", err) + } + return nil +} + +func (p playerImpl) PlayRecord(host string) error { + //TODO implement me + panic("implement me") +} + +type Player interface { + LoadRecord(filePath string) error + PlayRecord(host string) error +} + +func New() Player { + return &playerImpl{} +} diff --git a/backend/internal/http/player/player_test.go b/backend/internal/http/player/player_test.go new file mode 100644 index 000000000..0aa318095 --- /dev/null +++ b/backend/internal/http/player/player_test.go @@ -0,0 +1,10 @@ +package player + +import "testing" + +func TestPlayer(t *testing.T) { + player := New() + if err := player.LoadRecord("/Users/alexander/7048055123532800"); err != nil { + t.Logf("can't load session record: %s", err) + } +} diff --git a/backend/internal/http/recorder/recorder.go b/backend/internal/http/recorder/recorder.go new file mode 100644 index 000000000..e6a3ac0f7 --- /dev/null +++ b/backend/internal/http/recorder/recorder.go @@ -0,0 +1,94 @@ +package recorder + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "strconv" + "sync" + "time" +) + +type recorderImpl struct { + sessions map[uint64]*sync.Mutex + sessionsDir string +} + +func (r *recorderImpl) SaveRequest(sessionID uint64, req *http.Request, body []byte) error { + pwd, _ := os.Getwd() + log.Printf("new request, pwd: %s", pwd) + // Hold mutex for session + if _, ok := r.sessions[sessionID]; !ok { + r.sessions[sessionID] = &sync.Mutex{} + } + r.sessions[sessionID].Lock() + // Release mutex for session on exit + defer r.sessions[sessionID].Unlock() + + // Open file + file, err := os.OpenFile(r.sessionsDir+strconv.FormatUint(sessionID, 
10), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + // Close file on exit + defer file.Close() + log.Printf("file name: %s", strconv.FormatUint(sessionID, 10)) + + // Save request info + /* + Record format: + - timestamp + - method + - url + - headers + - body + */ + if _, err := file.Write([]byte("\n")); err != nil { + log.Printf("can't write data to file: %s", err) + } + if _, err := file.Write([]byte(fmt.Sprintf("%d\n", time.Now().UnixMilli()))); err != nil { + log.Printf("can't write timestamp to file: %s", err) + } + if _, err := file.Write([]byte(fmt.Sprintf("%s\n", req.Method))); err != nil { + log.Printf("can't write method to file: %s", err) + } + if _, err := file.Write([]byte(fmt.Sprintf("%s\n", req.URL.Path))); err != nil { + log.Printf("can't write url to file: %s", err) + } + reqHeaders, err := json.Marshal(req.Header) + if err == nil { + if _, err := file.Write([]byte(fmt.Sprintf("%s\n", string(reqHeaders)))); err != nil { + log.Printf("can't write headers to file: %s", err) + } + } else { + log.Printf("can't marshal request headers: %s", err) + } + if _, err := file.Write([]byte(fmt.Sprintf("%s\n", string(body)))); err != nil { + log.Printf("can't write body to file: %s", err) + } + if _, err := file.Write([]byte("\n")); err != nil { + log.Printf("can't write data to file: %s", err) + } + + // Sync file changes + if err := file.Sync(); err != nil { + log.Printf("can't sync file: %s", err) + } + return nil +} + +type Recorder interface { + SaveRequest(sessionID uint64, req *http.Request, body []byte) error +} + +func New(dir string) Recorder { + if dir == "" { + dir = "./" + } + return &recorderImpl{ + sessions: make(map[uint64]*sync.Mutex), + sessionsDir: dir, + } +} diff --git a/backend/internal/sink/oswriter/oswriter.go b/backend/internal/sink/oswriter/oswriter.go index 839e61eba..4feb3e2aa 100644 --- a/backend/internal/sink/oswriter/oswriter.go +++ b/backend/internal/sink/oswriter/oswriter.go @@ -71,6 +71,7 @@ func (w *Writer) Write(key uint64, data []byte) error { if err != nil { return err } + // TODO: add check for the number of recorded bytes to file _, err = file.Write(data) return err } diff --git a/backend/pkg/db/clickhouse/batch.go b/backend/pkg/db/clickhouse/batch.go new file mode 100644 index 000000000..80fea7430 --- /dev/null +++ b/backend/pkg/db/clickhouse/batch.go @@ -0,0 +1,32 @@ +package clickhouse + +import "fmt" + +type batch struct { + size int + values [][]interface{} + oldValues [][]interface{} +} + +func NewBatch(size int) *batch { + return &batch{ + size: size, + values: make([][]interface{}, 0), + oldValues: make([][]interface{}, 0), + } +} + +func (b *batch) Append(v ...interface{}) error { + if len(v) != b.size { + return fmt.Errorf("wrong values set size, got: %d, waited: %d", len(v), b.size) + } + b.values = append(b.values, v) + return nil +} + +func (b *batch) Commit() error { + if len(b.oldValues) > 0 { + // + } + +} diff --git a/backend/pkg/db/clickhouse/connector.go b/backend/pkg/db/clickhouse/connector.go new file mode 100644 index 000000000..97fa54d0d --- /dev/null +++ b/backend/pkg/db/clickhouse/connector.go @@ -0,0 +1,401 @@ +package clickhouse + +import ( + "context" + "errors" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "log" + "openreplay/backend/pkg/db/types" + "openreplay/backend/pkg/hashid" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/url" + "strings" + "time" + + "openreplay/backend/pkg/license" +) + +var CONTEXT_MAP 
= map[uint64]string{0: "unknown", 1: "self", 2: "same-origin-ancestor", 3: "same-origin-descendant", 4: "same-origin", 5: "cross-origin-ancestor", 6: "cross-origin-descendant", 7: "cross-origin-unreachable", 8: "multiple-contexts"} +var CONTAINER_TYPE_MAP = map[uint64]string{0: "window", 1: "iframe", 2: "embed", 3: "object"} + +type Connector interface { + Prepare() error + Commit() error + FinaliseSessionsTable() error + InsertWebSession(session *types.Session) error + InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error + InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error + InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error + InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error + InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error + InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error + InsertLongtask(session *types.Session, msg *messages.LongTask) error +} + +type connectorImpl struct { + conn driver.Conn + batches map[string]driver.Batch +} + +func NewConnector(url string) Connector { + license.CheckLicense() + url = strings.TrimPrefix(url, "tcp://") + url = strings.TrimSuffix(url, "/default") + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{url}, + Auth: clickhouse.Auth{ + Database: "default", + }, + MaxOpenConns: 20, + MaxIdleConns: 15, + ConnMaxLifetime: 3 * time.Minute, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + // Debug: true, + }) + if err != nil { + log.Fatal(err) + } + + c := &connectorImpl{ + conn: conn, + batches: make(map[string]driver.Batch, 9), + } + return c +} + +func (c *connectorImpl) newBatch(name, query string) error { + batch, err := c.conn.PrepareBatch(context.Background(), query) + if err != nil { + return fmt.Errorf("can't create new batch: %s", err) + } + if _, ok := c.batches[name]; ok { + delete(c.batches, name) + } + c.batches[name] = batch + return nil +} + +var batches = map[string]string{ + // Sessions table + "sessions": "INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "metadata": "INSERT INTO sessions_metadata (session_id, user_id, user_anonymous_id, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10, datetime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + // Events table + "resources": "INSERT INTO resources (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, type, duration, ttfb, header_size, encoded_body_size, decoded_body_size, success) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "pages": "INSERT INTO pages (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) VALUES (?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "clicks": "INSERT INTO clicks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label, hesitation_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "inputs": "INSERT INTO inputs (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, label) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "errors": "INSERT INTO errors (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, source, name, message, error_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "performance": "INSERT INTO performance (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_total_js_heap_size, avg_total_js_heap_size, max_total_js_heap_size, min_used_js_heap_size, avg_used_js_heap_size, max_used_js_heap_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "longtasks": "INSERT INTO longtasks (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_browser, user_browser_version, user_device, user_device_type, user_country, datetime, context, container_type, container_id, container_name, container_src) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", +} + +func (c *connectorImpl) Prepare() error { + for table, query := range batches { + if err := c.newBatch(table, query); err != nil { + return fmt.Errorf("can't create %s batch: %s", table, err) + } + } + return nil +} + +func (c *connectorImpl) Commit() error { + for _, b := range c.batches { + if err := b.Send(); err != nil { + return fmt.Errorf("can't send batch: %s", err) + } + } + return nil +} + +func (c *connectorImpl) FinaliseSessionsTable() error { + if err := c.conn.Exec(context.Background(), "OPTIMIZE TABLE sessions FINAL"); err != nil { + return fmt.Errorf("can't finalise sessions table: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebSession(session *types.Session) error { + if session.Duration == nil { + return errors.New("trying to insert session with nil duration") + } + if err := c.batches["sessions"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(session.Timestamp), + uint32(*session.Duration), + uint16(session.PagesCount), + uint16(session.EventsCount), + uint16(session.ErrorsCount), + // Web unique columns + session.UserBrowser, + nullableString(session.UserBrowserVersion), + ); err != nil { + return fmt.Errorf("can't append to sessions batch: %s", err) + } + if err := c.batches["metadata"].Append( + session.SessionID, + session.UserID, + session.UserAnonymousID, + session.Metadata1, + session.Metadata2, + session.Metadata3, + session.Metadata4, + session.Metadata5, + session.Metadata6, + session.Metadata7, + session.Metadata8, + session.Metadata9, + session.Metadata10, 
+ datetime(session.Timestamp), + ); err != nil { + return fmt.Errorf("can't append to metadata batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebResourceEvent(session *types.Session, msg *messages.ResourceEvent) error { + var method interface{} = url.EnsureMethod(msg.Method) + if method == "" { + method = nil + } + if err := c.batches["resources"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + url.DiscardURLQuery(msg.URL), + msg.Type, + nullableUint16(uint16(msg.Duration)), + nullableUint16(uint16(msg.TTFB)), + nullableUint16(uint16(msg.HeaderSize)), + nullableUint32(uint32(msg.EncodedBodySize)), + nullableUint32(uint32(msg.DecodedBodySize)), + msg.Success, + ); err != nil { + return fmt.Errorf("can't append to resources batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error { + if err := c.batches["pages"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + url.DiscardURLQuery(msg.URL), + nullableUint16(uint16(msg.RequestStart)), + nullableUint16(uint16(msg.ResponseStart)), + nullableUint16(uint16(msg.ResponseEnd)), + nullableUint16(uint16(msg.DomContentLoadedEventStart)), + nullableUint16(uint16(msg.DomContentLoadedEventEnd)), + nullableUint16(uint16(msg.LoadEventStart)), + nullableUint16(uint16(msg.LoadEventEnd)), + nullableUint16(uint16(msg.FirstPaint)), + nullableUint16(uint16(msg.FirstContentfulPaint)), + nullableUint16(uint16(msg.SpeedIndex)), + nullableUint16(uint16(msg.VisuallyComplete)), + nullableUint16(uint16(msg.TimeToInteractive)), + ); err != nil { + return fmt.Errorf("can't append to pages batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebClickEvent(session *types.Session, msg *messages.ClickEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["clicks"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + datetime(msg.Timestamp), + msg.Label, + nullableUint32(uint32(msg.HesitationTime)), + ); err != nil { + return fmt.Errorf("can't append to clicks batch: %s", err) + } + return nil +} + +func (c *connectorImpl) InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error { + if msg.Label == "" { + return nil + } + if err := c.batches["inputs"].Append( + session.SessionID, + session.ProjectID, + session.TrackerVersion, + nullableString(session.RevID), + session.UserUUID, + session.UserOS, + nullableString(session.UserOSVersion), + session.UserBrowser, + nullableString(session.UserBrowserVersion), + nullableString(session.UserDevice), + session.UserDeviceType, + session.UserCountry, + 
datetime(msg.Timestamp),
+		msg.Label,
+	); err != nil {
+		return fmt.Errorf("can't append to inputs batch: %s", err)
+	}
+	return nil
+}
+
+func (c *connectorImpl) InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error {
+	if err := c.batches["errors"].Append(
+		session.SessionID,
+		session.ProjectID,
+		session.TrackerVersion,
+		nullableString(session.RevID),
+		session.UserUUID,
+		session.UserOS,
+		nullableString(session.UserOSVersion),
+		session.UserBrowser,
+		nullableString(session.UserBrowserVersion),
+		nullableString(session.UserDevice),
+		session.UserDeviceType,
+		session.UserCountry,
+		datetime(msg.Timestamp),
+		msg.Source,
+		nullableString(msg.Name),
+		msg.Message,
+		hashid.WebErrorID(session.ProjectID, msg),
+	); err != nil {
+		return fmt.Errorf("can't append to errors batch: %s", err)
+	}
+	return nil
+}
+
+func (c *connectorImpl) InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error {
+	var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
+	if err := c.batches["performance"].Append(
+		session.SessionID,
+		session.ProjectID,
+		session.TrackerVersion,
+		nullableString(session.RevID),
+		session.UserUUID,
+		session.UserOS,
+		nullableString(session.UserOSVersion),
+		session.UserBrowser,
+		nullableString(session.UserBrowserVersion),
+		nullableString(session.UserDevice),
+		session.UserDeviceType,
+		session.UserCountry,
+		datetime(timestamp),
+		uint8(msg.MinFPS),
+		uint8(msg.AvgFPS),
+		uint8(msg.MaxFPS),
+		uint8(msg.MinCPU),
+		uint8(msg.AvgCPU),
+		uint8(msg.MaxCPU),
+		msg.MinTotalJSHeapSize,
+		msg.AvgTotalJSHeapSize,
+		msg.MaxTotalJSHeapSize,
+		msg.MinUsedJSHeapSize,
+		msg.AvgUsedJSHeapSize,
+		msg.MaxUsedJSHeapSize,
+	); err != nil {
+		return fmt.Errorf("can't append to performance batch: %s", err)
+	}
+	return nil
+}
+
+func (c *connectorImpl) InsertLongtask(session *types.Session, msg *messages.LongTask) error {
+	if err := c.batches["longtasks"].Append(
+		session.SessionID,
+		session.ProjectID,
+		session.TrackerVersion,
+		nullableString(session.RevID),
+		session.UserUUID,
+		session.UserOS,
+		nullableString(session.UserOSVersion),
+		session.UserBrowser,
+		nullableString(session.UserBrowserVersion),
+		nullableString(session.UserDevice),
+		session.UserDeviceType,
+		session.UserCountry,
+		datetime(msg.Timestamp),
+		CONTEXT_MAP[msg.Context],
+		CONTAINER_TYPE_MAP[msg.ContainerType],
+		msg.ContainerId,
+		msg.ContainerName,
+		msg.ContainerSrc,
+	); err != nil {
+		return fmt.Errorf("can't append to longtasks batch: %s", err)
+	}
+	return nil
+}
+
+func nullableUint16(v uint16) *uint16 {
+	var p *uint16 = nil
+	if v != 0 {
+		p = &v
+	}
+	return p
+}
+
+func nullableUint32(v uint32) *uint32 {
+	var p *uint32 = nil
+	if v != 0 {
+		p = &v
+	}
+	return p
+}
+
+func nullableString(v string) *string {
+	var p *string = nil
+	if v != "" {
+		p = &v
+	}
+	return p
+}
+
+func datetime(timestamp uint64) time.Time {
+	t := time.Unix(int64(timestamp/1e3), 0)
+	// Temporary workaround for incorrect timestamps in performance messages
+	if t.Year() < 2022 || t.Year() > 2025 {
+		return time.Now()
+	}
+	return t
+}
diff --git a/backend/pkg/db/postgres/bulk_test.go b/backend/pkg/db/postgres/bulk_test.go
new file mode 100644
index 000000000..4e7489684
--- /dev/null
+++ b/backend/pkg/db/postgres/bulk_test.go
@@ -0,0 +1,54 @@
+package postgres
+
+import (
+	"fmt"
+	"github.com/jackc/pgx/v4"
+	"testing"
+)
+
+type poolMock struct {
+	//
+}
+
+func (p poolMock) Query(sql string, args ...interface{}) (pgx.Rows, error) {
+	return nil, nil
+}
+
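The ClickHouse connector introduced above exposes Prepare/Insert*/Commit, but this change contains no caller for it yet. Below is a minimal usage sketch, not part of the diff: the CLICKHOUSE_STRING variable and the storeSession helper are illustrative names, and the session is assumed to come from the existing db cache.

package main

import (
	"log"
	"os"

	"openreplay/backend/pkg/db/clickhouse"
	"openreplay/backend/pkg/db/types"
)

// storeSession shows the intended life cycle of the connector: Prepare() builds
// one driver.Batch per table, the Insert* methods append rows to those batches,
// Commit() sends them, and Prepare() has to run again before the next round
// because a clickhouse-go batch cannot be sent twice.
func storeSession(conn clickhouse.Connector, sess *types.Session) {
	// InsertWebSession rejects sessions whose Duration is still nil.
	if err := conn.InsertWebSession(sess); err != nil {
		log.Printf("can't insert session: %s", err)
	}
	if err := conn.Commit(); err != nil {
		log.Printf("can't commit batches: %s", err)
	}
	if err := conn.Prepare(); err != nil {
		log.Printf("can't prepare new batches: %s", err)
	}
}

func main() {
	// CLICKHOUSE_STRING is an illustrative env var name; NewConnector itself
	// trims the tcp:// prefix and /default suffix from the URL.
	conn := clickhouse.NewConnector(os.Getenv("CLICKHOUSE_STRING"))
	if err := conn.Prepare(); err != nil {
		log.Fatalf("can't prepare batches: %s", err)
	}
	log.Println("batches prepared; storeSession would be called per finished session")
}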
+func (p poolMock) QueryRow(sql string, args ...interface{}) pgx.Row { + return nil +} + +func (p poolMock) Exec(sql string, arguments ...interface{}) error { + fmt.Println(sql) + fmt.Println(arguments...) + return nil +} + +func (p poolMock) SendBatch(b *pgx.Batch) pgx.BatchResults { + return nil +} + +func (p poolMock) Begin() (*_Tx, error) { + return nil, nil +} + +func (p poolMock) Close() { +} + +func NewPoolMock() Pool { + return &poolMock{} +} + +func TestBulk(t *testing.T) { + conn := NewPoolMock() + bulk, err := NewBulk(conn, "autocomplete", "(value, type, project_id)", "($%d, $%d, $%d)", 3, 10) + if err != nil { + t.Errorf("can't create bulk: %s", err) + } + for i := 0; i < 10; i++ { + if err := bulk.Append(fmt.Sprintf("var1+%d", i), fmt.Sprintf("var2+%d", i), + i%2 == 0); err != nil { + t.Errorf("can't add new values to bulk: %s", err) + } + } +} diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 6bd868cf1..f5d25d8a7 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -2,169 +2,155 @@ package messages import ( "bytes" - "fmt" - "github.com/pkg/errors" "io" "log" "strings" ) -// RawMessage is a not decoded message -type RawMessage struct { - tp uint64 - size uint64 - data []byte - meta *message +type Iterator interface { + Next() bool // Return true if we have next message + Type() int // Return type of the next message + Message() Message // Return raw or decoded message } -func (m *RawMessage) Encode() []byte { - return m.data +type iteratorImpl struct { + data *bytes.Reader + index uint64 + timestamp int64 + version uint64 + msgType uint64 + msgSize uint64 + canSkip bool + msg Message } -func (m *RawMessage) TypeID() int { - return int(m.tp) -} - -func (m *RawMessage) Meta() *message { - return m.meta -} - -func WriteSize(size uint64, buf []byte, p int) { - var m uint64 = 255 - for i := 0; i < 3; i++ { - buf[p+i] = byte(size & m) - size = size >> 8 +func NewIterator(data []byte) Iterator { + return &iteratorImpl{ + data: bytes.NewReader(data), } - fmt.Println(buf) } -func ReadSize(reader io.Reader) (uint64, error) { - buf := make([]byte, 3) - n, err := io.ReadFull(reader, buf) +func (i *iteratorImpl) Next() bool { + if i.canSkip { + log.Printf("skip message, type: %d, size: %d", i.msgType, i.msgSize) + if _, err := i.data.Seek(int64(i.msgSize), io.SeekCurrent); err != nil { + log.Printf("seek err: %s", err) + return false + } + } + i.canSkip = false + + var err error + i.msgType, err = ReadUint(i.data) if err != nil { - return 0, err + if err == io.EOF { + return false + } + log.Printf("can't read message type: %s", err) + return false } - if n != 3 { - return 0, fmt.Errorf("read only %d of 3 size bytes", n) + log.Printf("message type: %d", i.msgType) + + if i.version > 0 && messageHasSize(i.msgType) { + // Read message size if it is a new protocol version + i.msgSize, err = ReadSize(i.data) + if err != nil { + log.Printf("can't read message size: %s", err) + return false + } + log.Println("message size:", i.msgSize) + i.msg = &RawMessage{ + tp: i.msgType, + size: i.msgSize, + meta: &message{}, + reader: i.data, + skipped: &i.canSkip, + } + i.canSkip = true + } else { + i.msg, err = ReadMessage(i.msgType, i.data) + if err == io.EOF { + return false + } else if err != nil { + if strings.HasPrefix(err.Error(), "Unknown message code:") { + code := strings.TrimPrefix(err.Error(), "Unknown message code: ") + i.msg, err = DecodeExtraMessage(code, i.data) + if err != nil { + log.Printf("can't decode msg: %s", err) + return 
false + } + } else { + log.Printf("Batch Message decoding error on message with index %v, err: %s", i.index, err) + return false + } + } + i.msg = transformDeprecated(i.msg) } - var size uint64 - for i, b := range buf { - size += uint64(b) << (8 * i) + + // Process meta information + isBatchMeta := false + switch i.msgType { + case MsgBatchMetadata: + if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though + log.Printf("Batch Meta found at the end of the batch") + return false + } + m := i.msg.Decode().(*BatchMetadata) + i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) + i.timestamp = m.Timestamp + i.version = m.Version + isBatchMeta = true + log.Printf("new batch version: %d", i.version) + + case MsgBatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it) + if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though + log.Printf("Batch Meta found at the end of the batch") + return false + } + m := i.msg.Decode().(*BatchMeta) + i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) + i.timestamp = m.Timestamp + isBatchMeta = true + // continue readLoop + case MsgIOSBatchMeta: + if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though + log.Printf("Batch Meta found at the end of the batch") + return false + } + m := i.msg.Decode().(*IOSBatchMeta) + i.index = m.FirstIndex + i.timestamp = int64(m.Timestamp) + isBatchMeta = true + // continue readLoop + case MsgTimestamp: + m := i.msg.Decode().(*Timestamp) + i.timestamp = int64(m.Timestamp) + // No skipping here for making it easy to encode back the same sequence of message + // continue readLoop + case MsgSessionStart: + m := i.msg.Decode().(*SessionStart) + i.timestamp = int64(m.Timestamp) + case MsgSessionEnd: + m := i.msg.Decode().(*SessionEnd) + i.timestamp = int64(m.Timestamp) } - return size, nil + i.msg.Meta().Index = i.index + i.msg.Meta().Timestamp = i.timestamp + + if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker + i.index++ + } + return true +} + +func (i *iteratorImpl) Type() int { + return int(i.msgType) +} + +func (i *iteratorImpl) Message() Message { + return i.msg } func messageHasSize(msgType uint64) bool { return !(msgType == 80 || msgType == 81 || msgType == 82) } - -func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error { - var ( - index uint64 - timestamp int64 - version uint64 - ) - log.Println("new batch") - - for { - // Read message type - msgType, err := ReadUint(reader) - if err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("can't read message type: %s", err) - } - log.Printf("message type: %d", msgType) - - var msg Message - if version > 0 && messageHasSize(msgType) { - // Read message size if it's new protocol version - msgSize, err := ReadSize(reader) - if err != nil { - return fmt.Errorf("can't read message size: %s", err) - } - // Read raw message (bytes) - log.Println("size:", msgSize) - buf := make([]byte, msgSize) - _, err = io.ReadFull(reader, buf) - if err != nil { - return fmt.Errorf("can't read raw body: %s", err) - } - // Create message object - msg = &RawMessage{ - tp: msgType, - size: msgSize, - data: buf, - meta: &message{}, - } - // Temp code - msg, err = ReadMessage(msgType, bytes.NewReader(buf)) - if err != nil { - return err - } - } else { - msg, err = 
ReadMessage(msgType, reader) - if err == io.EOF { - return nil - } else if err != nil { - if strings.HasPrefix(err.Error(), "Unknown message code:") { - code := strings.TrimPrefix(err.Error(), "Unknown message code: ") - msg, err = DecodeExtraMessage(code, reader) - if err != nil { - return fmt.Errorf("can't decode msg: %s", err) - } - } else { - return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index) - } - } - msg = transformDeprecated(msg) - } - - isBatchMeta := false - switch m := msg.(type) { - case *BatchMetadata: - if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though - return errors.New("Batch Meta found at the end of the batch") - } - index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) - timestamp = m.Timestamp - version = m.Version - isBatchMeta = true - log.Printf("new batch version: %d", version) - - case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it) - if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though - return errors.New("Batch Meta found at the end of the batch") - } - index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) - timestamp = m.Timestamp - isBatchMeta = true - // continue readLoop - case *IOSBatchMeta: - if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though - return errors.New("Batch Meta found at the end of the batch") - } - index = m.FirstIndex - timestamp = int64(m.Timestamp) - isBatchMeta = true - // continue readLoop - case *Timestamp: - timestamp = int64(m.Timestamp) // TODO(?): replace timestamp type to int64 everywhere (including encoding part in tracker) - // No skipping here for making it easy to encode back the same sequence of message - // continue readLoop - case *SessionStart: - timestamp = int64(m.Timestamp) - case *SessionEnd: - timestamp = int64(m.Timestamp) - } - msg.Meta().Index = index - msg.Meta().Timestamp = timestamp - - messageHandler(msg) - if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker - index++ - } - } - return errors.New("Error of the codeflow. 
(Should return on EOF)") -} diff --git a/backend/pkg/messages/extra.go b/backend/pkg/messages/extra.go index 1691d905f..b2a57e2ad 100644 --- a/backend/pkg/messages/extra.go +++ b/backend/pkg/messages/extra.go @@ -1,6 +1,7 @@ package messages import ( + "encoding/binary" "fmt" "io" ) @@ -20,6 +21,21 @@ func (msg *SessionSearch) Encode() []byte { return buf[:p] } +func (msg *SessionSearch) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SessionSearch) Decode() Message { + return msg +} + func (msg *SessionSearch) TypeID() int { return 127 } diff --git a/backend/pkg/messages/message.go b/backend/pkg/messages/message.go index c4066c225..b479648b9 100644 --- a/backend/pkg/messages/message.go +++ b/backend/pkg/messages/message.go @@ -16,6 +16,8 @@ func (m *message) SetMeta(origin *message) { type Message interface { Encode() []byte + EncodeWithIndex() []byte + Decode() Message TypeID() int Meta() *message } diff --git a/backend/pkg/messages/messages.go b/backend/pkg/messages/messages.go index 8dea5c26f..a691cd8d1 100644 --- a/backend/pkg/messages/messages.go +++ b/backend/pkg/messages/messages.go @@ -1,6 +1,188 @@ // Auto-generated, do not edit package messages +import "encoding/binary" + +const ( + MsgBatchMeta = 80 + + MsgBatchMetadata = 81 + + MsgPartitionedMessage = 82 + + MsgTimestamp = 0 + + MsgSessionStart = 1 + + MsgSessionDisconnect = 2 + + MsgSessionEnd = 3 + + MsgSetPageLocation = 4 + + MsgSetViewportSize = 5 + + MsgSetViewportScroll = 6 + + MsgCreateDocument = 7 + + MsgCreateElementNode = 8 + + MsgCreateTextNode = 9 + + MsgMoveNode = 10 + + MsgRemoveNode = 11 + + MsgSetNodeAttribute = 12 + + MsgRemoveNodeAttribute = 13 + + MsgSetNodeData = 14 + + MsgSetCSSData = 15 + + MsgSetNodeScroll = 16 + + MsgSetInputTarget = 17 + + MsgSetInputValue = 18 + + MsgSetInputChecked = 19 + + MsgMouseMove = 20 + + MsgMouseClickDepricated = 21 + + MsgConsoleLog = 22 + + MsgPageLoadTiming = 23 + + MsgPageRenderTiming = 24 + + MsgJSException = 25 + + MsgIntegrationEvent = 26 + + MsgRawCustomEvent = 27 + + MsgUserID = 28 + + MsgUserAnonymousID = 29 + + MsgMetadata = 30 + + MsgPageEvent = 31 + + MsgInputEvent = 32 + + MsgClickEvent = 33 + + MsgErrorEvent = 34 + + MsgResourceEvent = 35 + + MsgCustomEvent = 36 + + MsgCSSInsertRule = 37 + + MsgCSSDeleteRule = 38 + + MsgFetch = 39 + + MsgProfiler = 40 + + MsgOTable = 41 + + MsgStateAction = 42 + + MsgStateActionEvent = 43 + + MsgRedux = 44 + + MsgVuex = 45 + + MsgMobX = 46 + + MsgNgRx = 47 + + MsgGraphQL = 48 + + MsgPerformanceTrack = 49 + + MsgGraphQLEvent = 50 + + MsgFetchEvent = 51 + + MsgDOMDrop = 52 + + MsgResourceTiming = 53 + + MsgConnectionInformation = 54 + + MsgSetPageVisibility = 55 + + MsgPerformanceTrackAggr = 56 + + MsgLongTask = 59 + + MsgSetNodeAttributeURLBased = 60 + + MsgSetCSSDataURLBased = 61 + + MsgIssueEvent = 62 + + MsgTechnicalInfo = 63 + + MsgCustomIssue = 64 + + MsgAssetCache = 66 + + MsgCSSInsertRuleURLBased = 67 + + MsgMouseClick = 69 + + MsgCreateIFrameDocument = 70 + + MsgIOSBatchMeta = 107 + + MsgIOSSessionStart = 90 + + MsgIOSSessionEnd = 91 + + MsgIOSMetadata = 92 + + MsgIOSCustomEvent = 93 + + MsgIOSUserID = 94 + + MsgIOSUserAnonymousID = 95 + + MsgIOSScreenChanges = 96 + + MsgIOSCrash = 97 + + MsgIOSScreenEnter = 98 + + MsgIOSScreenLeave = 99 + + MsgIOSClickEvent = 100 + + MsgIOSInputEvent = 101 + + 
MsgIOSPerformanceEvent = 102 + + MsgIOSLog = 103 + + MsgIOSInternalError = 104 + + MsgIOSNetworkCall = 105 + + MsgIOSPerformanceAggregated = 110 + + MsgIOSIssueEvent = 111 +) + type BatchMeta struct { message PageNo uint64 @@ -18,6 +200,21 @@ func (msg *BatchMeta) Encode() []byte { return buf[:p] } +func (msg *BatchMeta) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *BatchMeta) Decode() Message { + return msg +} + func (msg *BatchMeta) TypeID() int { return 80 } @@ -43,6 +240,21 @@ func (msg *BatchMetadata) Encode() []byte { return buf[:p] } +func (msg *BatchMetadata) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *BatchMetadata) Decode() Message { + return msg +} + func (msg *BatchMetadata) TypeID() int { return 81 } @@ -62,6 +274,21 @@ func (msg *PartitionedMessage) Encode() []byte { return buf[:p] } +func (msg *PartitionedMessage) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PartitionedMessage) Decode() Message { + return msg +} + func (msg *PartitionedMessage) TypeID() int { return 82 } @@ -79,6 +306,21 @@ func (msg *Timestamp) Encode() []byte { return buf[:p] } +func (msg *Timestamp) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Timestamp) Decode() Message { + return msg +} + func (msg *Timestamp) TypeID() int { return 0 } @@ -126,6 +368,21 @@ func (msg *SessionStart) Encode() []byte { return buf[:p] } +func (msg *SessionStart) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SessionStart) Decode() Message { + return msg +} + func (msg *SessionStart) TypeID() int { return 1 } @@ -143,6 +400,21 @@ func (msg *SessionDisconnect) Encode() []byte { return buf[:p] } +func (msg *SessionDisconnect) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SessionDisconnect) Decode() Message { + return msg +} + func (msg *SessionDisconnect) TypeID() int { return 2 } @@ -160,6 +432,21 @@ func (msg *SessionEnd) Encode() []byte { return buf[:p] } +func (msg *SessionEnd) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SessionEnd) Decode() Message { + return msg +} + func (msg *SessionEnd) TypeID() int { return 3 } @@ 
-181,6 +468,21 @@ func (msg *SetPageLocation) Encode() []byte { return buf[:p] } +func (msg *SetPageLocation) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetPageLocation) Decode() Message { + return msg +} + func (msg *SetPageLocation) TypeID() int { return 4 } @@ -200,6 +502,21 @@ func (msg *SetViewportSize) Encode() []byte { return buf[:p] } +func (msg *SetViewportSize) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetViewportSize) Decode() Message { + return msg +} + func (msg *SetViewportSize) TypeID() int { return 5 } @@ -219,6 +536,21 @@ func (msg *SetViewportScroll) Encode() []byte { return buf[:p] } +func (msg *SetViewportScroll) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetViewportScroll) Decode() Message { + return msg +} + func (msg *SetViewportScroll) TypeID() int { return 6 } @@ -235,6 +567,21 @@ func (msg *CreateDocument) Encode() []byte { return buf[:p] } +func (msg *CreateDocument) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CreateDocument) Decode() Message { + return msg +} + func (msg *CreateDocument) TypeID() int { return 7 } @@ -260,6 +607,21 @@ func (msg *CreateElementNode) Encode() []byte { return buf[:p] } +func (msg *CreateElementNode) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CreateElementNode) Decode() Message { + return msg +} + func (msg *CreateElementNode) TypeID() int { return 8 } @@ -281,6 +643,21 @@ func (msg *CreateTextNode) Encode() []byte { return buf[:p] } +func (msg *CreateTextNode) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CreateTextNode) Decode() Message { + return msg +} + func (msg *CreateTextNode) TypeID() int { return 9 } @@ -302,6 +679,21 @@ func (msg *MoveNode) Encode() []byte { return buf[:p] } +func (msg *MoveNode) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *MoveNode) Decode() Message { + return msg +} + func (msg *MoveNode) TypeID() int { return 10 } @@ -319,6 +711,21 @@ func (msg *RemoveNode) Encode() []byte { return buf[:p] } +func (msg *RemoveNode) EncodeWithIndex() []byte { + encoded := msg.Encode() + if 
IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *RemoveNode) Decode() Message { + return msg +} + func (msg *RemoveNode) TypeID() int { return 11 } @@ -340,6 +747,21 @@ func (msg *SetNodeAttribute) Encode() []byte { return buf[:p] } +func (msg *SetNodeAttribute) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetNodeAttribute) Decode() Message { + return msg +} + func (msg *SetNodeAttribute) TypeID() int { return 12 } @@ -359,6 +781,21 @@ func (msg *RemoveNodeAttribute) Encode() []byte { return buf[:p] } +func (msg *RemoveNodeAttribute) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *RemoveNodeAttribute) Decode() Message { + return msg +} + func (msg *RemoveNodeAttribute) TypeID() int { return 13 } @@ -378,6 +815,21 @@ func (msg *SetNodeData) Encode() []byte { return buf[:p] } +func (msg *SetNodeData) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetNodeData) Decode() Message { + return msg +} + func (msg *SetNodeData) TypeID() int { return 14 } @@ -397,6 +849,21 @@ func (msg *SetCSSData) Encode() []byte { return buf[:p] } +func (msg *SetCSSData) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetCSSData) Decode() Message { + return msg +} + func (msg *SetCSSData) TypeID() int { return 15 } @@ -418,6 +885,21 @@ func (msg *SetNodeScroll) Encode() []byte { return buf[:p] } +func (msg *SetNodeScroll) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetNodeScroll) Decode() Message { + return msg +} + func (msg *SetNodeScroll) TypeID() int { return 16 } @@ -437,6 +919,21 @@ func (msg *SetInputTarget) Encode() []byte { return buf[:p] } +func (msg *SetInputTarget) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetInputTarget) Decode() Message { + return msg +} + func (msg *SetInputTarget) TypeID() int { return 17 } @@ -458,6 +955,21 @@ func (msg *SetInputValue) Encode() []byte { return buf[:p] } +func (msg *SetInputValue) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + 
+func (msg *SetInputValue) Decode() Message { + return msg +} + func (msg *SetInputValue) TypeID() int { return 18 } @@ -477,6 +989,21 @@ func (msg *SetInputChecked) Encode() []byte { return buf[:p] } +func (msg *SetInputChecked) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetInputChecked) Decode() Message { + return msg +} + func (msg *SetInputChecked) TypeID() int { return 19 } @@ -496,6 +1023,21 @@ func (msg *MouseMove) Encode() []byte { return buf[:p] } +func (msg *MouseMove) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *MouseMove) Decode() Message { + return msg +} + func (msg *MouseMove) TypeID() int { return 20 } @@ -517,6 +1059,21 @@ func (msg *MouseClickDepricated) Encode() []byte { return buf[:p] } +func (msg *MouseClickDepricated) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *MouseClickDepricated) Decode() Message { + return msg +} + func (msg *MouseClickDepricated) TypeID() int { return 21 } @@ -536,6 +1093,21 @@ func (msg *ConsoleLog) Encode() []byte { return buf[:p] } +func (msg *ConsoleLog) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *ConsoleLog) Decode() Message { + return msg +} + func (msg *ConsoleLog) TypeID() int { return 22 } @@ -569,6 +1141,21 @@ func (msg *PageLoadTiming) Encode() []byte { return buf[:p] } +func (msg *PageLoadTiming) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PageLoadTiming) Decode() Message { + return msg +} + func (msg *PageLoadTiming) TypeID() int { return 23 } @@ -590,6 +1177,21 @@ func (msg *PageRenderTiming) Encode() []byte { return buf[:p] } +func (msg *PageRenderTiming) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PageRenderTiming) Decode() Message { + return msg +} + func (msg *PageRenderTiming) TypeID() int { return 24 } @@ -611,6 +1213,21 @@ func (msg *JSException) Encode() []byte { return buf[:p] } +func (msg *JSException) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *JSException) Decode() Message { + return msg +} + func (msg *JSException) TypeID() int { return 25 } @@ -636,6 +1253,21 @@ func (msg *IntegrationEvent) Encode() []byte { 
return buf[:p] } +func (msg *IntegrationEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IntegrationEvent) Decode() Message { + return msg +} + func (msg *IntegrationEvent) TypeID() int { return 26 } @@ -655,6 +1287,21 @@ func (msg *RawCustomEvent) Encode() []byte { return buf[:p] } +func (msg *RawCustomEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *RawCustomEvent) Decode() Message { + return msg +} + func (msg *RawCustomEvent) TypeID() int { return 27 } @@ -672,6 +1319,21 @@ func (msg *UserID) Encode() []byte { return buf[:p] } +func (msg *UserID) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *UserID) Decode() Message { + return msg +} + func (msg *UserID) TypeID() int { return 28 } @@ -689,6 +1351,21 @@ func (msg *UserAnonymousID) Encode() []byte { return buf[:p] } +func (msg *UserAnonymousID) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *UserAnonymousID) Decode() Message { + return msg +} + func (msg *UserAnonymousID) TypeID() int { return 29 } @@ -708,6 +1385,21 @@ func (msg *Metadata) Encode() []byte { return buf[:p] } +func (msg *Metadata) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Metadata) Decode() Message { + return msg +} + func (msg *Metadata) TypeID() int { return 30 } @@ -757,6 +1449,21 @@ func (msg *PageEvent) Encode() []byte { return buf[:p] } +func (msg *PageEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PageEvent) Decode() Message { + return msg +} + func (msg *PageEvent) TypeID() int { return 31 } @@ -782,6 +1489,21 @@ func (msg *InputEvent) Encode() []byte { return buf[:p] } +func (msg *InputEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *InputEvent) Decode() Message { + return msg +} + func (msg *InputEvent) TypeID() int { return 32 } @@ -807,6 +1529,21 @@ func (msg *ClickEvent) Encode() []byte { return buf[:p] } +func (msg *ClickEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], 
msg.Meta().Index) + return data +} + +func (msg *ClickEvent) Decode() Message { + return msg +} + func (msg *ClickEvent) TypeID() int { return 33 } @@ -834,6 +1571,21 @@ func (msg *ErrorEvent) Encode() []byte { return buf[:p] } +func (msg *ErrorEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *ErrorEvent) Decode() Message { + return msg +} + func (msg *ErrorEvent) TypeID() int { return 34 } @@ -873,6 +1625,21 @@ func (msg *ResourceEvent) Encode() []byte { return buf[:p] } +func (msg *ResourceEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *ResourceEvent) Decode() Message { + return msg +} + func (msg *ResourceEvent) TypeID() int { return 35 } @@ -896,6 +1663,21 @@ func (msg *CustomEvent) Encode() []byte { return buf[:p] } +func (msg *CustomEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CustomEvent) Decode() Message { + return msg +} + func (msg *CustomEvent) TypeID() int { return 36 } @@ -917,6 +1699,21 @@ func (msg *CSSInsertRule) Encode() []byte { return buf[:p] } +func (msg *CSSInsertRule) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CSSInsertRule) Decode() Message { + return msg +} + func (msg *CSSInsertRule) TypeID() int { return 37 } @@ -936,6 +1733,21 @@ func (msg *CSSDeleteRule) Encode() []byte { return buf[:p] } +func (msg *CSSDeleteRule) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CSSDeleteRule) Decode() Message { + return msg +} + func (msg *CSSDeleteRule) TypeID() int { return 38 } @@ -965,6 +1777,21 @@ func (msg *Fetch) Encode() []byte { return buf[:p] } +func (msg *Fetch) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Fetch) Decode() Message { + return msg +} + func (msg *Fetch) TypeID() int { return 39 } @@ -988,6 +1815,21 @@ func (msg *Profiler) Encode() []byte { return buf[:p] } +func (msg *Profiler) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Profiler) Decode() Message { + return msg +} + func (msg *Profiler) TypeID() int { return 40 } @@ -1007,6 +1849,21 @@ func (msg *OTable) Encode() []byte { return buf[:p] } +func (msg *OTable) EncodeWithIndex() []byte { + 
encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *OTable) Decode() Message { + return msg +} + func (msg *OTable) TypeID() int { return 41 } @@ -1024,6 +1881,21 @@ func (msg *StateAction) Encode() []byte { return buf[:p] } +func (msg *StateAction) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *StateAction) Decode() Message { + return msg +} + func (msg *StateAction) TypeID() int { return 42 } @@ -1045,6 +1917,21 @@ func (msg *StateActionEvent) Encode() []byte { return buf[:p] } +func (msg *StateActionEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *StateActionEvent) Decode() Message { + return msg +} + func (msg *StateActionEvent) TypeID() int { return 43 } @@ -1066,6 +1953,21 @@ func (msg *Redux) Encode() []byte { return buf[:p] } +func (msg *Redux) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Redux) Decode() Message { + return msg +} + func (msg *Redux) TypeID() int { return 44 } @@ -1085,6 +1987,21 @@ func (msg *Vuex) Encode() []byte { return buf[:p] } +func (msg *Vuex) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *Vuex) Decode() Message { + return msg +} + func (msg *Vuex) TypeID() int { return 45 } @@ -1104,6 +2021,21 @@ func (msg *MobX) Encode() []byte { return buf[:p] } +func (msg *MobX) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *MobX) Decode() Message { + return msg +} + func (msg *MobX) TypeID() int { return 46 } @@ -1125,6 +2057,21 @@ func (msg *NgRx) Encode() []byte { return buf[:p] } +func (msg *NgRx) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *NgRx) Decode() Message { + return msg +} + func (msg *NgRx) TypeID() int { return 47 } @@ -1148,6 +2095,21 @@ func (msg *GraphQL) Encode() []byte { return buf[:p] } +func (msg *GraphQL) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *GraphQL) Decode() Message { + return msg +} + func (msg *GraphQL) TypeID() int { return 48 } @@ -1171,6 +2133,21 @@ func 
(msg *PerformanceTrack) Encode() []byte { return buf[:p] } +func (msg *PerformanceTrack) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PerformanceTrack) Decode() Message { + return msg +} + func (msg *PerformanceTrack) TypeID() int { return 49 } @@ -1198,6 +2175,21 @@ func (msg *GraphQLEvent) Encode() []byte { return buf[:p] } +func (msg *GraphQLEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *GraphQLEvent) Decode() Message { + return msg +} + func (msg *GraphQLEvent) TypeID() int { return 50 } @@ -1229,6 +2221,21 @@ func (msg *FetchEvent) Encode() []byte { return buf[:p] } +func (msg *FetchEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *FetchEvent) Decode() Message { + return msg +} + func (msg *FetchEvent) TypeID() int { return 51 } @@ -1246,6 +2253,21 @@ func (msg *DOMDrop) Encode() []byte { return buf[:p] } +func (msg *DOMDrop) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *DOMDrop) Decode() Message { + return msg +} + func (msg *DOMDrop) TypeID() int { return 52 } @@ -1277,6 +2299,21 @@ func (msg *ResourceTiming) Encode() []byte { return buf[:p] } +func (msg *ResourceTiming) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *ResourceTiming) Decode() Message { + return msg +} + func (msg *ResourceTiming) TypeID() int { return 53 } @@ -1296,6 +2333,21 @@ func (msg *ConnectionInformation) Encode() []byte { return buf[:p] } +func (msg *ConnectionInformation) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *ConnectionInformation) Decode() Message { + return msg +} + func (msg *ConnectionInformation) TypeID() int { return 54 } @@ -1313,6 +2365,21 @@ func (msg *SetPageVisibility) Encode() []byte { return buf[:p] } +func (msg *SetPageVisibility) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetPageVisibility) Decode() Message { + return msg +} + func (msg *SetPageVisibility) TypeID() int { return 55 } @@ -1356,6 +2423,21 @@ func (msg *PerformanceTrackAggr) Encode() []byte { return buf[:p] } +func (msg *PerformanceTrackAggr) EncodeWithIndex() []byte { + encoded := msg.Encode() + if 
IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *PerformanceTrackAggr) Decode() Message { + return msg +} + func (msg *PerformanceTrackAggr) TypeID() int { return 56 } @@ -1385,6 +2467,21 @@ func (msg *LongTask) Encode() []byte { return buf[:p] } +func (msg *LongTask) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *LongTask) Decode() Message { + return msg +} + func (msg *LongTask) TypeID() int { return 59 } @@ -1408,6 +2505,21 @@ func (msg *SetNodeAttributeURLBased) Encode() []byte { return buf[:p] } +func (msg *SetNodeAttributeURLBased) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetNodeAttributeURLBased) Decode() Message { + return msg +} + func (msg *SetNodeAttributeURLBased) TypeID() int { return 60 } @@ -1429,6 +2541,21 @@ func (msg *SetCSSDataURLBased) Encode() []byte { return buf[:p] } +func (msg *SetCSSDataURLBased) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *SetCSSDataURLBased) Decode() Message { + return msg +} + func (msg *SetCSSDataURLBased) TypeID() int { return 61 } @@ -1456,6 +2583,21 @@ func (msg *IssueEvent) Encode() []byte { return buf[:p] } +func (msg *IssueEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IssueEvent) Decode() Message { + return msg +} + func (msg *IssueEvent) TypeID() int { return 62 } @@ -1475,6 +2617,21 @@ func (msg *TechnicalInfo) Encode() []byte { return buf[:p] } +func (msg *TechnicalInfo) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *TechnicalInfo) Decode() Message { + return msg +} + func (msg *TechnicalInfo) TypeID() int { return 63 } @@ -1494,6 +2651,21 @@ func (msg *CustomIssue) Encode() []byte { return buf[:p] } +func (msg *CustomIssue) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CustomIssue) Decode() Message { + return msg +} + func (msg *CustomIssue) TypeID() int { return 64 } @@ -1511,6 +2683,21 @@ func (msg *AssetCache) Encode() []byte { return buf[:p] } +func (msg *AssetCache) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], 
msg.Meta().Index) + return data +} + +func (msg *AssetCache) Decode() Message { + return msg +} + func (msg *AssetCache) TypeID() int { return 66 } @@ -1534,6 +2721,21 @@ func (msg *CSSInsertRuleURLBased) Encode() []byte { return buf[:p] } +func (msg *CSSInsertRuleURLBased) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CSSInsertRuleURLBased) Decode() Message { + return msg +} + func (msg *CSSInsertRuleURLBased) TypeID() int { return 67 } @@ -1557,6 +2759,21 @@ func (msg *MouseClick) Encode() []byte { return buf[:p] } +func (msg *MouseClick) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *MouseClick) Decode() Message { + return msg +} + func (msg *MouseClick) TypeID() int { return 69 } @@ -1576,6 +2793,21 @@ func (msg *CreateIFrameDocument) Encode() []byte { return buf[:p] } +func (msg *CreateIFrameDocument) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *CreateIFrameDocument) Decode() Message { + return msg +} + func (msg *CreateIFrameDocument) TypeID() int { return 70 } @@ -1597,6 +2829,21 @@ func (msg *IOSBatchMeta) Encode() []byte { return buf[:p] } +func (msg *IOSBatchMeta) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSBatchMeta) Decode() Message { + return msg +} + func (msg *IOSBatchMeta) TypeID() int { return 107 } @@ -1632,6 +2879,21 @@ func (msg *IOSSessionStart) Encode() []byte { return buf[:p] } +func (msg *IOSSessionStart) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSSessionStart) Decode() Message { + return msg +} + func (msg *IOSSessionStart) TypeID() int { return 90 } @@ -1649,6 +2911,21 @@ func (msg *IOSSessionEnd) Encode() []byte { return buf[:p] } +func (msg *IOSSessionEnd) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSSessionEnd) Decode() Message { + return msg +} + func (msg *IOSSessionEnd) TypeID() int { return 91 } @@ -1672,6 +2949,21 @@ func (msg *IOSMetadata) Encode() []byte { return buf[:p] } +func (msg *IOSMetadata) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSMetadata) Decode() Message { + return msg +} + func (msg *IOSMetadata) TypeID() int { return 92 } 
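
Every generated web message type picks up the same EncodeWithIndex/Decode pair shown above: EncodeWithIndex prefixes the regular Encode() output with the message's 8-byte little-endian index, iOS message types (IsIOSType) are passed through unprefixed, and Decode on an already-decoded struct simply returns the receiver. Below is a minimal standalone sketch of reading such an index-prefixed frame back, assuming the consumer already knows it holds a non-iOS message; the helper name and error handling are illustrative only and are not part of this diff.

package main

import (
	"encoding/binary"
	"fmt"
)

// splitIndexedFrame reverses the framing produced by the generated
// EncodeWithIndex for non-iOS messages: the first 8 bytes carry the
// little-endian message index, the remainder is the plain Encode() output.
// Hypothetical helper, shown here only to illustrate the frame layout.
func splitIndexedFrame(frame []byte) (uint64, []byte, error) {
	if len(frame) < 8 {
		return 0, nil, fmt.Errorf("frame too short: %d bytes", len(frame))
	}
	return binary.LittleEndian.Uint64(frame[:8]), frame[8:], nil
}

func main() {
	// Build a frame the same way EncodeWithIndex does: index prefix, then payload.
	payload := []byte{0x01, 0x02, 0x03} // stand-in for an encoded message body
	frame := make([]byte, len(payload)+8)
	copy(frame[8:], payload)
	binary.LittleEndian.PutUint64(frame[:8], 42)

	index, body, err := splitIndexedFrame(frame)
	if err != nil {
		panic(err)
	}
	fmt.Println(index, body) // 42 [1 2 3]
}
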
@@ -1695,6 +2987,21 @@ func (msg *IOSCustomEvent) Encode() []byte { return buf[:p] } +func (msg *IOSCustomEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSCustomEvent) Decode() Message { + return msg +} + func (msg *IOSCustomEvent) TypeID() int { return 93 } @@ -1716,6 +3023,21 @@ func (msg *IOSUserID) Encode() []byte { return buf[:p] } +func (msg *IOSUserID) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSUserID) Decode() Message { + return msg +} + func (msg *IOSUserID) TypeID() int { return 94 } @@ -1737,6 +3059,21 @@ func (msg *IOSUserAnonymousID) Encode() []byte { return buf[:p] } +func (msg *IOSUserAnonymousID) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSUserAnonymousID) Decode() Message { + return msg +} + func (msg *IOSUserAnonymousID) TypeID() int { return 95 } @@ -1764,6 +3101,21 @@ func (msg *IOSScreenChanges) Encode() []byte { return buf[:p] } +func (msg *IOSScreenChanges) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSScreenChanges) Decode() Message { + return msg +} + func (msg *IOSScreenChanges) TypeID() int { return 96 } @@ -1789,6 +3141,21 @@ func (msg *IOSCrash) Encode() []byte { return buf[:p] } +func (msg *IOSCrash) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSCrash) Decode() Message { + return msg +} + func (msg *IOSCrash) TypeID() int { return 97 } @@ -1812,6 +3179,21 @@ func (msg *IOSScreenEnter) Encode() []byte { return buf[:p] } +func (msg *IOSScreenEnter) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSScreenEnter) Decode() Message { + return msg +} + func (msg *IOSScreenEnter) TypeID() int { return 98 } @@ -1835,6 +3217,21 @@ func (msg *IOSScreenLeave) Encode() []byte { return buf[:p] } +func (msg *IOSScreenLeave) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSScreenLeave) Decode() Message { + return msg +} + func (msg *IOSScreenLeave) TypeID() int { return 99 } @@ -1860,6 +3257,21 @@ func (msg *IOSClickEvent) Encode() []byte { return buf[:p] } +func (msg *IOSClickEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if 
IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSClickEvent) Decode() Message { + return msg +} + func (msg *IOSClickEvent) TypeID() int { return 100 } @@ -1885,6 +3297,21 @@ func (msg *IOSInputEvent) Encode() []byte { return buf[:p] } +func (msg *IOSInputEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSInputEvent) Decode() Message { + return msg +} + func (msg *IOSInputEvent) TypeID() int { return 101 } @@ -1908,6 +3335,21 @@ func (msg *IOSPerformanceEvent) Encode() []byte { return buf[:p] } +func (msg *IOSPerformanceEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSPerformanceEvent) Decode() Message { + return msg +} + func (msg *IOSPerformanceEvent) TypeID() int { return 102 } @@ -1931,6 +3373,21 @@ func (msg *IOSLog) Encode() []byte { return buf[:p] } +func (msg *IOSLog) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSLog) Decode() Message { + return msg +} + func (msg *IOSLog) TypeID() int { return 103 } @@ -1952,6 +3409,21 @@ func (msg *IOSInternalError) Encode() []byte { return buf[:p] } +func (msg *IOSInternalError) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSInternalError) Decode() Message { + return msg +} + func (msg *IOSInternalError) TypeID() int { return 104 } @@ -1985,6 +3457,21 @@ func (msg *IOSNetworkCall) Encode() []byte { return buf[:p] } +func (msg *IOSNetworkCall) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSNetworkCall) Decode() Message { + return msg +} + func (msg *IOSNetworkCall) TypeID() int { return 105 } @@ -2028,6 +3515,21 @@ func (msg *IOSPerformanceAggregated) Encode() []byte { return buf[:p] } +func (msg *IOSPerformanceAggregated) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSPerformanceAggregated) Decode() Message { + return msg +} + func (msg *IOSPerformanceAggregated) TypeID() int { return 110 } @@ -2053,6 +3555,21 @@ func (msg *IOSIssueEvent) Encode() []byte { return buf[:p] } +func (msg *IOSIssueEvent) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + 
binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *IOSIssueEvent) Decode() Message { + return msg +} + func (msg *IOSIssueEvent) TypeID() int { return 111 } diff --git a/backend/pkg/messages/primitives.go b/backend/pkg/messages/primitives.go index 8687ef413..eb65ae7b1 100644 --- a/backend/pkg/messages/primitives.go +++ b/backend/pkg/messages/primitives.go @@ -3,6 +3,7 @@ package messages import ( "encoding/json" "errors" + "fmt" "io" "log" ) @@ -16,15 +17,6 @@ func ReadByte(reader io.Reader) (byte, error) { return p[0], nil } -// func SkipBytes(reader io.ReadSeeker) error { -// n, err := ReadUint(reader) -// if err != nil { -// return err -// } -// _, err = reader.Seek(n, io.SeekCurrent); -// return err -// } - func ReadData(reader io.Reader) ([]byte, error) { n, err := ReadUint(reader) if err != nil { @@ -153,3 +145,28 @@ func WriteJson(v interface{}, buf []byte, p int) int { } return WriteData(data, buf, p) } + +func WriteSize(size uint64, buf []byte, p int) { + var m uint64 = 255 + for i := 0; i < 3; i++ { + buf[p+i] = byte(size & m) + size = size >> 8 + } + fmt.Println(buf) +} + +func ReadSize(reader io.Reader) (uint64, error) { + buf := make([]byte, 3) + n, err := io.ReadFull(reader, buf) + if err != nil { + return 0, err + } + if n != 3 { + return 0, fmt.Errorf("read only %d of 3 size bytes", n) + } + var size uint64 + for i, b := range buf { + size += uint64(b) << (8 * i) + } + return size, nil +} diff --git a/backend/pkg/messages/raw.go b/backend/pkg/messages/raw.go new file mode 100644 index 000000000..f580ec33f --- /dev/null +++ b/backend/pkg/messages/raw.go @@ -0,0 +1,67 @@ +package messages + +import ( + "bytes" + "encoding/binary" + "io" + "log" +) + +// RawMessage is a not decoded message +type RawMessage struct { + tp uint64 + size uint64 + data []byte + reader *bytes.Reader + meta *message + encoded bool + skipped *bool +} + +func (m *RawMessage) Encode() []byte { + if m.encoded { + return m.data + } + m.data = make([]byte, m.size) + m.encoded = true + *m.skipped = false + n, err := io.ReadFull(m.reader, m.data) + if err != nil { + log.Printf("message encode err: %s", err) + return nil + } + log.Printf("encode: read %d of %d bytes", n, m.size) + return m.data +} + +func (m *RawMessage) EncodeWithIndex() []byte { + if !m.encoded { + m.Encode() + } + if IsIOSType(int(m.tp)) { + return m.data + } + data := make([]byte, len(m.data)+8) + copy(data[8:], m.data[:]) + binary.LittleEndian.PutUint64(data[0:], m.Meta().Index) + return data +} + +func (m *RawMessage) Decode() Message { + if !m.encoded { + m.Encode() + } + msg, err := ReadMessage(m.tp, bytes.NewReader(m.data)) + if err != nil { + log.Printf("decode err: %s", err) + } + return msg +} + +func (m *RawMessage) TypeID() int { + return int(m.tp) +} + +func (m *RawMessage) Meta() *message { + return m.meta +} diff --git a/backend/pkg/queue/messages.go b/backend/pkg/queue/messages.go index 2da62ac6e..f52813492 100644 --- a/backend/pkg/queue/messages.go +++ b/backend/pkg/queue/messages.go @@ -1,19 +1,12 @@ package queue import ( - "bytes" - "log" - "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" ) -func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer { +func NewMessageConsumer(group string, topics []string, handler types.RawMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer { return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta 
*types.Meta) { - if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) { - handler(sessionID, msg, meta) - }); err != nil { - log.Printf("Decode error: %v\n", err) - } + handler(sessionID, messages.NewIterator(value), meta) }, autoCommit, messageSizeLimit) } diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go index f1e90e184..aaf6f7afa 100644 --- a/backend/pkg/queue/types/types.go +++ b/backend/pkg/queue/types/types.go @@ -26,3 +26,4 @@ type Meta struct { type MessageHandler func(uint64, []byte, *Meta) type DecodedMessageHandler func(uint64, messages.Message, *Meta) +type RawMessageHandler func(uint64, messages.Iterator, *Meta) diff --git a/mobs/templates/backend~pkg~messages~messages.go.erb b/mobs/templates/backend~pkg~messages~messages.go.erb index a39939234..5efcf7f05 100644 --- a/mobs/templates/backend~pkg~messages~messages.go.erb +++ b/mobs/templates/backend~pkg~messages~messages.go.erb @@ -1,5 +1,12 @@ // Auto-generated, do not edit package messages + +const ( +<% $messages.each do |msg| %> + Msg<%= msg.name %> = <%= msg.id %> +<% end %> +) + <% $messages.each do |msg| %> type <%= msg.name %> struct { message @@ -16,6 +23,21 @@ func (msg *<%= msg.name %>) Encode() []byte { return buf[:p] } +func (msg *<%= msg.name %>) EncodeWithIndex() []byte { + encoded := msg.Encode() + if IsIOSType(msg.TypeID()) { + return encoded + } + data := make([]byte, len(encoded)+8) + copy(data[8:], encoded[:]) + binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index) + return data +} + +func (msg *<%= msg.name %>) Decode() Message { + return msg +} + func (msg *<%= msg.name %>) TypeID() int { return <%= msg.id %> }
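
Taken together, the generated methods and the new RawMessage type suggest that the messages.Message interface now requires roughly the method set below. This is a reconstruction inferred from the methods added in this diff, not code copied from the repository; the unexported meta struct is reduced here to the single Index field that EncodeWithIndex relies on.

package messages

// message is a stand-in for the package's unexported meta struct; only the
// Index field used by EncodeWithIndex is shown in this sketch.
type message struct {
	Index uint64
}

// Message is a reconstruction of the interface implied by this change set.
type Message interface {
	Encode() []byte          // canonical binary encoding
	EncodeWithIndex() []byte // Encode() output prefixed with the 8-byte little-endian index (non-iOS types only)
	Decode() Message         // already-decoded structs return themselves; RawMessage parses lazily via ReadMessage
	TypeID() int
	Meta() *message
}
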
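
primitives.go also gains a pair of 3-byte length helpers: WriteSize stores the low 24 bits of a size little-endian (the fmt.Println(buf) there looks like leftover debug output), and ReadSize reads exactly three bytes back and reassembles the value. The following standalone round trip re-implements the same scheme for illustration; sizes above 2^24-1 would be silently truncated, as in the original.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeSize3 mirrors WriteSize: the low 24 bits of size are written
// little-endian into buf starting at offset p.
func writeSize3(size uint64, buf []byte, p int) {
	for i := 0; i < 3; i++ {
		buf[p+i] = byte(size)
		size >>= 8
	}
}

// readSize3 mirrors ReadSize: it consumes exactly three bytes and rebuilds
// the little-endian value.
func readSize3(r io.Reader) (uint64, error) {
	buf := make([]byte, 3)
	if _, err := io.ReadFull(r, buf); err != nil {
		return 0, err
	}
	var size uint64
	for i, b := range buf {
		size += uint64(b) << (8 * i)
	}
	return size, nil
}

func main() {
	buf := make([]byte, 3)
	writeSize3(70000, buf, 0) // 70000 = 0x011170, fits in 24 bits
	size, err := readSize3(bytes.NewReader(buf))
	if err != nil {
		panic(err)
	}
	fmt.Println(size) // 70000
}
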