feat(backend): added rebalancing handler to all services

This commit is contained in:
Alexander Zavorotynskiy 2022-11-29 13:21:16 +01:00
parent 304e98cfac
commit 5e8111e5d4
5 changed files with 10 additions and 0 deletions

View file

@ -73,6 +73,8 @@ func main() {
log.Printf("Error while caching: %v", err)
case <-tick:
cacher.UpdateTimeouts()
case msg := <-msgConsumer.Rebalanced():
log.Println(msg)
default:
if !cacher.CanCache() {
continue

View file

@ -163,6 +163,8 @@ func main() {
os.Exit(0)
case <-commitTick:
commitDBUpdates()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
// Handle new message from queue
if err := consumer.ConsumeNext(); err != nil {

View file

@ -98,6 +98,8 @@ func main() {
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
log.Printf("can't commit messages with offset: %s", err)
}
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)

View file

@ -82,6 +82,8 @@ func main() {
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)

View file

@ -73,6 +73,8 @@ func main() {
os.Exit(0)
case <-counterTick:
go counter.Print()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
err := consumer.ConsumeNext()
if err != nil {