feat(backend): add user's timestamp to ender logic, removed some messages from db batches

This commit is contained in:
Alexander Zavorotynskiy 2022-06-24 10:10:35 +02:00
parent 3b6cb3ee0e
commit 2c0880a161
8 changed files with 74 additions and 18 deletions

View file

@ -49,12 +49,17 @@ func main() {
cfg.TopicRawWeb,
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
if msg.TypeID() == 3 {
// Skip message end
switch msg.(type) {
case *messages.SessionStart, *messages.SessionEnd, *messages.RawErrorEvent:
// Skip several message types
return
}
// Test debug
if msg.Meta().Timestamp == 0 {
log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, msg.TypeID())
}
statsLogger.Collect(sessionID, meta)
sessions.UpdateSession(sessionID, meta.Timestamp)
sessions.UpdateSession(sessionID, meta.Timestamp, msg.Meta().Timestamp)
},
false,
)

View file

@ -16,6 +16,7 @@ type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
type session struct {
lastTimestamp int64
lastUpdate int64
lastUserTime int64
isEnded bool
}
@ -51,7 +52,7 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder,
}
// UpdateSession save timestamp for new sessions and update for existing sessions
func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) {
func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp, msgTimestamp int64) {
localTS := time.Now().UnixMilli()
currTS := timestamp
if currTS == 0 {
@ -62,11 +63,12 @@ func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) {
sess, ok := se.sessions[sessionID]
if !ok {
se.sessions[sessionID] = &session{
lastTimestamp: currTS, // timestamp from message broker
lastUpdate: localTS, // local timestamp
lastTimestamp: currTS, // timestamp from message broker
lastUpdate: localTS, // local timestamp
lastUserTime: msgTimestamp, // last timestamp from user's machine
isEnded: false,
}
log.Printf("added new session: %d", sessionID)
//log.Printf("added new session: %d", sessionID)
se.activeSessions.Add(context.Background(), 1)
se.totalSessions.Add(context.Background(), 1)
return
@ -74,6 +76,7 @@ func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) {
if currTS > sess.lastTimestamp {
sess.lastTimestamp = currTS
sess.lastUpdate = localTS
sess.lastUserTime = msgTimestamp
sess.isEnded = false
}
}
@ -86,10 +89,12 @@ func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) ||
(currTime-sess.lastUpdate > se.timeout) {
sess.isEnded = true
if handler(sessID, sess.lastTimestamp) {
if handler(sessID, sess.lastUserTime) {
delete(se.sessions, sessID)
se.activeSessions.Add(context.Background(), -1)
removedSessions++
} else {
log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime)
}
}
}

View file

@ -16,6 +16,9 @@ func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error {
}
func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
if err := c.Conn.HandleSessionEnd(sessionID); err != nil {
log.Printf("can't handle session end: %s", err)
}
c.DeleteSession(sessionID)
return nil
}

View file

