diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go
index a7e2804c4..fcd78c3aa 100644
--- a/backend/cmd/sink/main.go
+++ b/backend/cmd/sink/main.go
@@ -147,6 +147,18 @@ func main() {
 		case <-tickInfo:
 			counter.Print()
 			log.Printf("writer: %s", writer.Info())
+		case <-consumer.Rebalanced():
+			// The consumer signalled a rebalance (the kafka consumer does so when
+			// partitions are revoked): persist progress right away.
+			s := time.Now()
+			// Commit now to avoid duplicate reads
+			if err := consumer.Commit(); err != nil {
+				log.Printf("can't commit messages: %s", err)
+			}
+			// Sync all files
+			writer.Sync()
+			// dur is reported in milliseconds
+			log.Printf("manual sync finished, dur: %d", time.Since(s).Milliseconds())
 		default:
 			err := consumer.ConsumeNext()
 			if err != nil {
diff --git a/backend/go.mod b/backend/go.mod
index b1046b08e..bc0f1896f 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -8,6 +8,7 @@ require (
 	github.com/Masterminds/semver v1.5.0
 	github.com/aws/aws-sdk-go v1.44.98
 	github.com/btcsuite/btcutil v1.0.2
+	github.com/confluentinc/confluent-kafka-go v1.8.2
 	github.com/elastic/go-elasticsearch/v7 v7.13.1
 	github.com/go-redis/redis v6.15.9+incompatible
 	github.com/google/uuid v1.3.0
@@ -28,7 +29,6 @@ require (
 	go.opentelemetry.io/otel/sdk/metric v0.30.0
 	golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2
 	google.golang.org/api v0.81.0
-	gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2
 )
 
 require (
@@ -38,7 +38,6 @@ require (
 	cloud.google.com/go/storage v1.14.0 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
-	github.com/confluentinc/confluent-kafka-go v1.9.0 // indirect
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
@@ -53,7 +52,6 @@ require (
 	github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/klauspost/compress v1.15.7 // indirect
-	github.com/kr/pretty v0.3.0 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/paulmach/orb v0.7.1 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect diff --git a/backend/go.sum b/backend/go.sum index fea2aa1a3..cef80ac2c 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -115,12 +115,11 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/confluentinc/confluent-kafka-go v1.9.0 h1:d1k62oAuQVxgdMdiDQnpkABbtIWTBwXHpDcyGQUw5QQ= -github.com/confluentinc/confluent-kafka-go v1.9.0/go.mod h1:WDFs+KlhHITEoCzEfHSNgj5aP7vjajyYbZpvTEGs1sE= +github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= +github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -328,14 +327,12 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= @@ -400,8 +397,6 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ 
-941,8 +936,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I=
-gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go
index f2eb052c7..7da1ae878 100644
--- a/backend/internal/sink/sessionwriter/writer.go
+++ b/backend/internal/sink/sessionwriter/writer.go
@@ -97,17 +97,26 @@ func (w *SessionWriter) Info() string {
 	return fmt.Sprintf("%d sessions", w.meta.Count())
 }
 
+// Sync calls w.sync for every key stored in w.sessions. A failure on one
+// session is logged and the iteration moves on to the next one.
+func (w *SessionWriter) Sync() {
+	syncOne := func(key, _ any) bool {
+		err := w.sync(key.(uint64))
+		if err != nil {
+			log.Printf("can't sync file descriptor: %s", err)
+		}
+		// keep iterating (Range stops on false, assuming sync.Map semantics)
+		return true
+	}
+	w.sessions.Range(syncOne)
+}
+
 func (w *SessionWriter) synchronizer() {
 	tick := time.Tick(w.syncTimeout)
 	for {
 		select {
 		case <-tick:
-			w.sessions.Range(func(sid, lockObj any) bool {
-				if err := w.sync(sid.(uint64)); err != nil {
-					log.Printf("can't sync file descriptor: %s", err)
-				}
-				return true
-			})
+			w.Sync()
 		case <-w.done:
 			w.sessions.Range(func(sid, lockObj any) bool {
 				if err := w.Close(sid.(uint64)); err != nil {
diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go
index 48408ce10..21ee49d60 100644
--- a/backend/pkg/queue/types/types.go
+++
b/backend/pkg/queue/types/types.go
@@ -6,6 +6,9 @@ type Consumer interface {
 	CommitBack(gap int64) error
 	Commit() error
 	Close()
+	// Rebalanced returns a channel that receives a value when the consumer's
+	// partitions are revoked during a rebalance (see the kafka implementation).
+	Rebalanced() <-chan interface{}
 }
 
 // Producer sends batches of session data to queue (redis or kafka)
diff --git a/backend/pkg/redisstream/consumer.go b/backend/pkg/redisstream/consumer.go
index 228b2c7a0..3c5b6d0a4 100644
--- a/backend/pkg/redisstream/consumer.go
+++ b/backend/pkg/redisstream/consumer.go
@@ -27,6 +27,7 @@ type Consumer struct {
 	idsPending streamPendingIDsMap
 	lastTs     int64
 	autoCommit bool
+	events     chan interface{}
 }
 
 func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer {
@@ -57,11 +58,19 @@ func NewConsumer(group string, streams []string, messageIterator messages.Messag
 		group:      group,
 		autoCommit: true,
 		idsPending: idsPending,
+		events:     make(chan interface{}, 4),
 	}
 }
 
 const READ_COUNT = 10
 
+// Rebalanced returns the consumer's events channel as receive-only.
+// Nothing visible in this change sends on it for the redis consumer;
+// NOTE(review): presumably it exists only to satisfy the queue Consumer interface.
+func (c *Consumer) Rebalanced() <-chan interface{} {
+	return c.events
+}
+
 func (c *Consumer) ConsumeNext() error {
 	// MBTODO: read in go routine, send messages to channel
 	res, err := c.redis.XReadGroup(&_redis.XReadGroupArgs{
diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go
index 14f8d5a68..bea1f0604 100644
--- a/ee/backend/pkg/kafka/consumer.go
+++ b/ee/backend/pkg/kafka/consumer.go
@@ -2,24 +2,24 @@ package kafka
 
 import (
 	"log"
-	"openreplay/backend/pkg/messages"
 	"os"
 	"time"
 
-	"github.com/pkg/errors"
-
-	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
 	"openreplay/backend/pkg/env"
+	"openreplay/backend/pkg/messages"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/pkg/errors"
 )
 
 type Message = kafka.Message
 
 type Consumer struct {
-	c               *kafka.Consumer
-	messageIterator messages.MessageIterator
-	commitTicker    *time.Ticker
-	pollTimeout     uint
-
+	c                 *kafka.Consumer
+	messageIterator   messages.MessageIterator
+	commitTicker      *time.Ticker
+	pollTimeout       uint
+	events            chan interface{}
 	lastReceivedPrtTs map[int32]int64
 }
 
@@ -48,7 +48,7 @@ func
NewConsumer(
 		kafkaConfig.SetKey("ssl.certificate.location", os.Getenv("KAFKA_SSL_CERT"))
 	}
 
-    // Apply Kerberos configuration
+	// Apply Kerberos configuration
 	if env.Bool("KAFKA_USE_KERBEROS") {
 		kafkaConfig.SetKey("security.protocol", "sasl_plaintext")
 		kafkaConfig.SetKey("sasl.mechanisms", "GSSAPI")
@@ -61,6 +61,21 @@ func NewConsumer(
 	if err != nil {
 		log.Fatalln(err)
 	}
+
+	var commitTicker *time.Ticker
+	if autoCommit {
+		commitTicker = time.NewTicker(2 * time.Minute)
+	}
+
+	consumer := &Consumer{
+		c:                 c,
+		messageIterator:   messageIterator,
+		commitTicker:      commitTicker,
+		pollTimeout:       200,
+		events:            make(chan interface{}, 4),
+		lastReceivedPrtTs: make(map[int32]int64, 16),
+	}
+
 	subREx := "^("
 	for i, t := range topics {
 		if i != 0 {
@@ -69,22 +84,42 @@ func NewConsumer(
 		subREx += t
 	}
 	subREx += ")$"
-	if err := c.Subscribe(subREx, nil); err != nil {
+	if err := c.Subscribe(subREx, consumer.reBalanceCallback); err != nil {
 		log.Fatalln(err)
 	}
 
-	var commitTicker *time.Ticker
-	if autoCommit {
-		commitTicker = time.NewTicker(2 * time.Minute)
-	}
+	return consumer
+}
 
-	return &Consumer{
-		c:                 c,
-		messageIterator:   messageIterator,
-		commitTicker:      commitTicker,
-		pollTimeout:       200,
-		lastReceivedPrtTs: make(map[int32]int64),
+// reBalanceCallback is the rebalance hook passed to Subscribe. When partitions
+// are revoked it queues the event's string form on consumer.events, which
+// Rebalanced() exposes so the owner can commit and flush its state. Assigned
+// partitions need no action. It always returns nil.
+//
+// The send is deliberately non-blocking: the channel has a small buffer and,
+// NOTE(review): per the confluent-kafka-go docs, the callback runs on the
+// goroutine that polls the consumer. If that is also the goroutine draining
+// Rebalanced(), a blocking send on a full buffer would never return. Pending
+// notifications already trigger the commit, so the extra one is only logged.
+func (consumer *Consumer) reBalanceCallback(_ *kafka.Consumer, e kafka.Event) error {
+	switch evt := e.(type) {
+	case kafka.RevokedPartitions:
+		// receive before re-balancing partitions; stop consuming messages and commit current state
+		select {
+		case consumer.events <- evt.String():
+		default:
+			log.Printf("rebalance notification dropped, events buffer is full: %s", evt.String())
+		}
+	case kafka.AssignedPartitions:
+		// receive after re-balancing partitions; continue consuming messages
 	}
+	return nil
+}
+
+// Rebalanced returns the receive-only view of consumer.events, on which
+// reBalanceCallback reports revoked partitions.
+func (consumer *Consumer) Rebalanced() <-chan interface{} {
+	return consumer.events
 }
 
 func (consumer *Consumer) Commit() error {
diff --git a/ee/backend/pkg/kafka/log.go b/ee/backend/pkg/kafka/log.go
index 0cd80cb6d..c71c6d2bd 100644
--- a/ee/backend/pkg/kafka/log.go
+++ b/ee/backend/pkg/kafka/log.go
@@ -1,16 +1,15 @@
package kafka import ( - "log" "fmt" + "log" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "github.com/confluentinc/confluent-kafka-go/kafka" ) - func logPartitions(s string, prts []kafka.TopicPartition) { for _, p := range prts { s = fmt.Sprintf("%v | %v", s, p.Partition) } log.Println(s) -} \ No newline at end of file +} diff --git a/ee/backend/pkg/kafka/producer.go b/ee/backend/pkg/kafka/producer.go index f895241a7..1ec241b8a 100644 --- a/ee/backend/pkg/kafka/producer.go +++ b/ee/backend/pkg/kafka/producer.go @@ -5,7 +5,7 @@ import ( "log" "os" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "github.com/confluentinc/confluent-kafka-go/kafka" "openreplay/backend/pkg/env" )