feat(backend): call sessions table optimise method only in one db service

This commit is contained in:
Alexander Zavorotynskiy 2022-09-13 15:48:23 +02:00
parent d5c2e262e1
commit 3b045fa864
6 changed files with 24 additions and 3 deletions

View file

@ -145,7 +145,7 @@ func main() {
pgDur := time.Now().Sub(start).Milliseconds()
start = time.Now()
if err := saver.CommitStats(); err != nil {
if err := saver.CommitStats(consumer.HasFirstPartition()); err != nil {
log.Printf("Error on stats commit: %v", err)
}
chDur := time.Now().Sub(start).Milliseconds()

View file

@ -22,6 +22,6 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
return nil
}
func (si *Saver) CommitStats() error {
func (si *Saver) CommitStats(optimize bool) error {
return nil
}

View file

@ -9,6 +9,7 @@ type Consumer interface {
Commit() error
CommitBack(gap int64) error
Close()
HasFirstPartition() bool
}
type Producer interface {

View file

@ -161,3 +161,7 @@ func (c *Consumer) CommitBack(gap int64) error {
func (c *Consumer) Close() {
// noop
}
func (c *Consumer) HasFirstPartition() bool {
return false
}

View file

@ -43,7 +43,10 @@ func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error
return nil
}
func (si *Saver) CommitStats() error {
func (si *Saver) CommitStats(optimize bool) error {
if !optimize {
return si.ch.Commit()
}
select {
case <-finalizeTicker:
if err := si.ch.FinaliseSessionsTable(); err != nil {

View file

@ -194,3 +194,16 @@ func (consumer *Consumer) Close() {
log.Printf("Kafka consumer close error: %v", err)
}
}
func (consumer *Consumer) HasFirstPartition() bool {
assigned, err := consumer.c.Assignment()
if err != nil {
return false
}
for _, p := range assigned {
if p.Partition == 1 {
return true
}
}
return false
}