feat(db): log new events

This commit is contained in:
Taha Yassine Kraiem 2022-01-28 17:29:11 +01:00
parent b4ced1f28d
commit 5b256546eb
3 changed files with 58 additions and 39 deletions

View file

@ -1,9 +1,10 @@
package cache
import (
import (
"errors"
. "openreplay/backend/pkg/messages"
"log"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
@ -44,10 +45,14 @@ func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
}
func (c *PGCache) InsertWebErrorEvent(sessionID uint64, e *ErrorEvent) error {
log.Println(">>InsertWebErrorEvent")
session, err := c.GetSession(sessionID)
if err != nil {
log.Println("session not found")
log.Println(err)
return err
}
log.Println(">>InsertWebErrorEvent to DB")
if err := c.Conn.InsertWebErrorEvent(sessionID, session.ProjectID, e); err != nil {
return err
}

View file

@ -1,11 +1,12 @@
package postgres
import (
"math"
"log"
"math"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/url"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
// TODO: change messages and replace everywhere to e.Index
@ -172,6 +173,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
}
defer tx.rollback()
errorID := hashid.WebErrorID(projectID, e)
log.Println(">>errorID")
log.Println(errorID)
if err = tx.exec(`
INSERT INTO errors
(error_id, project_id, source, name, message, payload)
@ -180,6 +183,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
ON CONFLICT DO NOTHING`,
errorID, projectID, e.Source, e.Name, e.Message, e.Payload,
); err != nil {
log.Println(">>error LVL1")
log.Println(err)
return err
}
if err = tx.exec(`
@ -190,6 +195,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
`,
sessionID, e.MessageID, e.Timestamp, errorID,
); err != nil {
log.Println(">>error LVL2")
log.Println(err)
return err
}
if err = tx.exec(`
@ -197,6 +204,8 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
WHERE session_id = $1`,
sessionID,
); err != nil {
log.Println(">>error LVL3")
log.Println(err)
return err
}
return tx.commit()

View file

@ -2,20 +2,21 @@ package main
import (
"log"
"reflect"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/services/db/heuristics"
)
)
var pg *cache.PGCache
@ -23,62 +24,66 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
initStats()
pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20)
pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pg.Close()
heurFinder := heuristics.NewHandler()
consumer := queue.NewMessageConsumer(
env.String("GROUP_DB"),
[]string{
[]string{
env.String("TOPIC_RAW_IOS"),
env.String("TOPIC_TRIGGER"),
},
func(sessionID uint64, msg messages.Message, _ *types.Meta) {
if err := insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err,sessionID, msg)
}
return
}
},
func(sessionID uint64, msg messages.Message, _ *types.Meta) {
log.Println(">>Insert message")
log.Println(reflect.TypeOf(msg))
if err := insertMessage(sessionID, msg); err != nil {
log.Println(">>Error")
log.Println(err)
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
session, err := pg.GetSession(sessionID)
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
return;
return
}
err = insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
err = insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
heurFinder.HandleMessage(session, msg)
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
err = insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
err = insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
},
)
consumer.DisableAutoCommit()
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(15 * time.Second)
tick := time.Tick(15 * time.Second)
log.Printf("Db service started\n")
log.Printf("Db service started\n")
for {
select {
case sig := <-sigchan:
@ -88,11 +93,11 @@ func main() {
case <-tick:
if err := commitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
}
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
}
default:
err := consumer.ConsumeNext()
if err != nil {
@ -101,4 +106,4 @@ func main() {
}
}
}
}