diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index 1483c2ccf..ca0544923 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -9,9 +9,9 @@ import ( "github.com/pkg/errors" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" "openreplay/backend/pkg/env" "openreplay/backend/pkg/queue/types" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) type Message = kafka.Message @@ -19,7 +19,7 @@ type Message = kafka.Message type Consumer struct { c *kafka.Consumer messageHandler types.MessageHandler - commitTicker *time.Ticker + commitTicker *time.Ticker pollTimeout uint lastKafkaEventTs int64 @@ -56,7 +56,7 @@ func NewConsumer(group string, topics []string, messageHandler types.MessageHand return &Consumer{ c: c, messageHandler: messageHandler, - commitTicker: time.NewTicker(2 * time.Minute), + commitTicker: time.NewTicker(2 * time.Minute), pollTimeout: 200, } } @@ -65,13 +65,12 @@ func (consumer *Consumer) DisableAutoCommit() { consumer.commitTicker.Stop() } - func (consumer *Consumer) Commit() error { consumer.c.Commit() // TODO: return error if it is not "No offset stored" return nil } -func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { +func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { assigned, err := consumer.c.Assignment() if err != nil { return err @@ -84,37 +83,38 @@ func (consumer *Consumer) CommitAtTimestamp(commitTs int64) error { timestamps = append(timestamps, p) } offsets, err := consumer.c.OffsetsForTimes(timestamps, 2000) - if err != nil { + if err != nil { return errors.Wrap(err, "Kafka Consumer back commit error") } // Limiting to already committed committed, err := consumer.c.Committed(assigned, 2000) // memorise? - logPartitions("Actually committed:",committed) + logPartitions("Actually committed:", committed) if err != nil { return errors.Wrap(err, "Kafka Consumer retrieving committed error") } for _, offs := range offsets { for _, comm := range committed { - if comm.Offset == kafka.OffsetStored || + if comm.Offset == kafka.OffsetStored || comm.Offset == kafka.OffsetInvalid || - comm.Offset == kafka.OffsetBeginning || - comm.Offset == kafka.OffsetEnd { continue } - if comm.Partition == offs.Partition && + comm.Offset == kafka.OffsetBeginning || + comm.Offset == kafka.OffsetEnd { + continue + } + if comm.Partition == offs.Partition && (comm.Topic != nil && offs.Topic != nil && *comm.Topic == *offs.Topic) && - comm.Offset > offs.Offset { + comm.Offset > offs.Offset { offs.Offset = comm.Offset } } } - // TODO: check per-partition errors: offsets[i].Error + // TODO: check per-partition errors: offsets[i].Error _, err = consumer.c.CommitOffsets(offsets) return errors.Wrap(err, "Kafka Consumer back commit error") } - -func (consumer *Consumer) CommitBack(gap int64) error { +func (consumer *Consumer) CommitBack(gap int64) error { if consumer.lastKafkaEventTs == 0 { return nil } @@ -135,31 +135,31 @@ func (consumer *Consumer) ConsumeNext() error { } switch e := ev.(type) { - case *kafka.Message: - if e.TopicPartition.Error != nil { - return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error") - } - ts := e.Timestamp.UnixNano()/ 1e6 - consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{ - Topic: *(e.TopicPartition.Topic), - ID: uint64(e.TopicPartition.Offset), - Timestamp: ts, - }) - consumer.lastKafkaEventTs = ts - // case kafka.AssignedPartitions: - // logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions) - // consumer.partitions = e.Partitions - // consumer.c.Assign(e.Partitions) - // log.Printf("Actually partitions assigned!") - // case kafka.RevokedPartitions: - // log.Println("Kafka Cosumer: Partitions Revoked") - // consumer.partitions = nil - // consumer.c.Unassign() - case kafka.Error: - if e.Code() == kafka.ErrAllBrokersDown { - os.Exit(1) - } - log.Printf("Consumer error: %v\n", e) + case *kafka.Message: + if e.TopicPartition.Error != nil { + return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error") + } + ts := e.Timestamp.UnixNano() / 1e6 + consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{ + Topic: *(e.TopicPartition.Topic), + ID: uint64(e.TopicPartition.Offset), + Timestamp: ts, + }) + consumer.lastKafkaEventTs = ts + // case kafka.AssignedPartitions: + // logPartitions("Kafka Consumer: Partitions Assigned", e.Partitions) + // consumer.partitions = e.Partitions + // consumer.c.Assign(e.Partitions) + // log.Printf("Actually partitions assigned!") + // case kafka.RevokedPartitions: + // log.Println("Kafka Cosumer: Partitions Revoked") + // consumer.partitions = nil + // consumer.c.Unassign() + case kafka.Error: + if e.Code() == kafka.ErrAllBrokersDown || e.Code() == kafka.ErrMaxPollExceeded { + os.Exit(1) + } + log.Printf("Consumer error: %v\n", e) } return nil } @@ -173,8 +173,6 @@ func (consumer *Consumer) Close() { } } - - // func (consumer *Consumer) consume( // message func(m *kafka.Message) error, // commit func(c *kafka.Consumer) error, @@ -230,7 +228,6 @@ func (consumer *Consumer) Close() { // } // } - // func (consumer *Consumer) Consume( // message func(key uint64, value []byte) error, // ) error {