V1.9.0 sink improvements (#848)
* feat(backend/sink): write big messages directly to file * feat(backend/sink): manually sync all files on partitions rebalanced event (#847) * feat(backend): fix vulnerability in /x/net
This commit is contained in:
parent
3af96c60d3
commit
56c216d2d5
11 changed files with 104 additions and 57 deletions
|
|
@ -14,14 +14,11 @@ import (
|
|||
"openreplay/backend/internal/storage"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/monitoring"
|
||||
"openreplay/backend/pkg/pprof"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
func main() {
|
||||
pprof.StartProfilingServer()
|
||||
|
||||
metrics := monitoring.New("sink")
|
||||
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
|
@ -147,6 +144,15 @@ func main() {
|
|||
case <-tickInfo:
|
||||
counter.Print()
|
||||
log.Printf("writer: %s", writer.Info())
|
||||
case <-consumer.Rebalanced():
|
||||
s := time.Now()
|
||||
// Commit now to avoid duplicate reads
|
||||
if err := consumer.Commit(); err != nil {
|
||||
log.Printf("can't commit messages: %s", err)
|
||||
}
|
||||
// Sync all files
|
||||
writer.Sync()
|
||||
log.Printf("manual sync finished, dur: %d", time.Now().Sub(s).Milliseconds())
|
||||
default:
|
||||
err := consumer.ConsumeNext()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ require (
|
|||
github.com/Masterminds/semver v1.5.0
|
||||
github.com/aws/aws-sdk-go v1.44.98
|
||||
github.com/btcsuite/btcutil v1.0.2
|
||||
github.com/confluentinc/confluent-kafka-go v1.8.2
|
||||
github.com/elastic/go-elasticsearch/v7 v7.13.1
|
||||
github.com/go-redis/redis v6.15.9+incompatible
|
||||
github.com/google/uuid v1.3.0
|
||||
|
|
@ -26,9 +27,8 @@ require (
|
|||
go.opentelemetry.io/otel/exporters/prometheus v0.30.0
|
||||
go.opentelemetry.io/otel/metric v0.30.0
|
||||
go.opentelemetry.io/otel/sdk/metric v0.30.0
|
||||
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2
|
||||
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c
|
||||
google.golang.org/api v0.81.0
|
||||
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
@ -38,7 +38,6 @@ require (
|
|||
cloud.google.com/go/storage v1.14.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/confluentinc/confluent-kafka-go v1.9.0 // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
|
|
@ -53,7 +52,6 @@ require (
|
|||
github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/klauspost/compress v1.15.7 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/paulmach/orb v0.7.1 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||
|
|
@ -69,7 +67,7 @@ require (
|
|||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
|
||||
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
|
||||
golang.org/x/text v0.4.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
|
|
|
|||
|
|
@ -115,12 +115,11 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
|
|||
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
|
||||
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
|
||||
github.com/confluentinc/confluent-kafka-go v1.9.0 h1:d1k62oAuQVxgdMdiDQnpkABbtIWTBwXHpDcyGQUw5QQ=
|
||||
github.com/confluentinc/confluent-kafka-go v1.9.0/go.mod h1:WDFs+KlhHITEoCzEfHSNgj5aP7vjajyYbZpvTEGs1sE=
|
||||
github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
|
||||
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
|
||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
|
|
@ -328,14 +327,12 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
|
|||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
|
||||
|
|
@ -400,8 +397,6 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
|
|||
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
||||
|
|
@ -566,8 +561,9 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su
|
|||
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y=
|
||||
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c h1:yKufUcDwucU5urd+50/Opbt4AYpqthk7wHpHok8f1lo=
|
||||
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
|
|
@ -679,8 +675,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
|
||||
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
|
@ -941,8 +937,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
|||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I=
|
||||
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ type Config struct {
|
|||
common.Config
|
||||
FsDir string `env:"FS_DIR,required"`
|
||||
FsUlimit uint16 `env:"FS_ULIMIT,required"`
|
||||
FileBuffer int `env:"FILE_BUFFER,default=32768"`
|
||||
FileBuffer int `env:"FILE_BUFFER,default=16384"`
|
||||
SyncTimeout int `env:"SYNC_TIMEOUT,default=5"`
|
||||
GroupSink string `env:"GROUP_SINK,required"`
|
||||
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ package sessionwriter
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
|
|
@ -24,15 +26,32 @@ func NewFile(path string, bufSize int) (*File, error) {
|
|||
}
|
||||
|
||||
func (f *File) Write(data []byte) error {
|
||||
f.updated = true
|
||||
if len(data) > f.buffer.Available()+f.buffer.Size() {
|
||||
// Flush buffer to file
|
||||
for i := 0; i < 3; i++ {
|
||||
err := f.buffer.Flush()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Printf("can't flush buffer: %s", err)
|
||||
}
|
||||
// Write big message directly to file
|
||||
return f.write(f.file, data)
|
||||
}
|
||||
return f.write(f.buffer, data)
|
||||
}
|
||||
|
||||
func (f *File) write(w io.Writer, data []byte) error {
|
||||
leftToWrite := len(data)
|
||||
for leftToWrite > 0 {
|
||||
writtenDown, err := f.buffer.Write(data)
|
||||
from := len(data) - leftToWrite
|
||||
writtenDown, err := w.Write(data[from:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
leftToWrite -= writtenDown
|
||||
}
|
||||
f.updated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -97,17 +97,21 @@ func (w *SessionWriter) Info() string {
|
|||
return fmt.Sprintf("%d sessions", w.meta.Count())
|
||||
}
|
||||
|
||||
func (w *SessionWriter) Sync() {
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.sync(sid.(uint64)); err != nil {
|
||||
log.Printf("can't sync file descriptor: %s", err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (w *SessionWriter) synchronizer() {
|
||||
tick := time.Tick(w.syncTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.sync(sid.(uint64)); err != nil {
|
||||
log.Printf("can't sync file descriptor: %s", err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
w.Sync()
|
||||
case <-w.done:
|
||||
w.sessions.Range(func(sid, lockObj any) bool {
|
||||
if err := w.Close(sid.(uint64)); err != nil {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ type Consumer interface {
|
|||
CommitBack(gap int64) error
|
||||
Commit() error
|
||||
Close()
|
||||
Rebalanced() <-chan interface{}
|
||||
}
|
||||
|
||||
// Producer sends batches of session data to queue (redis or kafka)
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ type Consumer struct {
|
|||
idsPending streamPendingIDsMap
|
||||
lastTs int64
|
||||
autoCommit bool
|
||||
event chan interface{}
|
||||
}
|
||||
|
||||
func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer {
|
||||
|
|
@ -57,11 +58,16 @@ func NewConsumer(group string, streams []string, messageIterator messages.Messag
|
|||
group: group,
|
||||
autoCommit: true,
|
||||
idsPending: idsPending,
|
||||
event: make(chan interface{}, 4),
|
||||
}
|
||||
}
|
||||
|
||||
const READ_COUNT = 10
|
||||
|
||||
func (c *Consumer) Rebalanced() <-chan interface{} {
|
||||
return c.event
|
||||
}
|
||||
|
||||
func (c *Consumer) ConsumeNext() error {
|
||||
// MBTODO: read in go routine, send messages to channel
|
||||
res, err := c.redis.XReadGroup(&_redis.XReadGroupArgs{
|
||||
|
|
|
|||
|
|
@ -2,24 +2,24 @@ package kafka
|
|||
|
||||
import (
|
||||
"log"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"openreplay/backend/pkg/env"
|
||||
"openreplay/backend/pkg/messages"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Message = kafka.Message
|
||||
|
||||
type Consumer struct {
|
||||
c *kafka.Consumer
|
||||
messageIterator messages.MessageIterator
|
||||
commitTicker *time.Ticker
|
||||
pollTimeout uint
|
||||
|
||||
c *kafka.Consumer
|
||||
messageIterator messages.MessageIterator
|
||||
commitTicker *time.Ticker
|
||||
pollTimeout uint
|
||||
events chan interface{}
|
||||
lastReceivedPrtTs map[int32]int64
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +48,7 @@ func NewConsumer(
|
|||
kafkaConfig.SetKey("ssl.certificate.location", os.Getenv("KAFKA_SSL_CERT"))
|
||||
}
|
||||
|
||||
// Apply Kerberos configuration
|
||||
// Apply Kerberos configuration
|
||||
if env.Bool("KAFKA_USE_KERBEROS") {
|
||||
kafkaConfig.SetKey("security.protocol", "sasl_plaintext")
|
||||
kafkaConfig.SetKey("sasl.mechanisms", "GSSAPI")
|
||||
|
|
@ -61,6 +61,21 @@ func NewConsumer(
|
|||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
var commitTicker *time.Ticker
|
||||
if autoCommit {
|
||||
commitTicker = time.NewTicker(2 * time.Minute)
|
||||
}
|
||||
|
||||
consumer := &Consumer{
|
||||
c: c,
|
||||
messageIterator: messageIterator,
|
||||
commitTicker: commitTicker,
|
||||
pollTimeout: 200,
|
||||
events: make(chan interface{}, 4),
|
||||
lastReceivedPrtTs: make(map[int32]int64, 16),
|
||||
}
|
||||
|
||||
subREx := "^("
|
||||
for i, t := range topics {
|
||||
if i != 0 {
|
||||
|
|
@ -69,22 +84,27 @@ func NewConsumer(
|
|||
subREx += t
|
||||
}
|
||||
subREx += ")$"
|
||||
if err := c.Subscribe(subREx, nil); err != nil {
|
||||
if err := c.Subscribe(subREx, consumer.reBalanceCallback); err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
var commitTicker *time.Ticker
|
||||
if autoCommit {
|
||||
commitTicker = time.NewTicker(2 * time.Minute)
|
||||
}
|
||||
return consumer
|
||||
}
|
||||
|
||||
return &Consumer{
|
||||
c: c,
|
||||
messageIterator: messageIterator,
|
||||
commitTicker: commitTicker,
|
||||
pollTimeout: 200,
|
||||
lastReceivedPrtTs: make(map[int32]int64),
|
||||
func (consumer *Consumer) reBalanceCallback(_ *kafka.Consumer, e kafka.Event) error {
|
||||
switch evt := e.(type) {
|
||||
case kafka.RevokedPartitions:
|
||||
// receive before re-balancing partitions; stop consuming messages and commit current state
|
||||
consumer.events <- evt.String()
|
||||
case kafka.AssignedPartitions:
|
||||
// receive after re-balancing partitions; continue consuming messages
|
||||
//consumer.events <- evt.String()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (consumer *Consumer) Rebalanced() <-chan interface{} {
|
||||
return consumer.events
|
||||
}
|
||||
|
||||
func (consumer *Consumer) Commit() error {
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"log"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
)
|
||||
|
||||
|
||||
func logPartitions(s string, prts []kafka.TopicPartition) {
|
||||
for _, p := range prts {
|
||||
s = fmt.Sprintf("%v | %v", s, p.Partition)
|
||||
}
|
||||
log.Println(s)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
|
||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"openreplay/backend/pkg/env"
|
||||
)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue