diff --git a/backend/pkg/queue/import.go b/backend/pkg/queue/import.go index 2bca9c8fd..623d301ca 100644 --- a/backend/pkg/queue/import.go +++ b/backend/pkg/queue/import.go @@ -1,15 +1,14 @@ package queue import ( - "openreplay/backend/pkg/redisstream" "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/redisstream" ) -func NewConsumer(group string, topics []string, handler types.MessageHandler) types.Consumer { +func NewConsumer(group string, topics []string, handler types.MessageHandler, _ bool) types.Consumer { return redisstream.NewConsumer(group, topics, handler) } func NewProducer() types.Producer { return redisstream.NewProducer() } - diff --git a/backend/pkg/queue/messages.go b/backend/pkg/queue/messages.go index eca4a4d49..0ab184ee6 100644 --- a/backend/pkg/queue/messages.go +++ b/backend/pkg/queue/messages.go @@ -7,13 +7,12 @@ import ( "openreplay/backend/pkg/queue/types" ) - -func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler) types.Consumer { +func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool) types.Consumer { return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) { if err := messages.ReadBatch(value, func(msg messages.Message) { handler(sessionID, msg, meta) }); err != nil { log.Printf("Decode error: %v\n", err) } - }) + }, autoCommit) } diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go index b671323d0..600babe25 100644 --- a/backend/pkg/queue/types/types.go +++ b/backend/pkg/queue/types/types.go @@ -6,26 +6,22 @@ import ( type Consumer interface { ConsumeNext() error - DisableAutoCommit() Commit() error CommitBack(gap int64) error Close() } - type Producer interface { Produce(topic string, key uint64, value []byte) error Close(timeout int) Flush(timeout int) } - type Meta struct { - ID uint64 - Topic string + ID uint64 + Topic string Timestamp int64 } type MessageHandler func(uint64, []byte, *Meta) type DecodedMessageHandler func(uint64, messages.Message, *Meta) - diff --git a/backend/pkg/redisstream/consumer.go b/backend/pkg/redisstream/consumer.go index 164ee9236..d32972981 100644 --- a/backend/pkg/redisstream/consumer.go +++ b/backend/pkg/redisstream/consumer.go @@ -1,24 +1,22 @@ package redisstream import ( + "log" "net" + "sort" "strconv" "strings" - "log" - "sort" "time" - "github.com/pkg/errors" _redis "github.com/go-redis/redis" + "github.com/pkg/errors" "openreplay/backend/pkg/queue/types" ) - - -type idsInfo struct{ - id []string - ts []int64 +type idsInfo struct { + id []string + ts []int64 } type streamPendingIDsMap map[string]*idsInfo @@ -41,26 +39,25 @@ func NewConsumer(group string, streams []string, messageHandler types.MessageHan } } - idsPending := make(streamPendingIDsMap) streamsCount := len(streams) for i := 0; i < streamsCount; i++ { - // ">" is for never-delivered messages. - // Otherwise - never acknoledged only + // ">" is for never-delivered messages. + // Otherwise - never acknoledged only // TODO: understand why in case of "0" it eats 100% cpu - streams = append(streams, ">") - + streams = append(streams, ">") + idsPending[streams[i]] = new(idsInfo) } return &Consumer{ - redis: redis, + redis: redis, messageHandler: messageHandler, - streams: streams, - group: group, - autoCommit: true, - idsPending: idsPending, + streams: streams, + group: group, + autoCommit: true, + idsPending: idsPending, } } @@ -106,9 +103,9 @@ func (c *Consumer) ConsumeNext() error { return errors.New("Too many messages per ms in redis") } c.messageHandler(sessionID, []byte(valueString), &types.Meta{ - Topic: r.Stream, + Topic: r.Stream, Timestamp: int64(ts), - ID: ts << 13 | (idx & 0x1FFF), // Max: 4096 messages/ms for 69 years + ID: ts<<13 | (idx & 0x1FFF), // Max: 4096 messages/ms for 69 years }) if c.autoCommit { if err = c.redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil { @@ -119,7 +116,7 @@ func (c *Consumer) ConsumeNext() error { c.idsPending[r.Stream].id = append(c.idsPending[r.Stream].id, m.ID) c.idsPending[r.Stream].ts = append(c.idsPending[r.Stream].ts, int64(ts)) } - + } } return nil @@ -158,13 +155,9 @@ func (c *Consumer) CommitBack(gap int64) error { c.idsPending[stream].id = idsInfo.id[maxI:] c.idsPending[stream].ts = idsInfo.ts[maxI:] } - return nil -} - -func (c *Consumer) DisableAutoCommit() { - //c.autoCommit = false + return nil } func (c *Consumer) Close() { // noop -} \ No newline at end of file +} diff --git a/backend/services/assets/main.go b/backend/services/assets/main.go index 450dfc83c..664dc5b09 100644 --- a/backend/services/assets/main.go +++ b/backend/services/assets/main.go @@ -18,7 +18,7 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - GROUP_CACHE := env.String("GROUP_CACHE") + GROUP_CACHE := env.String("GROUP_CACHE") TOPIC_CACHE := env.String("TOPIC_CACHE") cacher := cacher.NewCacher( @@ -29,10 +29,10 @@ func main() { ) consumer := queue.NewMessageConsumer( - GROUP_CACHE, - []string{ TOPIC_CACHE }, + GROUP_CACHE, + []string{TOPIC_CACHE}, func(sessionID uint64, message messages.Message, e *types.Meta) { - switch msg := message.(type) { + switch msg := message.(type) { case *messages.AssetCache: cacher.CacheURL(sessionID, msg.URL) case *messages.ErrorEvent: @@ -47,17 +47,17 @@ func main() { for _, source := range sourceList { cacher.CacheJSFile(source) } - } + } }, + true, ) - tick := time.Tick(20 * time.Minute) sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - log.Printf("Cacher service started\n") + log.Printf("Cacher service started\n") for { select { case sig := <-sigchan: @@ -74,4 +74,4 @@ func main() { } } } -} \ No newline at end of file +} diff --git a/backend/services/db/main.go b/backend/services/db/main.go index d6190a4f0..2ad6e4aa8 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -74,8 +74,8 @@ func main() { } }) }, + false, ) - consumer.DisableAutoCommit() sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index e8d739f0e..f0f139dce 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -8,12 +8,12 @@ import ( "os/signal" "syscall" - "openreplay/backend/pkg/intervals" "openreplay/backend/pkg/env" + "openreplay/backend/pkg/intervals" + logger "openreplay/backend/pkg/log" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" - logger "openreplay/backend/pkg/log" "openreplay/backend/services/ender/builder" ) @@ -29,24 +29,24 @@ func main() { producer := queue.NewProducer() consumer := queue.NewMessageConsumer( - GROUP_EVENTS, - []string{ + GROUP_EVENTS, + []string{ env.String("TOPIC_RAW_WEB"), env.String("TOPIC_RAW_IOS"), - }, + }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { statsLogger.HandleAndLog(sessionID, meta) builderMap.HandleMessage(sessionID, msg, msg.Meta().Index) }, + false, ) - consumer.DisableAutoCommit() - + tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond) sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - log.Printf("Ender service started\n") + log.Printf("Ender service started\n") for { select { case sig := <-sigchan: @@ -55,7 +55,7 @@ func main() { consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP) consumer.Close() os.Exit(0) - case <- tick: + case <-tick: builderMap.IterateReadyMessages(time.Now().UnixNano()/1e6, func(sessionID uint64, readyMsg messages.Message) { producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg)) }) @@ -69,4 +69,3 @@ func main() { } } } - diff --git a/backend/services/sink/main.go b/backend/services/sink/main.go index 5893e93e6..a649bb6ef 100644 --- a/backend/services/sink/main.go +++ b/backend/services/sink/main.go @@ -1,8 +1,8 @@ package main import ( - "log" "encoding/binary" + "log" "time" "os" @@ -10,67 +10,64 @@ import ( "syscall" "openreplay/backend/pkg/env" + . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" - . "openreplay/backend/pkg/messages" ) - - func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - FS_DIR := env.String("FS_DIR"); + FS_DIR := env.String("FS_DIR") if _, err := os.Stat(FS_DIR); os.IsNotExist(err) { log.Fatalf("%v doesn't exist. %v", FS_DIR, err) } writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR) - count := 0 + count := 0 consumer := queue.NewMessageConsumer( env.String("GROUP_SINK"), - []string{ + []string{ env.String("TOPIC_RAW_WEB"), env.String("TOPIC_RAW_IOS"), - }, - func(sessionID uint64, message Message, _ *types.Meta) { - //typeID, err := GetMessageTypeID(value) - // if err != nil { - // log.Printf("Message type decoding error: %v", err) - // return - // } - typeID := message.Meta().TypeID - if !IsReplayerType(typeID) { - return - } + }, + func(sessionID uint64, message Message, _ *types.Meta) { + //typeID, err := GetMessageTypeID(value) + // if err != nil { + // log.Printf("Message type decoding error: %v", err) + // return + // } + typeID := message.Meta().TypeID + if !IsReplayerType(typeID) { + return + } - count++ + count++ - value := message.Encode() - var data []byte - if IsIOSType(typeID) { - data = value - } else { + value := message.Encode() + var data []byte + if IsIOSType(typeID) { + data = value + } else { data = make([]byte, len(value)+8) copy(data[8:], value[:]) binary.LittleEndian.PutUint64(data[0:], message.Meta().Index) - } - if err := writer.Write(sessionID, data); err != nil { + } + if err := writer.Write(sessionID, data); err != nil { log.Printf("Writer error: %v\n", err) } - }, + }, + false, ) - consumer.DisableAutoCommit() - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(30 * time.Second) + tick := time.Tick(30 * time.Second) - log.Printf("Sink service started\n") + log.Printf("Sink service started\n") for { select { case sig := <-sigchan: @@ -85,7 +82,7 @@ func main() { log.Printf("%v messages during 30 sec", count) count = 0 - + consumer.Commit() default: err := consumer.ConsumeNext() @@ -96,4 +93,3 @@ func main() { } } - diff --git a/backend/services/storage/main.go b/backend/services/storage/main.go index 5033fb845..cd585f10e 100644 --- a/backend/services/storage/main.go +++ b/backend/services/storage/main.go @@ -2,45 +2,41 @@ package main import ( "log" - "time" "os" "strconv" + "time" "os/signal" "syscall" "openreplay/backend/pkg/env" - "openreplay/backend/pkg/storage" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/storage" ) - - func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - - storageWeb := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB")) - //storageIos := storage.NewS3(env.String("AWS_REGION_IOS"), env.String("S3_BUCKET_IOS")) + storage := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB")) FS_DIR := env.String("FS_DIR") FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS") - var uploadKey func(string, int, *storage.S3) - uploadKey = func(key string, retryCount int, s *storage.S3) { + var uploadKey func(string, int) + uploadKey = func(key string, retryCount int) { if retryCount <= 0 { - return; + return } file, err := os.Open(FS_DIR + "/" + key) defer file.Close() if err != nil { log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount) time.AfterFunc(2*time.Minute, func() { - uploadKey(key, retryCount - 1, s) + uploadKey(key, retryCount-1) }) } else { - if err := s.Upload(gzipFile(file), key, "application/octet-stream", true); err != nil { + if err := storage.Upload(gzipFile(file), key, "application/octet-stream", true); err != nil { log.Fatalf("Storage upload error: %v\n", err) } } @@ -48,27 +44,24 @@ func main() { consumer := queue.NewMessageConsumer( env.String("GROUP_STORAGE"), - []string{ + []string{ env.String("TOPIC_TRIGGER"), - }, - func(sessionID uint64, msg messages.Message, meta *types.Meta) { - switch msg.(type) { - case *messages.SessionEnd: - uploadKey(strconv.FormatUint(sessionID, 10), 5, storageWeb) - //case *messages.IOSSessionEnd: - // uploadKey(strconv.FormatUint(sessionID, 10), 5, storageIos) - } - }, + }, + func(sessionID uint64, msg messages.Message, meta *types.Meta) { + switch msg.(type) { + case *messages.SessionEnd: + uploadKey(strconv.FormatUint(sessionID, 10), 5) + } + }, + true, ) sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour) - cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour) - - - log.Printf("Storage service started\n") + log.Printf("Storage service started\n") for { select { case sig := <-sigchan: @@ -85,4 +78,3 @@ func main() { } } } - diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index ca0544923..82aa56d50 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -25,7 +25,12 @@ type Consumer struct { lastKafkaEventTs int64 } -func NewConsumer(group string, topics []string, messageHandler types.MessageHandler) *Consumer { +func NewConsumer( + group string, + topics []string, + messageHandler types.MessageHandler, + autoCommit bool, +) *Consumer { protocol := "plaintext" if env.Bool("KAFKA_USE_SSL") { protocol = "ssl" @@ -53,18 +58,19 @@ func NewConsumer(group string, topics []string, messageHandler types.MessageHand log.Fatalln(err) } + var commitTicker *time.Ticker + if autoCommit { + commitTicker = time.NewTicker(2 * time.Minute) + } + return &Consumer{ c: c, messageHandler: messageHandler, - commitTicker: time.NewTicker(2 * time.Minute), + commitTicker: commitTicker, pollTimeout: 200, } } -func (consumer *Consumer) DisableAutoCommit() { - consumer.commitTicker.Stop() -} - func (consumer *Consumer) Commit() error { consumer.c.Commit() // TODO: return error if it is not "No offset stored" return nil @@ -128,10 +134,12 @@ func (consumer *Consumer) ConsumeNext() error { return nil } - select { - case <-consumer.commitTicker.C: - consumer.Commit() - default: + if consumer.commitTicker != nil { + select { + case <-consumer.commitTicker.C: + consumer.Commit() + default: + } } switch e := ev.(type) { diff --git a/ee/backend/pkg/queue/import.go b/ee/backend/pkg/queue/import.go index abff07e9a..e95eb11e5 100644 --- a/ee/backend/pkg/queue/import.go +++ b/ee/backend/pkg/queue/import.go @@ -2,17 +2,16 @@ package queue import ( "openreplay/backend/pkg/kafka" - "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/license" + "openreplay/backend/pkg/queue/types" ) -func NewConsumer(group string, topics []string, handler types.MessageHandler) types.Consumer { +func NewConsumer(group string, topics []string, handler types.MessageHandler, autoCommit bool) types.Consumer { license.CheckLicense() - return kafka.NewConsumer(group, topics, handler) + return kafka.NewConsumer(group, topics, handler, autoCommit) } func NewProducer() types.Producer { license.CheckLicense() return kafka.NewProducer() } -