Merge pull request #445 from openreplay/db_refactoring

Db refactoring
This commit is contained in:
Alex K 2022-05-05 12:50:40 +02:00 committed by GitHub
commit 50bbd0fe98
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 275 additions and 238 deletions

View file

@ -47,7 +47,13 @@ jobs:
#
# Getting the images to build
#
git diff --name-only HEAD HEAD~1 | grep backend/services | cut -d '/' -f3 | uniq > backend/images_to_build.txt
{
  # Services whose own sources changed (skip enterprise-edition paths).
  git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE '^ee/' | cut -d '/' -f3
  # Services that depend on a changed shared package: find every service/cmd
  # that references pkg/<name> for each touched pkg or internal package.
  git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE '^ee/' | cut -d '/' -f3 | sort -u | while read -r pkg_name ; do
    grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
  done
} | sort -u > backend/images_to_build.txt
# -s: file exists and is non-empty. Use a { group; }, not a (subshell): `exit`
# inside a subshell only terminates the subshell and the script would keep going.
# sort -u replaces uniq above because uniq only removes *adjacent* duplicates
# and the two pipelines' combined output is not sorted.
[[ -s backend/images_to_build.txt ]] || { echo "Nothing to build here"; exit 0; }
#
# Pushing image to registry

View file

@ -51,8 +51,8 @@ jobs:
{
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
done
} | uniq > backend/images_to_build.txt

View file

@ -23,7 +23,7 @@ function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http)
http | db)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {

117
backend/cmd/db/main.go Normal file
View file

@ -0,0 +1,117 @@
package main
import (
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/heuristics"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
// main runs the db service: it consumes raw session messages from the queue,
// persists them (plus derived statistics and heuristics-generated messages)
// into Postgres through a caching layer, and commits DB batches and consumer
// offsets on a fixed timer.
func main() {
	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)

	cfg := db.New()

	// Init database
	pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
	defer pg.Close()

	// Init modules
	heurFinder := heuristics.NewHandler()
	saver := datasaver.New(pg)
	statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)

	// Handler logic: invoked once per consumed message.
	handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {
		statsLogger.Collect(sessionID, meta)

		// Just save session data into db without additional checks
		if err := saver.InsertMessage(sessionID, msg); err != nil {
			// Primary-key violations indicate a duplicate insert (e.g. redelivery)
			// and are deliberately not logged.
			if !postgres.IsPkeyViolation(err) {
				log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
			}
			return
		}

		// Try to get session from db for the following handlers
		session, err := pg.GetSession(sessionID)
		if err != nil {
			// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
			log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
			return
		}

		// Save statistics to db
		err = saver.InsertStats(session, msg)
		if err != nil {
			log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
		}

		// Handle heuristics and save to temporary queue in memory
		heurFinder.HandleMessage(session, msg)

		// Process saved heuristics messages as usual messages above in the code
		heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
			// TODO: DRY code (carefully with the return statement logic)
			if err := saver.InsertMessage(sessionID, msg); err != nil {
				if !postgres.IsPkeyViolation(err) {
					log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
				}
				return
			}
			if err := saver.InsertStats(session, msg); err != nil {
				log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
			}
		})
	}

	// Init consumer
	consumer := queue.NewMessageConsumer(
		cfg.GroupDB,
		[]string{
			cfg.TopicRawIOS,
			cfg.TopicTrigger,
		},
		handler,
		false,
	)

	log.Printf("Db service started\n")

	// Graceful shutdown on SIGINT/SIGTERM; periodic commit of DB batches and
	// consumer offsets on CommitBatchTimeout.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	tick := time.Tick(cfg.CommitBatchTimeout)
	for {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating\n", sig)
			consumer.Close()
			os.Exit(0)
		case <-tick:
			pg.CommitBatches()
			// TODO?: separate stats & regular messages
			if err := consumer.Commit(); err != nil {
				log.Printf("Error on consumer commit: %v", err)
			}
		default:
			err := consumer.ConsumeNext()
			if err != nil {
				log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal?
			}
		}
	}
}

View file

