codefix(backend): autoCommit as init consumer parameter

ShiKhu 2022-03-30 18:27:16 +02:00
parent ae6f18fafb
commit 8c97576970
11 changed files with 117 additions and 136 deletions
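The change in one line: DisableAutoCommit() disappears from types.Consumer, and callers instead pass autoCommit once, at construction time. A condensed caller-side sketch of the new API (the group and topic strings are placeholders and the handler is a no-op, not values from this commit):

package main

import (
	"openreplay/backend/pkg/messages"
	"openreplay/backend/pkg/queue"
	"openreplay/backend/pkg/queue/types"
)

func main() {
	handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {}

	// Before this commit, auto-commit was disabled after construction:
	//   consumer := queue.NewMessageConsumer("a-group", []string{"a-topic"}, handler)
	//   consumer.DisableAutoCommit()

	// After this commit, it is the trailing autoCommit parameter:
	consumer := queue.NewMessageConsumer("a-group", []string{"a-topic"}, handler, false)
	defer consumer.Close()
}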

View file

@@ -1,15 +1,14 @@
 package queue
 
 import (
-	"openreplay/backend/pkg/redisstream"
 	"openreplay/backend/pkg/queue/types"
+	"openreplay/backend/pkg/redisstream"
 )
 
-func NewConsumer(group string, topics []string, handler types.MessageHandler) types.Consumer {
+func NewConsumer(group string, topics []string, handler types.MessageHandler, _ bool) types.Consumer {
 	return redisstream.NewConsumer(group, topics, handler)
 }
 
 func NewProducer() types.Producer {
 	return redisstream.NewProducer()
 }

View file

@@ -7,13 +7,12 @@ import (
 	"openreplay/backend/pkg/queue/types"
 )
 
-func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler) types.Consumer {
+func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool) types.Consumer {
 	return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
 		if err := messages.ReadBatch(value, func(msg messages.Message) {
 			handler(sessionID, msg, meta)
 		}); err != nil {
 			log.Printf("Decode error: %v\n", err)
 		}
-	})
+	}, autoCommit)
 }

View file

@@ -6,26 +6,22 @@ import (
 type Consumer interface {
 	ConsumeNext() error
-	DisableAutoCommit()
 	Commit() error
 	CommitBack(gap int64) error
 	Close()
 }
 
 type Producer interface {
 	Produce(topic string, key uint64, value []byte) error
 	Close(timeout int)
 	Flush(timeout int)
 }
 
 type Meta struct {
-	ID uint64
-	Topic string
+	ID        uint64
+	Topic     string
 	Timestamp int64
 }
 
 type MessageHandler func(uint64, []byte, *Meta)
 type DecodedMessageHandler func(uint64, messages.Message, *Meta)

View file

@@ -1,24 +1,22 @@
 package redisstream
 
 import (
+	"log"
 	"net"
+	"sort"
 	"strconv"
 	"strings"
-	"log"
-	"sort"
 	"time"
 
-	"github.com/pkg/errors"
 	_redis "github.com/go-redis/redis"
+	"github.com/pkg/errors"
 
 	"openreplay/backend/pkg/queue/types"
 )
 
-type idsInfo struct{
-	id []string
-	ts []int64
+type idsInfo struct {
+	id []string
+	ts []int64
 }
 
 type streamPendingIDsMap map[string]*idsInfo
@@ -41,26 +39,25 @@ func NewConsumer(group string, streams []string, messageHandler types.MessageHan
 		}
 	}
 	idsPending := make(streamPendingIDsMap)
 	streamsCount := len(streams)
 	for i := 0; i < streamsCount; i++ {
 		// ">" is for never-delivered messages.
 		// Otherwise - never acknowledged only
 		// TODO: understand why in case of "0" it eats 100% cpu
 		streams = append(streams, ">")
 		idsPending[streams[i]] = new(idsInfo)
 	}
 	return &Consumer{
-		redis: redis,
+		redis:          redis,
 		messageHandler: messageHandler,
-		streams: streams,
-		group: group,
-		autoCommit: true,
-		idsPending: idsPending,
+		streams:        streams,
+		group:          group,
+		autoCommit:     true,
+		idsPending:     idsPending,
 	}
 }
@@ -106,9 +103,9 @@ func (c *Consumer) ConsumeNext() error {
 			return errors.New("Too many messages per ms in redis")
 		}
 		c.messageHandler(sessionID, []byte(valueString), &types.Meta{
-			Topic: r.Stream,
+			Topic:     r.Stream,
 			Timestamp: int64(ts),
-			ID: ts << 13 | (idx & 0x1FFF), // Max: 4096 messages/ms for 69 years
+			ID:        ts<<13 | (idx & 0x1FFF), // Max: 4096 messages/ms for 69 years
 		})
 		if c.autoCommit {
 			if err = c.redis.XAck(r.Stream, c.group, m.ID).Err(); err != nil {
@@ -119,7 +116,7 @@ func (c *Consumer) ConsumeNext() error {
 			c.idsPending[r.Stream].id = append(c.idsPending[r.Stream].id, m.ID)
 			c.idsPending[r.Stream].ts = append(c.idsPending[r.Stream].ts, int64(ts))
 		}
 	}
 	return nil
@@ -158,13 +155,9 @@ func (c *Consumer) CommitBack(gap int64) error {
 		c.idsPending[stream].id = idsInfo.id[maxI:]
 		c.idsPending[stream].ts = idsInfo.ts[maxI:]
 	}
 	return nil
 }
 
-func (c *Consumer) DisableAutoCommit() {
-	//c.autoCommit = false
-}
-
 func (c *Consumer) Close() {
 	// noop
 }
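A side note on the Meta.ID expression above: the consumer packs the millisecond timestamp and a per-millisecond message index into a single uint64, with the index in the low 13 bits. A minimal sketch of that layout (packID and unpackID are hypothetical helpers written for this note, not functions from the codebase):

package main

import "fmt"

// Mirrors `ID: ts<<13 | (idx & 0x1FFF)` from the consumer above:
// the low 13 bits hold the per-millisecond index, the rest the timestamp.
func packID(ts, idx uint64) uint64 {
	return ts<<13 | (idx & 0x1FFF)
}

// Hypothetical inverse, shown only to make the layout explicit.
func unpackID(id uint64) (ts, idx uint64) {
	return id >> 13, id & 0x1FFF
}

func main() {
	id := packID(1648650000000, 42) // ts in ms, 42nd message within that ms
	ts, idx := unpackID(id)
	fmt.Println(ts, idx) // 1648650000000 42
}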

View file

@@ -18,7 +18,7 @@ import (
 func main() {
 	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
 
 	GROUP_CACHE := env.String("GROUP_CACHE")
 	TOPIC_CACHE := env.String("TOPIC_CACHE")
 
 	cacher := cacher.NewCacher(
@@ -29,10 +29,10 @@ func main() {
 	)
 
 	consumer := queue.NewMessageConsumer(
 		GROUP_CACHE,
-		[]string{ TOPIC_CACHE },
+		[]string{TOPIC_CACHE},
 		func(sessionID uint64, message messages.Message, e *types.Meta) {
 			switch msg := message.(type) {
 			case *messages.AssetCache:
 				cacher.CacheURL(sessionID, msg.URL)
 			case *messages.ErrorEvent:
@@ -47,17 +47,17 @@ func main() {
 				for _, source := range sourceList {
 					cacher.CacheJSFile(source)
 				}
 			}
 		},
+		true,
 	)
 
 	tick := time.Tick(20 * time.Minute)
 
 	sigchan := make(chan os.Signal, 1)
 	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
 
 	log.Printf("Cacher service started\n")
 	for {
 		select {
 		case sig := <-sigchan:
@@ -74,4 +74,4 @@ func main() {
 			}
 		}
 	}
 }

View file

@@ -74,8 +74,8 @@ func main() {
 				}
 			})
 		},
+		false,
 	)
-	consumer.DisableAutoCommit()
 
 	sigchan := make(chan os.Signal, 1)
 	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

View file

@@ -8,12 +8,12 @@ import (
 	"os/signal"
 	"syscall"
 
-	"openreplay/backend/pkg/intervals"
 	"openreplay/backend/pkg/env"
+	"openreplay/backend/pkg/intervals"
+	logger "openreplay/backend/pkg/log"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/queue"
 	"openreplay/backend/pkg/queue/types"
-	logger "openreplay/backend/pkg/log"
 	"openreplay/backend/services/ender/builder"
 )
@@ -29,24 +29,24 @@ func main() {
 	producer := queue.NewProducer()
 	consumer := queue.NewMessageConsumer(
 		GROUP_EVENTS,
 		[]string{
 			env.String("TOPIC_RAW_WEB"),
 			env.String("TOPIC_RAW_IOS"),
 		},
 		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
 			statsLogger.HandleAndLog(sessionID, meta)
 			builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
 		},
+		false,
 	)
-	consumer.DisableAutoCommit()
 
 	tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
 
 	sigchan := make(chan os.Signal, 1)
 	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
 
 	log.Printf("Ender service started\n")
 	for {
 		select {
 		case sig := <-sigchan:
@@ -55,7 +55,7 @@ func main() {
 			consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
 			consumer.Close()
 			os.Exit(0)
-		case <- tick:
+		case <-tick:
 			builderMap.IterateReadyMessages(time.Now().UnixNano()/1e6, func(sessionID uint64, readyMsg messages.Message) {
 				producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg))
 			})
@@ -69,4 +69,3 @@ func main() {
 		}
 	}
 }

View file

@@ -1,8 +1,8 @@
 package main
 
 import (
-	"log"
 	"encoding/binary"
+	"log"
 	"time"
 	"os"
@@ -10,67 +10,64 @@ import (
 	"syscall"
 
 	"openreplay/backend/pkg/env"
+	. "openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/queue"
 	"openreplay/backend/pkg/queue/types"
-	. "openreplay/backend/pkg/messages"
 )
 
 func main() {
 	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
 
-	FS_DIR := env.String("FS_DIR");
+	FS_DIR := env.String("FS_DIR")
 	if _, err := os.Stat(FS_DIR); os.IsNotExist(err) {
 		log.Fatalf("%v doesn't exist. %v", FS_DIR, err)
 	}
 
 	writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR)
 
 	count := 0
 	consumer := queue.NewMessageConsumer(
 		env.String("GROUP_SINK"),
 		[]string{
 			env.String("TOPIC_RAW_WEB"),
 			env.String("TOPIC_RAW_IOS"),
 		},
 		func(sessionID uint64, message Message, _ *types.Meta) {
 			//typeID, err := GetMessageTypeID(value)
 			// if err != nil {
 			// 	log.Printf("Message type decoding error: %v", err)
 			// 	return
 			// }
 			typeID := message.Meta().TypeID
 			if !IsReplayerType(typeID) {
 				return
 			}
 			count++
 			value := message.Encode()
 			var data []byte
 			if IsIOSType(typeID) {
 				data = value
 			} else {
 				data = make([]byte, len(value)+8)
 				copy(data[8:], value[:])
 				binary.LittleEndian.PutUint64(data[0:], message.Meta().Index)
 			}
 			if err := writer.Write(sessionID, data); err != nil {
 				log.Printf("Writer error: %v\n", err)
 			}
 		},
+		false,
 	)
-	consumer.DisableAutoCommit()
 
 	sigchan := make(chan os.Signal, 1)
 	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
 
 	tick := time.Tick(30 * time.Second)
 
 	log.Printf("Sink service started\n")
 	for {
 		select {
 		case sig := <-sigchan:
@@ -85,7 +82,7 @@ func main() {
 			log.Printf("%v messages during 30 sec", count)
 			count = 0
 			consumer.Commit()
 		default:
 			err := consumer.ConsumeNext()
@@ -96,4 +93,3 @@ func main() {
 	}
 }

View file

@@ -2,45 +2,41 @@ package main
 
 import (
 	"log"
-	"time"
 	"os"
 	"strconv"
+	"time"
 	"os/signal"
 	"syscall"
 
 	"openreplay/backend/pkg/env"
-	"openreplay/backend/pkg/storage"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/queue"
 	"openreplay/backend/pkg/queue/types"
+	"openreplay/backend/pkg/storage"
 )
 
 func main() {
 	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
 
-	storageWeb := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB"))
-	//storageIos := storage.NewS3(env.String("AWS_REGION_IOS"), env.String("S3_BUCKET_IOS"))
+	storage := storage.NewS3(env.String("AWS_REGION_WEB"), env.String("S3_BUCKET_WEB"))
 	FS_DIR := env.String("FS_DIR")
 	FS_CLEAN_HRS := env.Int("FS_CLEAN_HRS")
 
-	var uploadKey func(string, int, *storage.S3)
-	uploadKey = func(key string, retryCount int, s *storage.S3) {
+	var uploadKey func(string, int)
+	uploadKey = func(key string, retryCount int) {
 		if retryCount <= 0 {
-			return;
+			return
 		}
 		file, err := os.Open(FS_DIR + "/" + key)
 		defer file.Close()
 		if err != nil {
 			log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
 			time.AfterFunc(2*time.Minute, func() {
-				uploadKey(key, retryCount - 1, s)
+				uploadKey(key, retryCount-1)
 			})
 		} else {
-			if err := s.Upload(gzipFile(file), key, "application/octet-stream", true); err != nil {
+			if err := storage.Upload(gzipFile(file), key, "application/octet-stream", true); err != nil {
 				log.Fatalf("Storage upload error: %v\n", err)
 			}
 		}
@@ -48,27 +44,24 @@ func main() {
 	consumer := queue.NewMessageConsumer(
 		env.String("GROUP_STORAGE"),
 		[]string{
 			env.String("TOPIC_TRIGGER"),
 		},
-		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
-			switch msg.(type) {
-			case *messages.SessionEnd:
-				uploadKey(strconv.FormatUint(sessionID, 10), 5, storageWeb)
-			//case *messages.IOSSessionEnd:
-			//	uploadKey(strconv.FormatUint(sessionID, 10), 5, storageIos)
-			}
-		},
+		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
+			switch msg.(type) {
+			case *messages.SessionEnd:
+				uploadKey(strconv.FormatUint(sessionID, 10), 5)
+			}
+		},
+		true,
 	)
 
 	sigchan := make(chan os.Signal, 1)
 	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
 
 	cleanTick := time.Tick(time.Duration(FS_CLEAN_HRS) * time.Hour)
 
 	log.Printf("Storage service started\n")
 	for {
 		select {
 		case sig := <-sigchan:
@@ -85,4 +78,3 @@ func main() {
 		}
 	}
 }

View file

@@ -25,7 +25,12 @@ type Consumer struct {
 	lastKafkaEventTs int64
 }
 
-func NewConsumer(group string, topics []string, messageHandler types.MessageHandler) *Consumer {
+func NewConsumer(
+	group string,
+	topics []string,
+	messageHandler types.MessageHandler,
+	autoCommit bool,
+) *Consumer {
 	protocol := "plaintext"
 	if env.Bool("KAFKA_USE_SSL") {
 		protocol = "ssl"
@@ -53,18 +58,19 @@ func NewConsumer(group string, topics []string, messageHandler types.MessageHand
 		log.Fatalln(err)
 	}
 
+	var commitTicker *time.Ticker
+	if autoCommit {
+		commitTicker = time.NewTicker(2 * time.Minute)
+	}
 	return &Consumer{
 		c:              c,
 		messageHandler: messageHandler,
-		commitTicker:   time.NewTicker(2 * time.Minute),
+		commitTicker:   commitTicker,
 		pollTimeout:    200,
 	}
 }
 
-func (consumer *Consumer) DisableAutoCommit() {
-	consumer.commitTicker.Stop()
-}
-
 func (consumer *Consumer) Commit() error {
 	consumer.c.Commit() // TODO: return error if it is not "No offset stored"
 	return nil
@@ -128,10 +134,12 @@ func (consumer *Consumer) ConsumeNext() error {
 		return nil
 	}
 
-	select {
-	case <-consumer.commitTicker.C:
-		consumer.Commit()
-	default:
+	if consumer.commitTicker != nil {
+		select {
+		case <-consumer.commitTicker.C:
+			consumer.Commit()
+		default:
+		}
 	}
 
 	switch e := ev.(type) {
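The kafka consumer thus swaps a stoppable ticker for a nil one: when autoCommit is false the ticker is never allocated, and every read of it is guarded, so the periodic commit simply never fires. A standalone sketch of that idiom (consumer, newConsumer and maybeCommit are stripped-down stand-ins invented for illustration, not the kafka package's types):

package main

import (
	"fmt"
	"time"
)

type consumer struct {
	commitTicker *time.Ticker // nil when auto-commit is off
}

func newConsumer(autoCommit bool) *consumer {
	var t *time.Ticker
	if autoCommit {
		t = time.NewTicker(2 * time.Minute)
	}
	return &consumer{commitTicker: t}
}

// maybeCommit commits only if the ticker exists and has fired;
// with a nil ticker it is a no-op, leaving nothing to stop or leak.
func (c *consumer) maybeCommit() {
	if c.commitTicker != nil {
		select {
		case <-c.commitTicker.C:
			fmt.Println("commit")
		default:
		}
	}
}

func main() {
	newConsumer(false).maybeCommit() // no-op: the ticker was never created
}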

View file

@@ -2,17 +2,16 @@ package queue
 
 import (
 	"openreplay/backend/pkg/kafka"
-	"openreplay/backend/pkg/queue/types"
 	"openreplay/backend/pkg/license"
+	"openreplay/backend/pkg/queue/types"
 )
 
-func NewConsumer(group string, topics []string, handler types.MessageHandler) types.Consumer {
+func NewConsumer(group string, topics []string, handler types.MessageHandler, autoCommit bool) types.Consumer {
 	license.CheckLicense()
-	return kafka.NewConsumer(group, topics, handler)
+	return kafka.NewConsumer(group, topics, handler, autoCommit)
 }
 
 func NewProducer() types.Producer {
 	license.CheckLicense()
 	return kafka.NewProducer()
 }