diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index b6999dca9..0dff365b0 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -62,6 +62,7 @@ func main() { []string{ cfg.TopicRawIOS, cfg.TopicRawWeb, + cfg.TopicTrigger, }, func(sessionID uint64, message Message, _ *types.Meta) { // Process assets @@ -71,6 +72,16 @@ func main() { // Filter message typeID := message.TypeID() + + // Send SessionFinished trigger to storage service + switch m := message.(type) { + case *SessionEnd: + msg := &SessionFinished{Timestamp: m.Timestamp} + if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(msg)); err != nil { + log.Printf("can't send SessionFinished to trigger topic: %s; sessID: %d", err, sessionID) + } + return + } if !IsReplayerType(typeID) { return } diff --git a/backend/cmd/storage/main.go b/backend/cmd/storage/main.go index 2f92d0259..db2305d53 100644 --- a/backend/cmd/storage/main.go +++ b/backend/cmd/storage/main.go @@ -44,7 +44,7 @@ func main() { }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { switch msg.(type) { - case *messages.SessionEnd: + case *messages.SessionFinished: srv.UploadKey(strconv.FormatUint(sessionID, 10), 5) // Log timestamp of last processed session counter.Update(sessionID, time.UnixMilli(meta.Timestamp)) diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index a78bfba63..f87babeb9 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -11,6 +11,7 @@ type Config struct { TopicRawWeb string TopicRawIOS string TopicCache string + TopicTrigger string CacheAssets bool AssetsOrigin string ProducerCloseTimeout int @@ -24,6 +25,7 @@ func New() *Config { TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"), TopicCache: env.String("TOPIC_CACHE"), + TopicTrigger: env.String("TOPIC_TRIGGER"), CacheAssets: env.Bool("CACHE_ASSETS"), AssetsOrigin: env.String("ASSETS_ORIGIN"), ProducerCloseTimeout: 15000, diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 9600f24a5..fe283d25d 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -1,7 +1,9 @@ package messages import ( + "fmt" "io" + "strings" "github.com/pkg/errors" ) @@ -14,7 +16,15 @@ func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error { if err == io.EOF { return nil } else if err != nil { - return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index) + if strings.HasPrefix(err.Error(), "Unknown message code:") { + code := strings.TrimPrefix(err.Error(), "Unknown message code: ") + msg, err = DecodeExtraMessage(code, reader) + if err != nil { + return fmt.Errorf("can't decode msg: %s", err) + } + } else { + return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index) + } } msg = transformDeprecated(msg) diff --git a/backend/pkg/messages/trigger.go b/backend/pkg/messages/trigger.go new file mode 100644 index 000000000..0fe33340e --- /dev/null +++ b/backend/pkg/messages/trigger.go @@ -0,0 +1,35 @@ +package messages + +import ( + "fmt" + "io" +) + +type SessionFinished struct { + message + Timestamp uint64 +} + +func (msg *SessionFinished) Encode() []byte { + buf := make([]byte, 11) + buf[0] = 127 + p := 1 + p = WriteUint(msg.Timestamp, buf, p) + return buf[:p] +} + +func (msg *SessionFinished) TypeID() int { + return 127 +} + +func DecodeExtraMessage(code string, reader io.Reader) (Message, error) { + var err error + if code != "127" { + return nil, fmt.Errorf("unknown message code: %s", code) + } + trigger := &SessionFinished{} + if trigger.Timestamp, err = ReadUint(reader); err != nil { + return nil, fmt.Errorf("can't read message timestamp: %s", err) + } + return trigger, nil +}