@ -0,0 +1,28 @@
package db
import (
"openreplay/backend/pkg/env"
"time"
)
// Config holds the runtime settings for the db service,
// populated from environment variables by New.
type Config struct {
	Postgres                   string        // Postgres connection string (env POSTGRES_STRING)
	ProjectExpirationTimeoutMs int64         // project cache expiration in milliseconds (fixed to 20 min in New)
	LoggerTimeout              int           // queue-stats logging interval in seconds (env LOG_QUEUE_STATS_INTERVAL_SEC)
	GroupDB                    string        // consumer group name (env GROUP_DB)
	TopicRawIOS                string        // raw iOS messages topic (env TOPIC_RAW_IOS)
	TopicTrigger               string        // trigger topic (env TOPIC_TRIGGER)
	CommitBatchTimeout         time.Duration // interval between DB batch / offset commits (fixed to 15s in New)
}
// New builds the db-service configuration, reading required values from the
// environment and filling in fixed defaults for the cache expiration and the
// commit interval.
func New() *Config {
	var cfg Config
	cfg.Postgres = env.String("POSTGRES_STRING")
	cfg.ProjectExpirationTimeoutMs = 20 * 60 * 1000 // 20 minutes, in ms
	cfg.LoggerTimeout = env.Int("LOG_QUEUE_STATS_INTERVAL_SEC")
	cfg.GroupDB = env.String("GROUP_DB")
	cfg.TopicRawIOS = env.String("TOPIC_RAW_IOS")
	cfg.TopicTrigger = env.String("TOPIC_TRIGGER")
	cfg.CommitBatchTimeout = 15 * time.Second
	return &cfg
}

View file

@ -0,0 +1,66 @@
package datasaver
import (
. "openreplay/backend/pkg/messages"
)
// InsertMessage persists a single session message into Postgres, dispatching
// on the concrete message type. Message types with no matching case are
// ignored and reported as success (not yet implemented).
func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
	switch event := msg.(type) {
	// Messages shared by web and iOS trackers.
	case *Metadata:
		return mi.pg.InsertMetadata(sessionID, event)
	case *IssueEvent:
		return mi.pg.InsertIssueEvent(sessionID, event)
	// TODO: message adapter (transformer) at the pkg/message level for
	// types such as *IOSMetadata, *IOSIssueEvent and others.

	// Web messages.
	case *SessionStart:
		return mi.pg.InsertWebSessionStart(sessionID, event)
	case *SessionEnd:
		return mi.pg.InsertWebSessionEnd(sessionID, event)
	case *UserID:
		return mi.pg.InsertWebUserID(sessionID, event)
	case *UserAnonymousID:
		return mi.pg.InsertWebUserAnonymousID(sessionID, event)
	case *CustomEvent:
		return mi.pg.InsertWebCustomEvent(sessionID, event)
	case *ClickEvent:
		return mi.pg.InsertWebClickEvent(sessionID, event)
	case *InputEvent:
		return mi.pg.InsertWebInputEvent(sessionID, event)

	// Web-only messages.
	case *PageEvent:
		return mi.pg.InsertWebPageEvent(sessionID, event)
	case *ErrorEvent:
		return mi.pg.InsertWebErrorEvent(sessionID, event)
	case *FetchEvent:
		return mi.pg.InsertWebFetchEvent(sessionID, event)
	case *GraphQLEvent:
		return mi.pg.InsertWebGraphQLEvent(sessionID, event)

	// iOS messages.
	case *IOSSessionStart:
		return mi.pg.InsertIOSSessionStart(sessionID, event)
	case *IOSSessionEnd:
		return mi.pg.InsertIOSSessionEnd(sessionID, event)
	case *IOSUserID:
		return mi.pg.InsertIOSUserID(sessionID, event)
	case *IOSUserAnonymousID:
		return mi.pg.InsertIOSUserAnonymousID(sessionID, event)
	case *IOSCustomEvent:
		return mi.pg.InsertIOSCustomEvent(sessionID, event)
	case *IOSClickEvent:
		return mi.pg.InsertIOSClickEvent(sessionID, event)
	case *IOSInputEvent:
		return mi.pg.InsertIOSInputEvent(sessionID, event)

	// iOS-only messages.
	case *IOSNetworkCall:
		return mi.pg.InsertIOSNetworkCall(sessionID, event)
	case *IOSScreenEnter:
		return mi.pg.InsertIOSScreenEnter(sessionID, event)
	case *IOSCrash:
		return mi.pg.InsertIOSCrash(sessionID, event)

	default:
		// Not implemented for this message type; deliberately a no-op.
		return nil
	}
}

View file

@ -0,0 +1,11 @@
package datasaver
import "openreplay/backend/pkg/db/cache"
// Saver persists session messages and derived statistics to Postgres
// (see InsertMessage and InsertStats).
type Saver struct {
	// pg is the cached Postgres handle every Insert* method writes through.
	pg *cache.PGCache
}
// New returns a Saver that writes all data through the given PGCache.
func New(pg *cache.PGCache) *Saver {
	s := new(Saver)
	s.pg = pg
	return s
}

View file

