From 5b256546ebda8a02b66edddfe2999e7705c71dde Mon Sep 17 00:00:00 2001 From: Taha Yassine Kraiem Date: Fri, 28 Jan 2022 17:29:11 +0100 Subject: [PATCH] feat(db): log new events --- backend/pkg/db/cache/messages_web.go | 9 ++- backend/pkg/db/postgres/messages_web.go | 13 ++++- backend/services/db/main.go | 75 +++++++++++++------------ 3 files changed, 58 insertions(+), 39 deletions(-) diff --git a/backend/pkg/db/cache/messages_web.go b/backend/pkg/db/cache/messages_web.go index b259e49da..e28bea0b8 100644 --- a/backend/pkg/db/cache/messages_web.go +++ b/backend/pkg/db/cache/messages_web.go @@ -1,9 +1,10 @@ package cache -import ( +import ( "errors" - . "openreplay/backend/pkg/messages" + "log" . "openreplay/backend/pkg/db/types" + . "openreplay/backend/pkg/messages" ) @@ -44,10 +45,14 @@ func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error { } func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error { + log.Println(">>InsertWebErrorEvent") session, err := c.GetSession(sessionID) if err != nil { + log.Println("session not found") + log.Println(err) return err } + log.Println(">>InsertWebErrorEvent to DB") if err := c.Conn.InsertWebErrorEvent(sessionID, session.ProjectID, e); err != nil { return err } diff --git a/backend/pkg/db/postgres/messages_web.go b/backend/pkg/db/postgres/messages_web.go index 6e2045d99..39f56492a 100644 --- a/backend/pkg/db/postgres/messages_web.go +++ b/backend/pkg/db/postgres/messages_web.go @@ -1,11 +1,12 @@ package postgres import ( - "math" + "log" + "math" "openreplay/backend/pkg/hashid" - "openreplay/backend/pkg/url" . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/url" ) // TODO: change messages and replace everywhere to e.Index @@ -172,6 +173,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err } defer tx.rollback() errorID := hashid.WebErrorID(projectID, e) + log.Println(">>errorID") + log.Println(errorID) if err = tx.exec(` INSERT INTO errors (error_id, project_id, source, name, message, payload) @@ -180,6 +183,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err ON CONFLICT DO NOTHING`, errorID, projectID, e.Source, e.Name, e.Message, e.Payload, ); err != nil { + log.Println(">>error LVL1") + log.Println(err) return err } if err = tx.exec(` @@ -190,6 +195,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err `, sessionID, e.MessageID, e.Timestamp, errorID, ); err != nil { + log.Println(">>error LVL2") + log.Println(err) return err } if err = tx.exec(` @@ -197,6 +204,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err WHERE session_id = $1`, sessionID, ); err != nil { + log.Println(">>error LVL3") + log.Println(err) return err } return tx.commit() diff --git a/backend/services/db/main.go b/backend/services/db/main.go index a14aa7648..275ce8ad5 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -2,20 +2,21 @@ package main import ( "log" + "reflect" "time" "os" "os/signal" "syscall" + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/env" + "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/db/cache" "openreplay/backend/services/db/heuristics" -) +) var pg *cache.PGCache @@ -23,62 +24,66 @@ func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) initStats() - pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20) + pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) defer pg.Close() heurFinder := heuristics.NewHandler() consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), - []string{ + []string{ env.String("TOPIC_RAW_IOS"), env.String("TOPIC_TRIGGER"), - }, - func(sessionID uint64, msg messages.Message, _ *types.Meta) { - if err := insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err,sessionID, msg) - } - return - } + }, + func(sessionID uint64, msg messages.Message, _ *types.Meta) { + log.Println(">>Insert message") + log.Println(reflect.TypeOf(msg)) + if err := insertMessage(sessionID, msg); err != nil { + log.Println(">>Error") + log.Println(err) + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg) + } + return + } - session, err := pg.GetSession(sessionID) + session, err := pg.GetSession(sessionID) if err != nil { // Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg) - return; + return } - err = insertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) - } + err = insertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg) + } heurFinder.HandleMessage(session, msg) heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) { // TODO: DRY code (carefully with the return statement logic) if err := insertMessage(sessionID, msg); err != nil { - if !postgres.IsPkeyViolation(err) { - log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) - } - return - } + if !postgres.IsPkeyViolation(err) { + log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg) + } + return + } - err = insertStats(session, msg) - if err != nil { - log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) - } + err = insertStats(session, msg) + if err != nil { + log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg) + } }) }, ) consumer.DisableAutoCommit() sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(15 * time.Second) + tick := time.Tick(15 * time.Second) - log.Printf("Db service started\n") + log.Printf("Db service started\n") for { select { case sig := <-sigchan: @@ -88,11 +93,11 @@ func main() { case <-tick: if err := commitStats(); err != nil { log.Printf("Error on stats commit: %v", err) - } + } // TODO?: separate stats & regular messages if err := consumer.Commit(); err != nil { log.Printf("Error on consumer commit: %v", err) - } + } default: err := consumer.ConsumeNext() if err != nil { @@ -101,4 +106,4 @@ func main() { } } -} \ No newline at end of file +}