feat(backend): added new trigger which sink should send to storage after session end received (#539)
Co-authored-by: Alexander Zavorotynskiy <alexander@openreplay.com>
This commit is contained in:
parent
883a6f6909
commit
6b5d9d3799
5 changed files with 60 additions and 2 deletions
|
|
@ -62,6 +62,7 @@ func main() {
|
|||
[]string{
|
||||
cfg.TopicRawIOS,
|
||||
cfg.TopicRawWeb,
|
||||
cfg.TopicTrigger,
|
||||
},
|
||||
func(sessionID uint64, message Message, _ *types.Meta) {
|
||||
// Process assets
|
||||
|
|
@ -71,6 +72,16 @@ func main() {
|
|||
|
||||
// Filter message
|
||||
typeID := message.TypeID()
|
||||
|
||||
// Send SessionFinished trigger to storage service
|
||||
switch m := message.(type) {
|
||||
case *SessionEnd:
|
||||
msg := &SessionFinished{Timestamp: m.Timestamp}
|
||||
if err := producer.Produce(cfg.TopicTrigger, sessionID, Encode(msg)); err != nil {
|
||||
log.Printf("can't send SessionFinished to trigger topic: %s; sessID: %d", err, sessionID)
|
||||
}
|
||||
return
|
||||
}
|
||||
if !IsReplayerType(typeID) {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func main() {
|
|||
},
|
||||
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
|
||||
switch msg.(type) {
|
||||
case *messages.SessionEnd:
|
||||
case *messages.SessionFinished:
|
||||
srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
|
||||
// Log timestamp of last processed session
|
||||
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ type Config struct {
|
|||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
TopicCache string
|
||||
TopicTrigger string
|
||||
CacheAssets bool
|
||||
AssetsOrigin string
|
||||
ProducerCloseTimeout int
|
||||
|
|
@ -24,6 +25,7 @@ func New() *Config {
|
|||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
TopicCache: env.String("TOPIC_CACHE"),
|
||||
TopicTrigger: env.String("TOPIC_TRIGGER"),
|
||||
CacheAssets: env.Bool("CACHE_ASSETS"),
|
||||
AssetsOrigin: env.String("ASSETS_ORIGIN"),
|
||||
ProducerCloseTimeout: 15000,
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
package messages
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
|
@ -14,7 +16,15 @@ func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
|
|||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
|
||||
if strings.HasPrefix(err.Error(), "Unknown message code:") {
|
||||
code := strings.TrimPrefix(err.Error(), "Unknown message code: ")
|
||||
msg, err = DecodeExtraMessage(code, reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't decode msg: %s", err)
|
||||
}
|
||||
} else {
|
||||
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
|
||||
}
|
||||
}
|
||||
msg = transformDeprecated(msg)
|
||||
|
||||
|
|
|
|||
35
backend/pkg/messages/trigger.go
Normal file
35
backend/pkg/messages/trigger.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
package messages
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
type SessionFinished struct {
|
||||
message
|
||||
Timestamp uint64
|
||||
}
|
||||
|
||||
func (msg *SessionFinished) Encode() []byte {
|
||||
buf := make([]byte, 11)
|
||||
buf[0] = 127
|
||||
p := 1
|
||||
p = WriteUint(msg.Timestamp, buf, p)
|
||||
return buf[:p]
|
||||
}
|
||||
|
||||
func (msg *SessionFinished) TypeID() int {
|
||||
return 127
|
||||
}
|
||||
|
||||
func DecodeExtraMessage(code string, reader io.Reader) (Message, error) {
|
||||
var err error
|
||||
if code != "127" {
|
||||
return nil, fmt.Errorf("unknown message code: %s", code)
|
||||
}
|
||||
trigger := &SessionFinished{}
|
||||
if trigger.Timestamp, err = ReadUint(reader); err != nil {
|
||||
return nil, fmt.Errorf("can't read message timestamp: %s", err)
|
||||
}
|
||||
return trigger, nil
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue