feat(backend): fixes for graceful shutdown in services

This commit is contained in:
Alexander Zavorotynskiy 2022-09-28 17:29:09 +02:00
parent a7487cd371
commit 43434f892a
6 changed files with 30 additions and 24 deletions

View file

@ -68,6 +68,7 @@ func main() {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
// TODO: wait assets workers here
msgConsumer.Close()
os.Exit(0)
case err := <-cacher.Errors:

View file

@ -67,7 +67,7 @@ func main() {
// Handler logic
msgHandler := func(msg messages.Message) {
statsLogger.Collect(msg) // TODO: carefully check message meta and batch meta confusion situation
statsLogger.Collect(msg)
// Just save session data into db without additional checks
if err := saver.InsertMessage(msg); err != nil {
@ -127,33 +127,36 @@ func main() {
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
commitTick := time.Tick(cfg.CommitBatchTimeout)
// Send collected batches to db
commitDBUpdates := func() {
start := time.Now()
pg.CommitBatches()
pgDur := time.Now().Sub(start).Milliseconds()
start = time.Now()
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
chDur := time.Now().Sub(start).Milliseconds()
log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur)
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
}
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
log.Printf("Caught signal %s: terminating\n", sig.String())
commitDBUpdates()
consumer.Close()
os.Exit(0)
case <-commitTick:
// Send collected batches to db
start := time.Now()
pg.CommitBatches()
pgDur := time.Now().Sub(start).Milliseconds()
start = time.Now()
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
chDur := time.Now().Sub(start).Milliseconds()
log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur)
// TODO: use commit worker to save time each tick
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
commitDBUpdates()
default:
// Handle new message from queue
err := consumer.ConsumeNext()
if err != nil {
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}

View file

@ -46,7 +46,7 @@ func main() {
log.Printf("ZERO TS, sessID: %d, msgType: %d", msg.Meta().SessionID(), msg.TypeID())
}
statsLogger.Collect(msg)
sessions.UpdateSession(msg) //TODO: recheck timestamps(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp)
sessions.UpdateSession(msg)
}
consumer := queue.NewConsumer(

View file

@ -49,7 +49,7 @@ func main() {
msgHandler := func(msg messages.Message) {
statsLogger.Collect(msg)
builderMap.HandleMessage(msg) //(sessionID, msg, iter.Message().Meta().Index)
builderMap.HandleMessage(msg)
}
consumer := queue.NewConsumer(

View file

@ -17,7 +17,6 @@ import (
"openreplay/backend/pkg/token"
)
//
func main() {
metrics := monitoring.New("integrations")

View file

@ -115,6 +115,9 @@ func main() {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
if err := writer.SyncAll(); err != nil {
log.Printf("sync error: %v\n", err)
}
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
@ -122,7 +125,7 @@ func main() {
os.Exit(0)
case <-tick:
if err := writer.SyncAll(); err != nil {
log.Fatalf("Sync error: %v\n", err)
log.Fatalf("sync error: %v\n", err)
}
counter.Print()
if err := consumer.Commit(); err != nil {