@ -0,0 +1,19 @@
package datasaver
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
// InsertStats persists aggregated statistics messages for the given session
// into Postgres. Message types without a matching case are silently skipped
// and reported as success.
func (si *Saver) InsertStats(session *Session, msg Message) error {
	switch stat := msg.(type) {
	// Web statistics.
	case *PerformanceTrackAggr:
		return si.pg.InsertWebStatsPerformance(session.SessionID, stat)
	case *ResourceEvent:
		return si.pg.InsertWebStatsResourceEvent(session.SessionID, stat)
	case *LongTask:
		return si.pg.InsertWebStatsLongtask(session.SessionID, stat)
	default:
		return nil
	}
}

View file

@ -18,8 +18,6 @@ func (h *anr) buildIf(timestamp uint64) {
m := &IOSIssueEvent{
Type: "anr",
ContextString: h.lastLabel,
//Context: "{}",
//Payload: fmt.SPrint
}
m.Timestamp = h.lastHeartbeatTimestamp
m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?

View file

@ -21,8 +21,6 @@ func (h *clickrage) build() {
m := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,
//Context: "{}",
//Payload: fmt.SPrint
}
m.Timestamp = h.firstInARawTimestamp
m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?

View file

@ -5,7 +5,6 @@ import (
"log"
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
//"openreplay/backend/pkg/env"
)
@ -18,6 +17,19 @@ type partitionStats struct {
count int
}
// update folds one message's queue metadata into the partition statistics:
// it tracks min/max/last timestamps, the last message ID, and a message count.
func (prt *partitionStats) update(m *types.Meta) {
	ts := m.Timestamp
	if ts > prt.maxts {
		prt.maxts = ts
	}
	// mints == 0 means "unset" (first message seen on this partition).
	if prt.mints == 0 || ts < prt.mints {
		prt.mints = ts
	}
	prt.lastts = ts
	prt.lastID = m.ID
	prt.count++
}
type queueStats struct {
prts map[int32]*partitionStats
tick <-chan time.Time
@ -30,43 +42,35 @@ func NewQueueStats(sec int) *queueStats {
}
}
func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) {
// Collect writes new data to partition statistic
func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) {
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
prt, ok := qs.prts[prti]
if !ok {
qs.prts[prti] = &partitionStats{}
prt = qs.prts[prti]
}
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.count += 1
prt.update(m)
select {
case <-qs.tick:
qs.LogThenReset()
qs.log()
qs.reset()
default:
}
}
func (qs *queueStats) LogThenReset() {
// Print to console collected statistics
func (qs *queueStats) log() {
s := "Queue Statistics: "
for i, p := range qs.prts {
s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v",
s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints)
}
log.Println(s)
// reset
qs.prts = make(map[int32]*partitionStats)
}
// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something)
func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) {
log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg)
// reset discards all accumulated per-partition statistics so the next
// logging window starts from an empty state.
func (qs *queueStats) reset() {
	qs.prts = make(map[int32]*partitionStats)
}

View file

View file

@ -1,109 +0,0 @@
package main
import (
"log"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/services/db/heuristics"
)
// pg is the shared Postgres handle used by insertMessage / insertStats.
var pg *cache.PGCache

// main runs the (pre-refactoring) db service: it consumes session messages
// from the queue, writes them and their statistics to Postgres, feeds the
// heuristics handler, and commits batches/offsets every 15 seconds.
func main() {
	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
	initStats()
	// 1000*60*20 ms = 20-minute project cache expiration.
	pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
	defer pg.Close()

	heurFinder := heuristics.NewHandler()
	statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))

	consumer := queue.NewMessageConsumer(
		env.String("GROUP_DB"),
		[]string{
			env.String("TOPIC_RAW_IOS"),
			env.String("TOPIC_TRIGGER"),
		},
		// Per-message handler.
		func(sessionID uint64, msg messages.Message, meta *types.Meta) {
			statsLogger.HandleAndLog(sessionID, meta)
			if err := insertMessage(sessionID, msg); err != nil {
				// Pkey violations are duplicates and intentionally not logged.
				if !postgres.IsPkeyViolation(err) {
					log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
				}
				return
			}
			session, err := pg.GetSession(sessionID)
			if err != nil {
				// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
				log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
				return
			}
			err = insertStats(session, msg)
			if err != nil {
				log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
			}
			heurFinder.HandleMessage(session, msg)
			// Re-process heuristics-produced messages with the same insert logic.
			heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
				// TODO: DRY code (carefully with the return statement logic)
				if err := insertMessage(sessionID, msg); err != nil {
					if !postgres.IsPkeyViolation(err) {
						log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
					}
					return
				}
				if err := insertStats(session, msg); err != nil {
					log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
				}
			})
		},
		false,
	)

	// Graceful shutdown on SIGINT/SIGTERM; periodic commits every 15s.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	tick := time.Tick(15 * time.Second)
	log.Printf("Db service started\n")
	for {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating\n", sig)
			consumer.Close()
			os.Exit(0)
		case <-tick:
			pg.CommitBatches()
			if err := commitStats(); err != nil {
				log.Printf("Error on stats commit: %v", err)
			}
			// TODO?: separate stats & regular messages
			if err := consumer.Commit(); err != nil {
				log.Printf("Error on consumer commit: %v", err)
			}
		default:
			err := consumer.ConsumeNext()
			if err != nil {
				log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal?
			}
		}
	}
}

