Message processing refactoring (#743)

* feat(backend): refactored message processing logic and cleaned up previous changes
Alexander 2022-09-28 13:41:50 +02:00 committed by GitHub
parent 8a1de30e5b
commit 56ed06ed17
36 changed files with 3221 additions and 3152 deletions
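
In short: every consumer previously received a raw per-batch callback of the form func(sessionID uint64, iter messages.Iterator, meta *types.Meta) and had to drive the batch iterator itself; after this change each service passes a per-message handler into messages.NewMessageIterator, which owns batch parsing, type filtering, decoding, and meta propagation. A minimal before/after sketch of the wiring, using only the signatures visible in this diff (group, topic, and flag values are illustrative placeholders):

// Before: per-batch callback; each service loops, decodes and filters by hand.
consumer := queue.NewMessageConsumer(
	cfg.GroupSink,
	[]string{cfg.TopicRawWeb},
	func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
		for iter.Next() {
			msg := iter.Message().Decode()
			if msg == nil {
				continue
			}
			// per-message service logic
		}
		iter.Close()
	},
	false,
	cfg.MessageSizeLimit,
)

// After: the service supplies only a func(messages.Message); batch parsing,
// type filtering, decoding and meta handling live in the shared iterator.
msgHandler := func(msg messages.Message) {
	// per-message service logic; msg.SessionID() replaces the sessionID argument
}
consumer = queue.NewConsumer(
	cfg.GroupSink,
	[]string{cfg.TopicRawWeb},
	messages.NewMessageIterator(msgHandler, nil /* type filter */, false /* autoDecode */),
	false,
	cfg.MessageSizeLimit,
)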

View file

@ -3,7 +3,6 @@ package main
import (
"context"
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@ -31,40 +30,30 @@ func main() {
log.Printf("can't create assets_total metric: %s", err)
}
consumer := queue.NewMessageConsumer(
msgHandler := func(msg messages.Message) {
switch m := msg.(type) {
case *messages.AssetCache:
cacher.CacheURL(m.SessionID(), m.URL)
totalAssets.Add(context.Background(), 1)
case *messages.ErrorEvent:
if m.Source != "js_exception" {
return
}
sourceList, err := assets.ExtractJSExceptionSources(&m.Payload)
if err != nil {
log.Printf("Error on source extraction: %v", err)
return
}
for _, source := range sourceList {
cacher.CacheJSFile(source)
}
}
}
msgConsumer := queue.NewConsumer(
cfg.GroupCache,
[]string{cfg.TopicCache},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgAssetCache {
m := iter.Message().Decode()
if m == nil {
return
}
msg := m.(*messages.AssetCache)
cacher.CacheURL(sessionID, msg.URL)
totalAssets.Add(context.Background(), 1)
} else if iter.Type() == messages.MsgErrorEvent {
m := iter.Message().Decode()
if m == nil {
return
}
msg := m.(*messages.ErrorEvent)
if msg.Source != "js_exception" {
continue
}
sourceList, err := assets.ExtractJSExceptionSources(&msg.Payload)
if err != nil {
log.Printf("Error on source extraction: %v", err)
continue
}
for _, source := range sourceList {
cacher.CacheJSFile(source)
}
}
}
iter.Close()
},
messages.NewMessageIterator(msgHandler, []int{messages.MsgAssetCache, messages.MsgErrorEvent}, true),
true,
cfg.MessageSizeLimit,
)
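
Note that the iterator here is built with a type filter and autoDecode enabled, so msgHandler above only ever sees already-decoded AssetCache and ErrorEvent messages and can type-switch on them directly; the explicit Decode() calls and nil checks of the removed version are no longer needed. Annotated, the constructor call reads:

messages.NewMessageIterator(
	msgHandler, // invoked once per message in the consumed batch
	[]int{messages.MsgAssetCache, messages.MsgErrorEvent}, // only these types reach the handler
	true, // decode RawMessage payloads before calling the handler
)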
@ -79,15 +68,14 @@ func main() {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
msgConsumer.Close()
os.Exit(0)
case err := <-cacher.Errors:
log.Printf("Error while caching: %v", err)
// TODO: notify user
case <-tick:
cacher.UpdateTimeouts()
default:
if err := consumer.ConsumeNext(); err != nil {
if err := msgConsumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}

View file

@ -45,10 +45,6 @@ func main() {
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
keepMessage := func(tp int) bool {
return tp == messages.MsgMetadata || tp == messages.MsgIssueEvent || tp == messages.MsgSessionStart || tp == messages.MsgSessionEnd || tp == messages.MsgUserID || tp == messages.MsgUserAnonymousID || tp == messages.MsgCustomEvent || tp == messages.MsgClickEvent || tp == messages.MsgInputEvent || tp == messages.MsgPageEvent || tp == messages.MsgErrorEvent || tp == messages.MsgFetchEvent || tp == messages.MsgGraphQLEvent || tp == messages.MsgIntegrationEvent || tp == messages.MsgPerformanceTrackAggr || tp == messages.MsgResourceEvent || tp == messages.MsgLongTask || tp == messages.MsgJSException || tp == messages.MsgResourceTiming || tp == messages.MsgRawCustomEvent || tp == messages.MsgCustomIssue || tp == messages.MsgFetch || tp == messages.MsgGraphQL || tp == messages.MsgStateAction || tp == messages.MsgSetInputTarget || tp == messages.MsgSetInputValue || tp == messages.MsgCreateDocument || tp == messages.MsgMouseClick || tp == messages.MsgSetPageLocation || tp == messages.MsgPageLoadTiming || tp == messages.MsgPageRenderTiming
}
var producer types.Producer = nil
if cfg.UseQuickwit {
producer = queue.NewProducer(cfg.MessageSizeLimit, true)
@ -60,69 +56,67 @@ func main() {
saver.InitStats()
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgCustomEvent, messages.MsgClickEvent,
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgErrorEvent, messages.MsgFetchEvent,
messages.MsgGraphQLEvent, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgResourceEvent, messages.MsgLongTask, messages.MsgJSException, messages.MsgResourceTiming,
messages.MsgRawCustomEvent, messages.MsgCustomIssue, messages.MsgFetch, messages.MsgGraphQL,
messages.MsgStateAction, messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument,
messages.MsgMouseClick, messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming}
// Handler logic
handler := func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
msgHandler := func(msg messages.Message) {
statsLogger.Collect(msg) // TODO: carefully check message meta and batch meta confusion situation
for iter.Next() {
if !keepMessage(iter.Type()) {
continue
// Just save session data into db without additional checks
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
msg := iter.Message().Decode()
if msg == nil {
return
}
// Just save session data into db without additional checks
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
session, err := pg.GetSession(sessionID)
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
return
}
iter.Close()
session, err := pg.GetSession(msg.SessionID())
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(msg)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(msg.SessionID(), func(msg messages.Message) {
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
// Init consumer
consumer := queue.NewMessageConsumer(
consumer := queue.NewConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb,
cfg.TopicAnalytics,
},
handler,
messages.NewMessageIterator(msgHandler, msgFilter, true),
false,
cfg.MessageSizeLimit,
)
@ -146,7 +140,7 @@ func main() {
pgDur := time.Now().Sub(start).Milliseconds()
start = time.Now()
if err := saver.CommitStats(consumer.HasFirstPartition()); err != nil {
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
chDur := time.Now().Sub(start).Milliseconds()

View file

@ -2,7 +2,6 @@ package main
import (
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@ -38,24 +37,24 @@ func main() {
return
}
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
consumer := queue.NewMessageConsumer(
msgHandler := func(msg messages.Message) {
if msg.TypeID() == messages.MsgSessionStart || msg.TypeID() == messages.MsgSessionEnd {
return
}
if msg.Meta().Timestamp == 0 {
log.Printf("ZERO TS, sessID: %d, msgType: %d", msg.Meta().SessionID(), msg.TypeID())
}
statsLogger.Collect(msg)
sessions.UpdateSession(msg) //TODO: recheck timestamps(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp)
}
consumer := queue.NewConsumer(
cfg.GroupEnder,
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgSessionStart || iter.Type() == messages.MsgSessionEnd {
continue
}
if iter.Message().Meta().Timestamp == 0 {
log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, iter.Type())
}
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp)
}
iter.Close()
},
messages.NewMessageIterator(msgHandler, nil, false),
false,
cfg.MessageSizeLimit,
)
@ -94,7 +93,7 @@ func main() {
currDuration, newDuration)
return true
}
if err := producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(msg)); err != nil {
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
return false
}

View file

@ -2,7 +2,6 @@ package main
import (
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@ -47,25 +46,18 @@ func main() {
// Init producer and consumer for data bus
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
consumer := queue.NewMessageConsumer(
msgHandler := func(msg messages.Message) {
statsLogger.Collect(msg)
builderMap.HandleMessage(msg) //(sessionID, msg, iter.Message().Meta().Index)
}
consumer := queue.NewConsumer(
cfg.GroupHeuristics,
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
var lastMessageID uint64
for iter.Next() {
statsLogger.Collect(sessionID, meta)
msg := iter.Message().Decode()
if msg == nil {
log.Printf("failed batch, sess: %d, lastIndex: %d", sessionID, lastMessageID)
continue
}
lastMessageID = msg.Meta().Index
builderMap.HandleMessage(sessionID, msg, iter.Message().Meta().Index)
}
iter.Close()
},
messages.NewMessageIterator(msgHandler, nil, true),
false,
cfg.MessageSizeLimit,
)
@ -86,7 +78,7 @@ func main() {
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicAnalytics, sessionID, messages.Encode(readyMsg))
producer.Produce(cfg.TopicAnalytics, sessionID, readyMsg.Encode())
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()

View file

@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
)
@ -84,7 +83,7 @@ func main() {
}
sessionID = sessData.ID
}
producer.Produce(cfg.TopicAnalytics, sessionID, messages.Encode(event.IntegrationEvent))
producer.Produce(cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode())
case err := <-manager.Errors:
log.Printf("Integration error: %v\n", err)
case i := <-manager.RequestDataUpdates:

View file

@ -3,7 +3,6 @@ package main
import (
"context"
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"syscall"
@ -13,7 +12,7 @@ import (
"openreplay/backend/internal/sink/assetscache"
"openreplay/backend/internal/sink/oswriter"
"openreplay/backend/internal/storage"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/url/assets"
@ -51,64 +50,58 @@ func main() {
log.Printf("can't create messages_size metric: %s", err)
}
consumer := queue.NewMessageConsumer(
msgHandler := func(msg messages.Message) {
// [METRICS] Increase the number of processed messages
totalMessages.Add(context.Background(), 1)
// Send SessionEnd trigger to storage service
if msg.TypeID() == messages.MsgSessionEnd {
if err := producer.Produce(cfg.TopicTrigger, msg.SessionID(), msg.Encode()); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, msg.SessionID())
}
return
}
// Process assets
if msg.TypeID() == messages.MsgSetNodeAttributeURLBased ||
msg.TypeID() == messages.MsgSetCSSDataURLBased ||
msg.TypeID() == messages.MsgCSSInsertRuleURLBased ||
msg.TypeID() == messages.MsgAdoptedSSReplaceURLBased ||
msg.TypeID() == messages.MsgAdoptedSSInsertRuleURLBased {
msg = assetMessageHandler.ParseAssets(msg.Decode()) // TODO: filter type only once (use iterator inside or bring ParseAssets out here).
}
// Filter message
if !messages.IsReplayerType(msg.TypeID()) {
return
}
// If message timestamp is empty, use at least ts of session start
ts := msg.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msgType: %d", msg.SessionID(), msg.TypeID())
} else {
// Log ts of last processed message
counter.Update(msg.SessionID(), time.UnixMilli(ts))
}
// Write encoded message with index to session file
data := msg.EncodeWithIndex()
if err := writer.Write(msg.SessionID(), data); err != nil {
log.Printf("Writer error: %v\n", err)
}
// [METRICS] Increase the number of written to the files messages and the message size
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
}
consumer := queue.NewConsumer(
cfg.GroupSink,
[]string{
cfg.TopicRawWeb,
},
func(sessionID uint64, iter Iterator, meta *types.Meta) {
for iter.Next() {
// [METRICS] Increase the number of processed messages
totalMessages.Add(context.Background(), 1)
// Send SessionEnd trigger to storage service
if iter.Type() == MsgSessionEnd {
if err := producer.Produce(cfg.TopicTrigger, sessionID, iter.Message().Encode()); err != nil {
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
}
continue
}
msg := iter.Message()
// Process assets
if iter.Type() == MsgSetNodeAttributeURLBased ||
iter.Type() == MsgSetCSSDataURLBased ||
iter.Type() == MsgCSSInsertRuleURLBased ||
iter.Type() == MsgAdoptedSSReplaceURLBased ||
iter.Type() == MsgAdoptedSSInsertRuleURLBased {
m := msg.Decode()
if m == nil {
return
}
msg = assetMessageHandler.ParseAssets(sessionID, m) // TODO: filter type only once (use iterator inside or bring ParseAssets out here).
}
// Filter message
if !IsReplayerType(msg.TypeID()) {
continue
}
// If message timestamp is empty, use at least ts of session start
ts := msg.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msgType: %d", sessionID, iter.Type())
} else {
// Log ts of last processed message
counter.Update(sessionID, time.UnixMilli(ts))
}
// Write encoded message with index to session file
data := msg.EncodeWithIndex()
if err := writer.Write(sessionID, data); err != nil {
log.Printf("Writer error: %v\n", err)
}
// [METRICS] Increase the number of written to the files messages and the message size
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
}
iter.Close()
},
messages.NewMessageIterator(msgHandler, nil, false),
false,
cfg.MessageSizeLimit,
)

View file

@ -2,7 +2,6 @@ package main
import (
"log"
"openreplay/backend/pkg/queue/types"
"os"
"os/signal"
"strconv"
@ -38,24 +37,24 @@ func main() {
log.Fatalf("can't init sessionFinder module: %s", err)
}
consumer := queue.NewMessageConsumer(
consumer := queue.NewConsumer(
cfg.GroupStorage,
[]string{
cfg.TopicTrigger,
},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == messages.MsgSessionEnd {
msg := iter.Message().Decode().(*messages.SessionEnd)
if err := srv.UploadKey(strconv.FormatUint(sessionID, 10), 5); err != nil {
log.Printf("can't find session: %d", sessionID)
sessionFinder.Find(sessionID, msg.Timestamp)
}
// Log timestamp of last processed session
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
messages.NewMessageIterator(
func(msg messages.Message) {
m := msg.(*messages.SessionEnd)
if err := srv.UploadKey(strconv.FormatUint(msg.SessionID(), 10), 5); err != nil {
log.Printf("can't find session: %d", msg.SessionID())
sessionFinder.Find(msg.SessionID(), m.Timestamp)
}
}
},
// Log timestamp of last processed session
counter.Update(msg.SessionID(), time.UnixMilli(msg.Meta().Batch().Timestamp()))
},
[]int{messages.MsgSessionEnd},
true,
),
true,
cfg.MessageSizeLimit,
)

View file

@ -5,7 +5,8 @@ import (
. "openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
func (mi *Saver) InsertMessage(msg Message) error {
sessionID := msg.SessionID()
switch m := msg.(type) {
// Common
case *Metadata:

View file

@ -22,6 +22,6 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
return nil
}
func (si *Saver) CommitStats(optimize bool) error {
func (si *Saver) CommitStats() error {
return nil
}

View file

@ -74,7 +74,7 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
// The difference with web is mostly here:
e.services.Producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{
sessStart := &IOSSessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
@ -85,7 +85,8 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
UserDevice: ios.MapIOSDevice(req.UserDevice),
UserDeviceType: ios.GetIOSDeviceType(req.UserDevice),
UserCountry: country,
}))
}
e.services.Producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, sessStart.Encode())
}
ResponseWithJSON(w, &StartIOSSessionResponse{

View file

@ -125,7 +125,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
}
// Send sessionStart message to kafka
if err := e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart)); err != nil {
if err := e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, sessionStart.Encode()); err != nil {
log.Printf("can't send session start: %s", err)
}
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/monitoring"
"time"
)
@ -52,9 +53,11 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder,
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp, msgTimestamp int64) {
func (se *SessionEnder) UpdateSession(msg messages.Message) {
sessionID := msg.Meta().SessionID()
currTS := msg.Meta().Batch().Timestamp()
msgTimestamp := msg.Meta().Timestamp
localTS := time.Now().UnixMilli()
currTS := timestamp
if currTS == 0 {
log.Printf("got empty timestamp for sessionID: %d", sessionID)
return

View file

@ -22,14 +22,14 @@ func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *
}
}
func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.Message {
func (e *AssetsCache) ParseAssets(msg messages.Message) messages.Message {
switch m := msg.(type) {
case *messages.SetNodeAttributeURLBased:
if m.Name == "src" || m.Name == "href" {
newMsg := &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleURL(sessID, m.BaseURL, m.Value),
Value: e.handleURL(m.SessionID(), m.BaseURL, m.Value),
}
newMsg.SetMeta(msg.Meta())
return newMsg
@ -37,7 +37,7 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
newMsg := &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleCSS(sessID, m.BaseURL, m.Value),
Value: e.handleCSS(m.SessionID(), m.BaseURL, m.Value),
}
newMsg.SetMeta(msg.Meta())
return newMsg
@ -45,7 +45,7 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
case *messages.SetCSSDataURLBased:
newMsg := &messages.SetCSSData{
ID: m.ID,
Data: e.handleCSS(sessID, m.BaseURL, m.Data),
Data: e.handleCSS(m.SessionID(), m.BaseURL, m.Data),
}
newMsg.SetMeta(msg.Meta())
return newMsg
@ -53,14 +53,14 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
newMsg := &messages.CSSInsertRule{
ID: m.ID,
Index: m.Index,
Rule: e.handleCSS(sessID, m.BaseURL, m.Rule),
Rule: e.handleCSS(m.SessionID(), m.BaseURL, m.Rule),
}
newMsg.SetMeta(msg.Meta())
return newMsg
case *messages.AdoptedSSReplaceURLBased:
newMsg := &messages.AdoptedSSReplace{
SheetID: m.SheetID,
Text: e.handleCSS(sessID, m.BaseURL, m.Text),
Text: e.handleCSS(m.SessionID(), m.BaseURL, m.Text),
}
newMsg.SetMeta(msg.Meta())
return newMsg
@ -68,7 +68,7 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
newMsg := &messages.AdoptedSSInsertRule{
SheetID: m.SheetID,
Index: m.Index,
Rule: e.handleCSS(sessID, m.BaseURL, m.Rule),
Rule: e.handleCSS(m.SessionID(), m.BaseURL, m.Rule),
}
newMsg.SetMeta(msg.Meta())
return newMsg
@ -78,10 +78,11 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
assetMessage := &messages.AssetCache{URL: fullURL}
if err := e.producer.Produce(
e.cfg.TopicCache,
sessionID,
messages.Encode(&messages.AssetCache{URL: fullURL}),
assetMessage.Encode(),
); err != nil {
log.Printf("can't send asset to cache topic, sessID: %d, err: %s", sessionID, err)
}

View file

@ -5,8 +5,7 @@ import (
"log"
"time"
"openreplay/backend/pkg/queue/types"
//"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
)
type partitionStats struct {
@ -18,15 +17,15 @@ type partitionStats struct {
}
// Update partition statistic
func (prt *partitionStats) update(m *types.Meta) {
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
func (prt *partitionStats) update(m *messages.BatchInfo) {
if prt.maxts < m.Timestamp() {
prt.maxts = m.Timestamp()
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
if prt.mints > m.Timestamp() || prt.mints == 0 {
prt.mints = m.Timestamp()
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.lastts = m.Timestamp()
prt.lastID = m.ID()
prt.count += 1
}
@ -43,14 +42,14 @@ func NewQueueStats(sec int) *queueStats {
}
// Collect writes new data to partition statistic
func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) {
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
func (qs *queueStats) Collect(msg messages.Message) {
prti := int32(msg.SessionID() % 16) // TODO use GetKeyPartition from kafka/key.go
prt, ok := qs.prts[prti]
if !ok {
qs.prts[prti] = &partitionStats{}
prt = qs.prts[prti]
}
prt.update(m)
prt.update(msg.Meta().Batch())
select {
case <-qs.tick:

View file

@ -1,197 +0,0 @@
package messages
import (
"bytes"
"io"
"log"
"strings"
)
type Iterator interface {
Next() bool // Return true if we have next message
Type() int // Return type of the next message
Message() Message // Return raw or decoded message
Close()
}
type iteratorImpl struct {
data *bytes.Reader
index uint64
timestamp int64
version uint64
msgType uint64
msgSize uint64
canSkip bool
msg Message
url string
}
func NewIterator(data []byte) Iterator {
return &iteratorImpl{
data: bytes.NewReader(data),
}
}
func (i *iteratorImpl) Next() bool {
if i.canSkip {
if _, err := i.data.Seek(int64(i.msgSize), io.SeekCurrent); err != nil {
log.Printf("seek err: %s", err)
return false
}
}
i.canSkip = false
var err error
i.msgType, err = ReadUint(i.data)
if err != nil {
if err == io.EOF {
return false
}
log.Printf("can't read message type: %s", err)
return false
}
if i.version > 0 && messageHasSize(i.msgType) {
// Read message size if it is a new protocol version
i.msgSize, err = ReadSize(i.data)
if err != nil {
log.Printf("can't read message size: %s", err)
return false
}
i.msg = &RawMessage{
tp: i.msgType,
size: i.msgSize,
meta: &message{},
reader: i.data,
skipped: &i.canSkip,
}
i.canSkip = true
} else {
i.msg, err = ReadMessage(i.msgType, i.data)
if err == io.EOF {
return false
} else if err != nil {
if strings.HasPrefix(err.Error(), "Unknown message code:") {
code := strings.TrimPrefix(err.Error(), "Unknown message code: ")
i.msg, err = DecodeExtraMessage(code, i.data)
if err != nil {
log.Printf("can't decode msg: %s", err)
return false
}
} else {
log.Printf("Batch Message decoding error on message with index %v, err: %s", i.index, err)
return false
}
}
i.msg = transformDeprecated(i.msg)
}
// Process meta information
isBatchMeta := false
switch i.msgType {
case MsgBatchMetadata:
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Metadata found at the end of the batch")
return false
}
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*BatchMetadata)
i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.timestamp = m.Timestamp
i.version = m.Version
i.url = m.Url
isBatchMeta = true
if i.version > 1 {
log.Printf("incorrect batch version, skip current batch")
return false
}
case MsgBatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
return false
}
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*BatchMeta)
i.index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.timestamp = m.Timestamp
isBatchMeta = true
// continue readLoop
case MsgIOSBatchMeta:
if i.index != 0 { // Might be several 0-0 BatchMeta in a row without an error though
log.Printf("Batch Meta found at the end of the batch")
return false
}
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*IOSBatchMeta)
i.index = m.FirstIndex
i.timestamp = int64(m.Timestamp)
isBatchMeta = true
// continue readLoop
case MsgTimestamp:
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*Timestamp)
i.timestamp = int64(m.Timestamp)
// No skipping here for making it easy to encode back the same sequence of message
// continue readLoop
case MsgSessionStart:
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*SessionStart)
i.timestamp = int64(m.Timestamp)
case MsgSessionEnd:
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*SessionEnd)
i.timestamp = int64(m.Timestamp)
case MsgSetPageLocation:
msg := i.msg.Decode()
if msg == nil {
return false
}
m := msg.(*SetPageLocation)
i.url = m.URL
}
i.msg.Meta().Index = i.index
i.msg.Meta().Timestamp = i.timestamp
i.msg.Meta().Url = i.url
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
i.index++
}
return true
}
func (i *iteratorImpl) Type() int {
return int(i.msgType)
}
func (i *iteratorImpl) Message() Message {
return i.msg
}
func (i *iteratorImpl) Close() {
_, err := i.data.Seek(0, io.SeekEnd)
if err != nil {
log.Printf("can't set seek pointer at the end: %s", err)
}
}
func messageHasSize(msgType uint64) bool {
return !(msgType == 80 || msgType == 81 || msgType == 82)
}

View file

@ -1,56 +0,0 @@
package messages
import (
"encoding/binary"
"fmt"
"io"
)
type SessionSearch struct {
message
Timestamp uint64
Partition uint64
}
func (msg *SessionSearch) Encode() []byte {
buf := make([]byte, 11)
buf[0] = 127
p := 1
p = WriteUint(msg.Timestamp, buf, p)
p = WriteUint(msg.Partition, buf, p)
return buf[:p]
}
func (msg *SessionSearch) EncodeWithIndex() []byte {
encoded := msg.Encode()
if IsIOSType(msg.TypeID()) {
return encoded
}
data := make([]byte, len(encoded)+8)
copy(data[8:], encoded[:])
binary.LittleEndian.PutUint64(data[0:], msg.Meta().Index)
return data
}
func (msg *SessionSearch) Decode() Message {
return msg
}
func (msg *SessionSearch) TypeID() int {
return 127
}
func DecodeExtraMessage(code string, reader io.Reader) (Message, error) {
var err error
if code != "127" {
return nil, fmt.Errorf("unknown message code: %s", code)
}
msg := &SessionSearch{}
if msg.Timestamp, err = ReadUint(reader); err != nil {
return nil, fmt.Errorf("can't read message timestamp: %s", err)
}
if msg.Partition, err = ReadUint(reader); err != nil {
return nil, fmt.Errorf("can't read last partition: %s", err)
}
return msg, nil
}

View file

@ -1,5 +0,0 @@
package messages
func Encode(msg Message) []byte {
return msg.Encode()
}

View file

@ -2,7 +2,7 @@
package messages
func IsReplayerType(id int) bool {
return 0 == id || 4 == id || 5 == id || 6 == id || 7 == id || 8 == id || 9 == id || 10 == id || 11 == id || 12 == id || 13 == id || 14 == id || 15 == id || 16 == id || 18 == id || 19 == id || 20 == id || 22 == id || 37 == id || 38 == id || 39 == id || 40 == id || 41 == id || 44 == id || 45 == id || 46 == id || 47 == id || 48 == id || 49 == id || 54 == id || 55 == id || 59 == id || 60 == id || 61 == id || 67 == id || 69 == id || 70 == id || 71 == id || 72 == id || 73 == id || 74 == id || 75 == id || 76 == id || 77 == id || 79 == id || 90 == id || 93 == id || 96 == id || 100 == id || 102 == id || 103 == id || 105 == id
return 0 == id || 4 == id || 5 == id || 6 == id || 7 == id || 8 == id || 9 == id || 10 == id || 11 == id || 12 == id || 13 == id || 14 == id || 15 == id || 16 == id || 18 == id || 19 == id || 20 == id || 22 == id || 37 == id || 38 == id || 39 == id || 40 == id || 41 == id || 44 == id || 45 == id || 46 == id || 47 == id || 48 == id || 49 == id || 54 == id || 55 == id || 59 == id || 60 == id || 61 == id || 67 == id || 69 == id || 70 == id || 71 == id || 72 == id || 73 == id || 74 == id || 75 == id || 76 == id || 77 == id || 79 == id || 127 == id || 90 == id || 93 == id || 96 == id || 100 == id || 102 == id || 103 == id || 105 == id
}
func IsIOSType(id int) bool {

View file

@ -0,0 +1,184 @@
package messages
import (
"bytes"
"fmt"
"io"
"log"
)
// MessageHandler processes one message using service logic
type MessageHandler func(Message)
// MessageIterator iterates by all messages in batch
type MessageIterator interface {
Iterate(batchData []byte, batchInfo *BatchInfo)
}
type messageIteratorImpl struct {
filter map[int]struct{}
preFilter map[int]struct{}
handler MessageHandler
autoDecode bool
version uint64
size uint64
canSkip bool
messageInfo *message
batchInfo *BatchInfo
}
func NewMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator {
iter := &messageIteratorImpl{handler: messageHandler, autoDecode: autoDecode}
if len(messageFilter) != 0 {
filter := make(map[int]struct{}, len(messageFilter))
for _, msgType := range messageFilter {
filter[msgType] = struct{}{}
}
iter.filter = filter
}
iter.preFilter = map[int]struct{}{
MsgBatchMetadata: {}, MsgBatchMeta: {}, MsgTimestamp: {},
MsgSessionStart: {}, MsgSessionEnd: {}, MsgSetPageLocation: {}}
return iter
}
func (i *messageIteratorImpl) prepareVars(batchInfo *BatchInfo) {
i.batchInfo = batchInfo
i.messageInfo = &message{batch: batchInfo}
i.version = 0
i.canSkip = false
i.size = 0
}
func (i *messageIteratorImpl) Iterate(batchData []byte, batchInfo *BatchInfo) {
// Prepare iterator before processing messages in batch
i.prepareVars(batchInfo)
// Initialize batch reader
reader := bytes.NewReader(batchData)
// Process until end of batch or parsing error
for {
// Increase message index (can be overwritten by batch info message)
i.messageInfo.Index++
if i.canSkip {
if _, err := reader.Seek(int64(i.size), io.SeekCurrent); err != nil {
log.Printf("seek err: %s", err)
return
}
}
i.canSkip = false
// Read message type
msgType, err := ReadUint(reader)
if err != nil {
if err != io.EOF {
log.Printf("can't read message type: %s", err)
}
return
}
var msg Message
// Read message body (and decode if protocol version less than 1)
if i.version > 0 && messageHasSize(msgType) {
// Read message size if it is a new protocol version
i.size, err = ReadSize(reader)
if err != nil {
log.Printf("can't read message size: %s", err)
return
}
msg = &RawMessage{
tp: msgType,
size: i.size,
reader: reader,
skipped: &i.canSkip,
meta: i.messageInfo,
}
i.canSkip = true
} else {
msg, err = ReadMessage(msgType, reader)
if err != nil {
if err != io.EOF {
log.Printf("Batch Message decoding error on message with index %v, err: %s", i.messageInfo.Index, err)
}
return
}
msg = transformDeprecated(msg)
}
// Preprocess "system" messages
if _, ok := i.preFilter[msg.TypeID()]; ok {
msg = msg.Decode()
if msg == nil {
log.Printf("can't decode message")
return
}
if err := i.preprocessing(msg); err != nil {
log.Printf("message preprocessing err: %s", err)
return
}
}
// Skip messages we don't have in filter
if i.filter != nil {
if _, ok := i.filter[msg.TypeID()]; !ok {
continue
}
}
if i.autoDecode {
msg = msg.Decode()
if msg == nil {
log.Printf("can't decode message")
return
}
}
// Set meta information for message
msg.Meta().SetMeta(i.messageInfo)
// Process message
i.handler(msg)
}
}
func (i *messageIteratorImpl) preprocessing(msg Message) error {
switch m := msg.(type) {
case *BatchMetadata:
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
return fmt.Errorf("batchMetadata found at the end of the batch")
}
if m.Version > 1 {
return fmt.Errorf("incorrect batch version: %d, skip current batch", i.version)
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
i.messageInfo.Url = m.Url
i.version = m.Version
case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
if i.messageInfo.Index > 1 { // Might be several 0-0 BatchMeta in a row without an error though
return fmt.Errorf("batchMeta found at the end of the batch")
}
i.messageInfo.Index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
i.messageInfo.Timestamp = m.Timestamp
case *Timestamp:
i.messageInfo.Timestamp = int64(m.Timestamp)
case *SessionStart:
i.messageInfo.Timestamp = int64(m.Timestamp)
case *SessionEnd:
i.messageInfo.Timestamp = int64(m.Timestamp)
case *SetPageLocation:
i.messageInfo.Url = m.URL
}
return nil
}
func messageHasSize(msgType uint64) bool {
return !(msgType == 80 || msgType == 81 || msgType == 82)
}
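
A minimal standalone usage sketch of the new iterator (the batch bytes, session ID, topic name, and batch ID are placeholders; in the services above they come from Kafka or Redis):

package main

import (
	"log"
	"time"

	"openreplay/backend/pkg/messages"
)

func main() {
	iter := messages.NewMessageIterator(
		func(msg messages.Message) {
			log.Printf("sess %d: type %d at %d", msg.SessionID(), msg.TypeID(), msg.Meta().Timestamp)
		},
		nil,  // nil filter: every message type is delivered
		true, // decode messages before calling the handler
	)
	var batchData []byte // raw batch bytes, normally read from the queue
	iter.Iterate(batchData, messages.NewBatchInfo(42, "raw", 1, time.Now().UnixMilli()))
}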

View file

@ -1,25 +1,72 @@
package messages
type message struct {
Timestamp int64
Index uint64
Url string
}
func (m *message) Meta() *message {
return m
}
func (m *message) SetMeta(origin *message) {
m.Timestamp = origin.Timestamp
m.Index = origin.Index
m.Url = origin.Url
}
type Message interface {
Encode() []byte
EncodeWithIndex() []byte
Decode() Message
TypeID() int
Meta() *message
SessionID() uint64
}
// BatchInfo represents common information for all messages inside data batch
type BatchInfo struct {
sessionID uint64
id uint64
topic string
timestamp int64
}
func NewBatchInfo(sessID uint64, topic string, id uint64, ts int64) *BatchInfo {
return &BatchInfo{
sessionID: sessID,
id: id,
topic: topic,
timestamp: ts,
}
}
func (b *BatchInfo) SessionID() uint64 {
return b.sessionID
}
func (b *BatchInfo) ID() uint64 {
return b.id
}
func (b *BatchInfo) Timestamp() int64 {
return b.timestamp
}
type message struct {
Timestamp int64
Index uint64
Url string
batch *BatchInfo
}
func (m *message) Batch() *BatchInfo {
return m.batch
}
func (m *message) Meta() *message {
return m
}
func (m *message) SetMeta(origin *message) {
m.batch = origin.batch
m.Timestamp = origin.Timestamp
m.Index = origin.Index
m.Url = origin.Url
}
func (m *message) SessionID() uint64 {
return m.batch.sessionID
}
func (m *message) SetSessionID(sessID uint64) {
if m.batch == nil {
m.batch = &BatchInfo{}
}
m.batch.sessionID = sessID
}
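
With these definitions the session ID travels with the batch rather than as a separate argument: the queue consumer builds one BatchInfo per consumed batch, the iterator attaches it to each message's meta via SetMeta, and downstream code reads everything from the message itself. A small illustration (the function name is ours, assuming the usual log and messages imports):

func logMeta(msg messages.Message) {
	sessID := msg.SessionID()                 // same value as msg.Meta().Batch().SessionID()
	batchTS := msg.Meta().Batch().Timestamp() // batch-level timestamp (used by the ender and storage services)
	msgTS := msg.Meta().Timestamp             // message-level timestamp filled in by the iterator
	log.Printf("sess=%d batchTs=%d msgTs=%d", sessID, batchTS, msgTS)
}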

File diff suppressed because it is too large.

View file

@ -67,3 +67,10 @@ func (m *RawMessage) TypeID() int {
func (m *RawMessage) Meta() *message {
return m.meta
}
func (m *RawMessage) SessionID() uint64 {
if m.meta != nil {
return m.meta.SessionID()
}
return 0
}

File diff suppressed because it is too large.

View file

@ -1,12 +1,13 @@
package queue
import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/redisstream"
)
func NewConsumer(group string, topics []string, handler types.MessageHandler, _ bool, _ int) types.Consumer {
return redisstream.NewConsumer(group, topics, handler)
func NewConsumer(group string, topics []string, iterator messages.MessageIterator, _ bool, _ int) types.Consumer {
return redisstream.NewConsumer(group, topics, iterator)
}
func NewProducer(_ int, _ bool) types.Producer {

View file

@ -1,12 +0,0 @@
package queue
import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
)
func NewMessageConsumer(group string, topics []string, handler types.RawMessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer {
return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
handler(sessionID, messages.NewIterator(value), meta)
}, autoCommit, messageSizeLimit)
}

View file

@ -1,30 +1,17 @@
package types
import (
"openreplay/backend/pkg/messages"
)
// Consumer reads batches of session data from queue (redis or kafka)
type Consumer interface {
ConsumeNext() error
Commit() error
CommitBack(gap int64) error
Commit() error
Close()
HasFirstPartition() bool
}
// Producer sends batches of session data to queue (redis or kafka)
type Producer interface {
Produce(topic string, key uint64, value []byte) error
ProduceToPartition(topic string, partition, key uint64, value []byte) error
Close(timeout int)
Flush(timeout int)
Close(timeout int)
}
type Meta struct {
ID uint64
Topic string
Timestamp int64
}
type MessageHandler func(uint64, []byte, *Meta)
type DecodedMessageHandler func(uint64, messages.Message, *Meta)
type RawMessageHandler func(uint64, messages.Iterator, *Meta)

View file

@ -3,6 +3,7 @@ package redisstream
import (
"log"
"net"
"openreplay/backend/pkg/messages"
"sort"
"strconv"
"strings"
@ -10,8 +11,6 @@ import (
_redis "github.com/go-redis/redis"
"github.com/pkg/errors"
"openreplay/backend/pkg/queue/types"
)
type idsInfo struct {
@ -21,16 +20,16 @@ type idsInfo struct {
type streamPendingIDsMap map[string]*idsInfo
type Consumer struct {
redis *_redis.Client
streams []string
group string
messageHandler types.MessageHandler
idsPending streamPendingIDsMap
lastTs int64
autoCommit bool
redis *_redis.Client
streams []string
group string
messageIterator messages.MessageIterator
idsPending streamPendingIDsMap
lastTs int64
autoCommit bool
}
func NewConsumer(group string, streams []string, messageHandler types.MessageHandler) *Consumer {
func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer {
redis := getRedisClient()
for _, stream := range streams {
err := redis.XGroupCreateMkStream(stream, group, "0").Err()
@ -52,12 +51,12 @@ func NewConsumer(group string, streams []string, messageHandler types.MessageHan
}
return &Consumer{
redis: redis,
messageHandler: messageHandler,
streams: streams,
group: group,
autoCommit: true,
idsPending: idsPending,
redis: redis,
messageIterator: messageIterator,
streams: streams,
group: group,
autoCommit: true,
idsPending: idsPending,
}
}
@ -102,11 +101,8 @@ func (c *Consumer) ConsumeNext() error {
if idx > 0x1FFF {
return errors.New("Too many messages per ms in redis")
}
c.messageHandler(sessionID, []byte(valueString), &types.Meta{
Topic: r.Stream,
Timestamp: int64(ts),
ID: ts<<13 | (idx & 0x1FFF), // Max: 4096 messages/ms for 69 years
})
bID := ts<<13 | (idx & 0x1FFF) // Max: 4096 messages/ms for 69 years
c.messageIterator.Iterate([]byte(valueString), messages.NewBatchInfo(sessionID, r.Stream, bID, int64(ts)))
if c.autoCommit {
if err = c.redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
return errors.Wrapf(err, "Acknowledgment error for messageID %v", m.ID)
@ -161,7 +157,3 @@ func (c *Consumer) CommitBack(gap int64) error {
func (c *Consumer) Close() {
// noop
}
func (c *Consumer) HasFirstPartition() bool {
return false
}
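
The synthetic batch ID above packs the Redis entry's millisecond timestamp and its per-millisecond index into a single uint64, keeping the low 13 bits for the index (hence the idx > 0x1FFF guard). A small sketch of the packing and its inverse (helper names are ours, not part of the source):

const idxBits = 13 // low 13 bits hold the per-millisecond index

func packBatchID(ts, idx uint64) uint64 {
	return ts<<idxBits | (idx & 0x1FFF)
}

func unpackBatchID(id uint64) (ts, idx uint64) {
	return id >> idxBits, id & 0x1FFF
}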

View file

@ -66,6 +66,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
b.lastSystemTime = time.Now()
for _, p := range b.processors {
if rm := p.Handle(message, messageID, b.timestamp); rm != nil {
rm.Meta().SetMeta(message.Meta())
b.readyMsgs = append(b.readyMsgs, rm)
}
}

View file

@ -30,7 +30,9 @@ func (m *builderMap) GetBuilder(sessionID uint64) *builder {
return b
}
func (m *builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint64) {
func (m *builderMap) HandleMessage(msg Message) {
sessionID := msg.SessionID()
messageID := msg.Meta().Index
b := m.GetBuilder(sessionID)
b.handleMessage(msg, messageID)
}
@ -39,6 +41,7 @@ func (m *builderMap) iterateSessionReadyMessages(sessionID uint64, b *builder, i
if b.ended || b.lastSystemTime.Add(FORCE_DELETE_TIMEOUT).Before(time.Now()) {
for _, p := range b.processors {
if rm := p.Build(); rm != nil {
rm.Meta().SetSessionID(sessionID)
b.readyMsgs = append(b.readyMsgs, rm)
}
}

View file

@ -6,7 +6,8 @@ import (
"openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(sessionID uint64, msg messages.Message) error {
func (mi *Saver) InsertMessage(msg messages.Message) error {
sessionID := msg.SessionID()
switch m := msg.(type) {
// Common
case *messages.Metadata:

View file

@ -38,6 +38,6 @@ func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error
return nil
}
func (si *Saver) CommitStats(optimize bool) error {
func (si *Saver) CommitStats() error {
return si.ch.Commit()
}

View file

@ -55,19 +55,16 @@ func NewSessionFinder(cfg *config.Config, stg *storage.Storage) (SessionFinder,
done: make(chan struct{}, 1),
}
finder.producer = queue.NewProducer(cfg.MessageSizeLimit, false)
finder.consumer = queue.NewMessageConsumer(
finder.consumer = queue.NewConsumer(
cfg.GroupFailover,
[]string{
cfg.TopicFailover,
},
func(sessionID uint64, iter messages.Iterator, meta *types.Meta) {
for iter.Next() {
if iter.Type() == 127 {
m := iter.Message().Decode().(*messages.SessionSearch)
finder.findSession(sessionID, m.Timestamp, m.Partition)
}
}
},
messages.NewMessageIterator(
func(msg messages.Message) {
m := msg.(*messages.SessionSearch)
finder.findSession(m.SessionID(), m.Timestamp, m.Partition)
}, []int{messages.MsgSessionSearch}, true),
true,
cfg.MessageSizeLimit,
)
@ -128,7 +125,7 @@ func (s *sessionFinderImpl) nextPartition(partition uint64) uint64 {
// Create sessionSearch message and send it to queue
func (s *sessionFinderImpl) sendSearchMessage(sessionID, timestamp, partition uint64) {
msg := &messages.SessionSearch{Timestamp: timestamp, Partition: partition}
if err := s.producer.ProduceToPartition(s.topicName, partition, sessionID, messages.Encode(msg)); err != nil {
if err := s.producer.ProduceToPartition(s.topicName, partition, sessionID, msg.Encode()); err != nil {
log.Printf("can't send SessionSearch to failover topic: %s; sessID: %d", err, sessionID)
}
}

View file

@ -2,6 +2,7 @@ package kafka
import (
"log"
"openreplay/backend/pkg/messages"
"os"
"time"
@ -9,16 +10,15 @@ import (
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/queue/types"
)
type Message = kafka.Message
type Consumer struct {
c *kafka.Consumer
messageHandler types.MessageHandler
commitTicker *time.Ticker
pollTimeout uint
c *kafka.Consumer
messageIterator messages.MessageIterator
commitTicker *time.Ticker
pollTimeout uint
lastReceivedPrtTs map[int32]int64
}
@ -26,7 +26,7 @@ type Consumer struct {
func NewConsumer(
group string,
topics []string,
messageHandler types.MessageHandler,
messageIterator messages.MessageIterator,
autoCommit bool,
messageSizeLimit int,
) *Consumer {
@ -70,7 +70,7 @@ func NewConsumer(
return &Consumer{
c: c,
messageHandler: messageHandler,
messageIterator: messageIterator,
commitTicker: commitTicker,
pollTimeout: 200,
lastReceivedPrtTs: make(map[int32]int64),
@ -171,11 +171,13 @@ func (consumer *Consumer) ConsumeNext() error {
return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error")
}
ts := e.Timestamp.UnixMilli()
consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{
Topic: *(e.TopicPartition.Topic),
ID: uint64(e.TopicPartition.Offset),
Timestamp: ts,
})
consumer.messageIterator.Iterate(
e.Value,
messages.NewBatchInfo(
decodeKey(e.Key),
*(e.TopicPartition.Topic),
uint64(e.TopicPartition.Offset),
ts))
consumer.lastReceivedPrtTs[e.TopicPartition.Partition] = ts
case kafka.Error:
if e.Code() == kafka.ErrAllBrokersDown || e.Code() == kafka.ErrMaxPollExceeded {
@ -194,16 +196,3 @@ func (consumer *Consumer) Close() {
log.Printf("Kafka consumer close error: %v", err)
}
}
func (consumer *Consumer) HasFirstPartition() bool {
assigned, err := consumer.c.Assignment()
if err != nil {
return false
}
for _, p := range assigned {
if p.Partition == 1 {
return true
}
}
return false
}

View file

@ -3,12 +3,13 @@ package queue
import (
"openreplay/backend/pkg/kafka"
"openreplay/backend/pkg/license"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
)
func NewConsumer(group string, topics []string, handler types.MessageHandler, autoCommit bool, messageSizeLimit int) types.Consumer {
func NewConsumer(group string, topics []string, iterator messages.MessageIterator, autoCommit bool, messageSizeLimit int) types.Consumer {
license.CheckLicense()
return kafka.NewConsumer(group, topics, handler, autoCommit, messageSizeLimit)
return kafka.NewConsumer(group, topics, iterator, autoCommit, messageSizeLimit)
}
func NewProducer(messageSizeLimit int, useBatch bool) types.Producer {

View file

@ -469,4 +469,9 @@ message 79, 'Zustand' do
string 'State'
end
message 127, 'SessionSearch' do
uint 'Timestamp'
uint 'Partition'
end
# 80 -- 90 reserved

View file

@ -113,7 +113,7 @@ $ids = []
$messages = []
def message(id, name, opts = {}, &block)
raise "id duplicated #{name}" if $ids.include? id
raise "id is too big #{name}" if id > 120
raise "id is too big #{name}" if id > 127
$ids << id
opts[:id] = id
opts[:name] = name