From 065ecf9e036f41a43b206b6a8eeeb0a103c1357a Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Thu, 1 Dec 2022 12:47:08 +0100 Subject: [PATCH] feat(backend/ender): logs improvements --- backend/cmd/ender/main.go | 13 ++++++++++--- ee/backend/pkg/kafka/consumer.go | 2 -- ee/backend/pkg/kafka/log.go | 15 --------------- 3 files changed, 10 insertions(+), 20 deletions(-) delete mode 100644 ee/backend/pkg/kafka/log.go diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 187aa33db..89354fe64 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -64,6 +64,8 @@ func main() { os.Exit(0) case <-tick: failedSessionEnds := make(map[uint64]int64) + duplicatedSessionEnds := make(map[uint64]uint64) + // Find ended sessions and send notification to other services sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool { msg := &messages.SessionEnd{Timestamp: uint64(timestamp)} @@ -82,8 +84,8 @@ func main() { return false } if currDuration == newDuration { - log.Printf("sessionEnd duplicate, sessID: %d, prevDur: %d, newDur: %d", sessionID, - currDuration, newDuration) + // Skip session end duplicate + duplicatedSessionEnds[sessionID] = currDuration return true } if cfg.UseEncryption { @@ -101,7 +103,12 @@ func main() { } return true }) - log.Println("sessions with wrong duration:", failedSessionEnds) + if len(failedSessionEnds) > 0 { + log.Println("sessions with wrong duration:", failedSessionEnds) + } + if len(duplicatedSessionEnds) > 0 { + log.Println("session end duplicates:", duplicatedSessionEnds) + } producer.Flush(cfg.ProducerTimeout) if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil { log.Printf("can't commit messages with offset: %s", err) diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index bea1f0604..fc8c98eaa 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -120,7 +120,6 @@ func (consumer *Consumer) commitAtTimestamps( if err != nil { return err } - logPartitions("Actually assigned:", assigned) var timestamps []kafka.TopicPartition for _, p := range assigned { // p is a copy here since it is not a pointer @@ -142,7 +141,6 @@ func (consumer *Consumer) commitAtTimestamps( if err != nil { return errors.Wrap(err, "Kafka Consumer retrieving committed error") } - logPartitions("Actually committed:", committed) for _, comm := range committed { if comm.Offset == kafka.OffsetStored || comm.Offset == kafka.OffsetInvalid || diff --git a/ee/backend/pkg/kafka/log.go b/ee/backend/pkg/kafka/log.go deleted file mode 100644 index c71c6d2bd..000000000 --- a/ee/backend/pkg/kafka/log.go +++ /dev/null @@ -1,15 +0,0 @@ -package kafka - -import ( - "fmt" - "log" - - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -func logPartitions(s string, prts []kafka.TopicPartition) { - for _, p := range prts { - s = fmt.Sprintf("%v | %v", s, p.Partition) - } - log.Println(s) -}