View file

@ -1,68 +0,0 @@
package main
import (
. "openreplay/backend/pkg/messages"
)
// insertMessage persists a single session message into Postgres via the
// package-level pg handle, dispatching on the concrete message type.
// Message types with no matching case are ignored and reported as success.
func insertMessage(sessionID uint64, msg Message) error {
	switch m := msg.(type) {
	// Common
	case *Metadata:
		return pg.InsertMetadata(sessionID, m)
	case *IssueEvent:
		return pg.InsertIssueEvent(sessionID, m)
	//TODO: message adapter (transformer) (at the level of pkg/message) for types:
	// case *IOSMetadata, *IOSIssueEvent and others

	// Web
	case *SessionStart:
		return pg.InsertWebSessionStart(sessionID, m)
	case *SessionEnd:
		return pg.InsertWebSessionEnd(sessionID, m)
	case *UserID:
		return pg.InsertWebUserID(sessionID, m)
	case *UserAnonymousID:
		return pg.InsertWebUserAnonymousID(sessionID, m)
	case *CustomEvent:
		return pg.InsertWebCustomEvent(sessionID, m)
	case *ClickEvent:
		return pg.InsertWebClickEvent(sessionID, m)
	case *InputEvent:
		return pg.InsertWebInputEvent(sessionID, m)

	// Unique Web messages
	// case *ResourceEvent:
	// 	return pg.InsertWebResourceEvent(sessionID, m)
	case *PageEvent:
		return pg.InsertWebPageEvent(sessionID, m)
	case *ErrorEvent:
		return pg.InsertWebErrorEvent(sessionID, m)
	case *FetchEvent:
		return pg.InsertWebFetchEvent(sessionID, m)
	case *GraphQLEvent:
		return pg.InsertWebGraphQLEvent(sessionID, m)

	// IOS
	case *IOSSessionStart:
		return pg.InsertIOSSessionStart(sessionID, m)
	case *IOSSessionEnd:
		return pg.InsertIOSSessionEnd(sessionID, m)
	case *IOSUserID:
		return pg.InsertIOSUserID(sessionID, m)
	case *IOSUserAnonymousID:
		return pg.InsertIOSUserAnonymousID(sessionID, m)
	case *IOSCustomEvent:
		return pg.InsertIOSCustomEvent(sessionID, m)
	case *IOSClickEvent:
		return pg.InsertIOSClickEvent(sessionID, m)
	case *IOSInputEvent:
		return pg.InsertIOSInputEvent(sessionID, m)

	// Unique IOS messages
	case *IOSNetworkCall:
		return pg.InsertIOSNetworkCall(sessionID, m)
	case *IOSScreenEnter:
		return pg.InsertIOSScreenEnter(sessionID, m)
	case *IOSCrash:
		return pg.InsertIOSCrash(sessionID, m)
	}
	return nil // "Not implemented"
}

View file

@ -1,33 +0,0 @@
package main
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
// initStats is a startup hook for statistics bookkeeping; currently a no-op.
func initStats() {
	// noop
}

// insertStats writes aggregated statistics messages for a session to
// Postgres via the package-level pg handle; message types without a
// matching case are silently skipped.
func insertStats(session *Session, msg Message) error {
	switch m := msg.(type) {
	// Web
	case *PerformanceTrackAggr:
		return pg.InsertWebStatsPerformance(session.SessionID, m)
	case *ResourceEvent:
		return pg.InsertWebStatsResourceEvent(session.SessionID, m)
	case *LongTask:
		return pg.InsertWebStatsLongtask(session.SessionID, m)
	// IOS
	// case *IOSPerformanceAggregated:
	// 	return pg.InsertIOSPerformanceAggregated(session, m)
	// case *IOSNetworkCall:
	// 	return pg.InsertIOSNetworkCall(session, m)
	}
	return nil
}

// commitStats flushes buffered statistics on the commit tick; nothing is
// buffered at the moment, so it always succeeds.
func commitStats() error {
	return nil
}

View file

@ -35,7 +35,7 @@ func main() {
env.String("TOPIC_RAW_IOS"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,