@ -3,6 +3,7 @@ package postgres
import (
"context"
"log"
"strings"
"time"
"github.com/jackc/pgx/v4"
@ -14,10 +15,16 @@ func getTimeoutContext() context.Context {
return ctx
}
type batchItem struct {
query string
arguments []interface{}
}
type Conn struct {
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
batches map[uint64]*pgx.Batch
batchSizes map[uint64]int
rawBatches map[uint64][]*batchItem
batchQueueLimit int
batchSizeLimit int
}
@ -32,6 +39,7 @@ func NewConn(url string, queueLimit, sizeLimit int) *Conn {
c: c,
batches: make(map[uint64]*pgx.Batch),
batchSizes: make(map[uint64]int),
rawBatches: make(map[uint64][]*batchItem),
batchQueueLimit: queueLimit,
batchSizeLimit: sizeLimit,
}
@ -46,9 +54,17 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{})
batch, ok := conn.batches[sessionID]
if !ok {
conn.batches[sessionID] = &pgx.Batch{}
conn.rawBatches[sessionID] = make([]*batchItem, 0)
batch = conn.batches[sessionID]
}
batch.Queue(sql, args...)
// Temp raw batch store
raw := conn.rawBatches[sessionID]
raw = append(raw, &batchItem{
query: sql,
arguments: args,
})
conn.rawBatches[sessionID] = raw
}
func (conn *Conn) CommitBatches() {
@ -58,12 +74,16 @@ func (conn *Conn) CommitBatches() {
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessID, err)
failedSql := conn.rawBatches[sessID][i]
query := strings.ReplaceAll(failedSql.query, "\n", " ")
log.Println("failed sql req:", query, failedSql.arguments)
}
}
br.Close() // returns err
}
conn.batches = make(map[uint64]*pgx.Batch)
conn.batchSizes = make(map[uint64]int)
conn.rawBatches = make(map[uint64][]*batchItem)
}
func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) {
@ -85,6 +105,9 @@ func (conn *Conn) commitBatch(sessionID uint64) {
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessionID, err)
failedSql := conn.rawBatches[sessionID][i]
query := strings.ReplaceAll(failedSql.query, "\n", " ")
log.Println("failed sql req:", query, failedSql.arguments)
}
}
br.Close()
@ -92,6 +115,7 @@ func (conn *Conn) commitBatch(sessionID uint64) {
// Clean batch info
delete(conn.batches, sessionID)
delete(conn.batchSizes, sessionID)
delete(conn.rawBatches, sessionID)
}
func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) {

View file

@ -2,6 +2,7 @@ package postgres
import (
"fmt"
"log"
"strings"
"openreplay/backend/pkg/db/types"
@ -31,10 +32,13 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str
FROM sessions
WHERE session_id = $3
) ON CONFLICT DO NOTHING`
conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID)
if err := conn.exec(sqlRequest, value, tp, sessionID); err != nil {
log.Printf("can't insert autocomplete: %s", err)
}
//conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8)
//conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8)
}
func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
@ -93,7 +97,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64,
return dur, nil
}
func (conn *Conn) HandleSessionEnd(sessionID uint64, timestamp uint64) error {
func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
// TODO: search acceleration?
sqlRequest := `
UPDATE sessions

View file

@ -1,6 +1,7 @@
package postgres
import (
"log"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
@ -27,16 +28,25 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac
$10, $11, $12,
$13, $14, $15
)`
conn.batchQueue(sessionID, sqlRequest,
//conn.batchQueue(sessionID, sqlRequest,
// sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
// p.MinFPS, p.AvgFPS, p.MaxFPS,
// p.MinCPU, p.AvgCPU, p.MinCPU,
// p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
// p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
//)
if err := conn.exec(sqlRequest,
sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
p.MinFPS, p.AvgFPS, p.MaxFPS,
p.MinCPU, p.AvgCPU, p.MinCPU,
p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
)
); err != nil {
log.Printf("can't insert perf: %s", err)
}
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+8*15)
//conn.updateBatchSize(sessionID, len(sqlRequest)+8*15)
return nil
}

View file

@ -242,7 +242,7 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+
len(*request)+len(*response)+len(url.EnsureMethod(e.Method))+8*5+1)
len(e.Request)+len(e.Response)+len(url.EnsureMethod(e.Method))+8*5+1)
return nil
}
@ -269,6 +269,6 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(*request)+len(*response)+8*3)
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(e.Variables)+len(e.Response)+8*3)
return nil
}

View file

@ -49,11 +49,16 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
}
timestamp := GetTimestamp(message)
if timestamp == 0 {
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())
switch message.(type) {
case *SessionEnd, *IssueEvent, *PerformanceTrackAggr:
break
default:
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())
}
return
}
if timestamp < b.timestamp {
log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d, msgTS: %d, lastTS: %d", b.sessionID, messageID, message.TypeID(), timestamp, b.timestamp)
//log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d, msgTS: %d, lastTS: %d", b.sessionID, messageID, message.TypeID(), timestamp, b.timestamp)
} else {
b.timestamp = timestamp
}