diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index a7e2804c4..03f11b200 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -14,14 +14,11 @@ import ( "openreplay/backend/internal/storage" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/monitoring" - "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/url/assets" ) func main() { - pprof.StartProfilingServer() - metrics := monitoring.New("sink") log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) @@ -147,6 +144,15 @@ func main() { case <-tickInfo: counter.Print() log.Printf("writer: %s", writer.Info()) + case <-consumer.Rebalanced(): + s := time.Now() + // Commit now to avoid duplicate reads + if err := consumer.Commit(); err != nil { + log.Printf("can't commit messages: %s", err) + } + // Sync all files + writer.Sync() + log.Printf("manual sync finished, dur: %d", time.Now().Sub(s).Milliseconds()) default: err := consumer.ConsumeNext() if err != nil { diff --git a/backend/go.mod b/backend/go.mod index b1046b08e..61d644a17 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,6 +8,7 @@ require ( github.com/Masterminds/semver v1.5.0 github.com/aws/aws-sdk-go v1.44.98 github.com/btcsuite/btcutil v1.0.2 + github.com/confluentinc/confluent-kafka-go v1.8.2 github.com/elastic/go-elasticsearch/v7 v7.13.1 github.com/go-redis/redis v6.15.9+incompatible github.com/google/uuid v1.3.0 @@ -26,9 +27,8 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.30.0 go.opentelemetry.io/otel/metric v0.30.0 go.opentelemetry.io/otel/sdk/metric v0.30.0 - golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 + golang.org/x/net v0.0.0-20220906165146-f3363e06e74c google.golang.org/api v0.81.0 - gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 ) require ( @@ -38,7 +38,6 @@ require ( cloud.google.com/go/storage v1.14.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/confluentinc/confluent-kafka-go v1.9.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -53,7 +52,6 @@ require ( github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.15.7 // indirect - github.com/kr/pretty v0.3.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/paulmach/orb v0.7.1 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect @@ -69,7 +67,7 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect golang.org/x/text v0.4.0 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/backend/go.sum b/backend/go.sum index fea2aa1a3..c7abea25e 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -115,12 +115,11 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/confluentinc/confluent-kafka-go v1.9.0 h1:d1k62oAuQVxgdMdiDQnpkABbtIWTBwXHpDcyGQUw5QQ= -github.com/confluentinc/confluent-kafka-go v1.9.0/go.mod h1:WDFs+KlhHITEoCzEfHSNgj5aP7vjajyYbZpvTEGs1sE= +github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= +github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -328,14 +327,12 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= @@ -400,8 +397,6 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -566,8 +561,9 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c h1:yKufUcDwucU5urd+50/Opbt4AYpqthk7wHpHok8f1lo= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -679,8 +675,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -941,8 +937,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I= -gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index 1a2df142e..53e3517a4 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -9,7 +9,7 @@ type Config struct { common.Config FsDir string `env:"FS_DIR,required"` FsUlimit uint16 `env:"FS_ULIMIT,required"` - FileBuffer int `env:"FILE_BUFFER,default=32768"` + FileBuffer int `env:"FILE_BUFFER,default=16384"` SyncTimeout int `env:"SYNC_TIMEOUT,default=5"` GroupSink string `env:"GROUP_SINK,required"` TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` diff --git a/backend/internal/sink/sessionwriter/file.go b/backend/internal/sink/sessionwriter/file.go index 1ad076d72..37b1664a9 100644 --- a/backend/internal/sink/sessionwriter/file.go +++ b/backend/internal/sink/sessionwriter/file.go @@ -2,6 +2,8 @@ package sessionwriter import ( "bufio" + "io" + "log" "os" ) @@ -24,15 +26,32 @@ func NewFile(path string, bufSize int) (*File, error) { } func (f *File) Write(data []byte) error { + f.updated = true + if len(data) > f.buffer.Available()+f.buffer.Size() { + // Flush buffer to file + for i := 0; i < 3; i++ { + err := f.buffer.Flush() + if err == nil { + break + } + log.Printf("can't flush buffer: %s", err) + } + // Write big message directly to file + return f.write(f.file, data) + } + return f.write(f.buffer, data) +} + +func (f *File) write(w io.Writer, data []byte) error { leftToWrite := len(data) for leftToWrite > 0 { - writtenDown, err := f.buffer.Write(data) + from := len(data) - leftToWrite + writtenDown, err := w.Write(data[from:]) if err != nil { return err } leftToWrite -= writtenDown } - f.updated = true return nil } diff --git a/backend/internal/sink/sessionwriter/writer.go b/backend/internal/sink/sessionwriter/writer.go index f2eb052c7..7da1ae878 100644 --- a/backend/internal/sink/sessionwriter/writer.go +++ b/backend/internal/sink/sessionwriter/writer.go @@ -97,17 +97,21 @@ func (w *SessionWriter) Info() string { return fmt.Sprintf("%d sessions", w.meta.Count()) } +func (w *SessionWriter) Sync() { + w.sessions.Range(func(sid, lockObj any) bool { + if err := w.sync(sid.(uint64)); err != nil { + log.Printf("can't sync file descriptor: %s", err) + } + return true + }) +} + func (w *SessionWriter) synchronizer() { tick := time.Tick(w.syncTimeout) for { select { case <-tick: - w.sessions.Range(func(sid, lockObj any) bool { - if err := w.sync(sid.(uint64)); err != nil { - log.Printf("can't sync file descriptor: %s", err) - } - return true - }) + w.Sync() case <-w.done: w.sessions.Range(func(sid, lockObj any) bool { if err := w.Close(sid.(uint64)); err != nil { diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go index 48408ce10..21ee49d60 100644 --- a/backend/pkg/queue/types/types.go +++ b/backend/pkg/queue/types/types.go @@ -6,6 +6,7 @@ type Consumer interface { CommitBack(gap int64) error Commit() error Close() + Rebalanced() <-chan interface{} } // Producer sends batches of session data to queue (redis or kafka) diff --git a/backend/pkg/redisstream/consumer.go b/backend/pkg/redisstream/consumer.go index 228b2c7a0..3c5b6d0a4 100644 --- a/backend/pkg/redisstream/consumer.go +++ b/backend/pkg/redisstream/consumer.go @@ -27,6 +27,7 @@ type Consumer struct { idsPending streamPendingIDsMap lastTs int64 autoCommit bool + event chan interface{} } func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer { @@ -57,11 +58,16 @@ func NewConsumer(group string, streams []string, messageIterator messages.Messag group: group, autoCommit: true, idsPending: idsPending, + event: make(chan interface{}, 4), } } const READ_COUNT = 10 +func (c *Consumer) Rebalanced() <-chan interface{} { + return c.event +} + func (c *Consumer) ConsumeNext() error { // MBTODO: read in go routine, send messages to channel res, err := c.redis.XReadGroup(&_redis.XReadGroupArgs{ diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index 14f8d5a68..bea1f0604 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -2,24 +2,24 @@ package kafka import ( "log" - "openreplay/backend/pkg/messages" "os" "time" - "github.com/pkg/errors" - - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" "openreplay/backend/pkg/env" + "openreplay/backend/pkg/messages" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/pkg/errors" ) type Message = kafka.Message type Consumer struct { - c *kafka.Consumer - messageIterator messages.MessageIterator - commitTicker *time.Ticker - pollTimeout uint - + c *kafka.Consumer + messageIterator messages.MessageIterator + commitTicker *time.Ticker + pollTimeout uint + events chan interface{} lastReceivedPrtTs map[int32]int64 } @@ -48,7 +48,7 @@ func NewConsumer( kafkaConfig.SetKey("ssl.certificate.location", os.Getenv("KAFKA_SSL_CERT")) } - // Apply Kerberos configuration + // Apply Kerberos configuration if env.Bool("KAFKA_USE_KERBEROS") { kafkaConfig.SetKey("security.protocol", "sasl_plaintext") kafkaConfig.SetKey("sasl.mechanisms", "GSSAPI") @@ -61,6 +61,21 @@ func NewConsumer( if err != nil { log.Fatalln(err) } + + var commitTicker *time.Ticker + if autoCommit { + commitTicker = time.NewTicker(2 * time.Minute) + } + + consumer := &Consumer{ + c: c, + messageIterator: messageIterator, + commitTicker: commitTicker, + pollTimeout: 200, + events: make(chan interface{}, 4), + lastReceivedPrtTs: make(map[int32]int64, 16), + } + subREx := "^(" for i, t := range topics { if i != 0 { @@ -69,22 +84,27 @@ func NewConsumer( subREx += t } subREx += ")$" - if err := c.Subscribe(subREx, nil); err != nil { + if err := c.Subscribe(subREx, consumer.reBalanceCallback); err != nil { log.Fatalln(err) } - var commitTicker *time.Ticker - if autoCommit { - commitTicker = time.NewTicker(2 * time.Minute) - } + return consumer +} - return &Consumer{ - c: c, - messageIterator: messageIterator, - commitTicker: commitTicker, - pollTimeout: 200, - lastReceivedPrtTs: make(map[int32]int64), +func (consumer *Consumer) reBalanceCallback(_ *kafka.Consumer, e kafka.Event) error { + switch evt := e.(type) { + case kafka.RevokedPartitions: + // receive before re-balancing partitions; stop consuming messages and commit current state + consumer.events <- evt.String() + case kafka.AssignedPartitions: + // receive after re-balancing partitions; continue consuming messages + //consumer.events <- evt.String() } + return nil +} + +func (consumer *Consumer) Rebalanced() <-chan interface{} { + return consumer.events } func (consumer *Consumer) Commit() error { diff --git a/ee/backend/pkg/kafka/log.go b/ee/backend/pkg/kafka/log.go index 0cd80cb6d..c71c6d2bd 100644 --- a/ee/backend/pkg/kafka/log.go +++ b/ee/backend/pkg/kafka/log.go @@ -1,16 +1,15 @@ package kafka import ( - "log" "fmt" + "log" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "github.com/confluentinc/confluent-kafka-go/kafka" ) - func logPartitions(s string, prts []kafka.TopicPartition) { for _, p := range prts { s = fmt.Sprintf("%v | %v", s, p.Partition) } log.Println(s) -} \ No newline at end of file +} diff --git a/ee/backend/pkg/kafka/producer.go b/ee/backend/pkg/kafka/producer.go index f895241a7..1ec241b8a 100644 --- a/ee/backend/pkg/kafka/producer.go +++ b/ee/backend/pkg/kafka/producer.go @@ -5,7 +5,7 @@ import ( "log" "os" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "github.com/confluentinc/confluent-kafka-go/kafka" "openreplay/backend/pkg/env" )