diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 9513dc16c..8271e6e38 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -49,12 +49,17 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { - if msg.TypeID() == 3 { - // Skip message end + switch msg.(type) { + case *messages.SessionStart, *messages.SessionEnd, *messages.RawErrorEvent: + // Skip several message types return } + // Test debug + if msg.Meta().Timestamp == 0 { + log.Printf("ZERO TS, sessID: %d, msgType: %d", sessionID, msg.TypeID()) + } statsLogger.Collect(sessionID, meta) - sessions.UpdateSession(sessionID, meta.Timestamp) + sessions.UpdateSession(sessionID, meta.Timestamp, msg.Meta().Timestamp) }, false, ) diff --git a/backend/internal/sessionender/ender.go b/backend/internal/sessionender/ender.go index 107618f59..f43a9029b 100644 --- a/backend/internal/sessionender/ender.go +++ b/backend/internal/sessionender/ender.go @@ -16,6 +16,7 @@ type EndedSessionHandler func(sessionID uint64, timestamp int64) bool type session struct { lastTimestamp int64 lastUpdate int64 + lastUserTime int64 isEnded bool } @@ -51,7 +52,7 @@ func New(metrics *monitoring.Metrics, timeout int64, parts int) (*SessionEnder, } // UpdateSession save timestamp for new sessions and update for existing sessions -func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) { +func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp, msgTimestamp int64) { localTS := time.Now().UnixMilli() currTS := timestamp if currTS == 0 { @@ -62,11 +63,12 @@ func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) { sess, ok := se.sessions[sessionID] if !ok { se.sessions[sessionID] = &session{ - lastTimestamp: currTS, // timestamp from message broker - lastUpdate: localTS, // local timestamp + lastTimestamp: currTS, // timestamp from message broker + lastUpdate: localTS, // local timestamp + lastUserTime: msgTimestamp, // last timestamp from user's machine isEnded: false, } - log.Printf("added new session: %d", sessionID) + //log.Printf("added new session: %d", sessionID) se.activeSessions.Add(context.Background(), 1) se.totalSessions.Add(context.Background(), 1) return @@ -74,6 +76,7 @@ func (se *SessionEnder) UpdateSession(sessionID uint64, timestamp int64) { if currTS > sess.lastTimestamp { sess.lastTimestamp = currTS sess.lastUpdate = localTS + sess.lastUserTime = msgTimestamp sess.isEnded = false } } @@ -86,10 +89,12 @@ func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) { if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) || (currTime-sess.lastUpdate > se.timeout) { sess.isEnded = true - if handler(sessID, sess.lastTimestamp) { + if handler(sessID, sess.lastUserTime) { delete(se.sessions, sessID) se.activeSessions.Add(context.Background(), -1) removedSessions++ + } else { + log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime) } } } diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index eb4365a61..cebdaf5e7 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -16,6 +16,9 @@ func (c *PGCache) InsertSessionEnd(sessionID uint64, timestamp uint64) error { } func (c *PGCache) HandleSessionEnd(sessionID uint64) error { + if err := c.Conn.HandleSessionEnd(sessionID); err != nil { + log.Printf("can't handle session end: %s", err) + } c.DeleteSession(sessionID) return nil } diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 532db3336..61f274aed 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -3,6 +3,7 @@ package postgres import ( "context" "log" + "strings" "time" "github.com/jackc/pgx/v4" @@ -14,10 +15,16 @@ func getTimeoutContext() context.Context { return ctx } +type batchItem struct { + query string + arguments []interface{} +} + type Conn struct { c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?) batches map[uint64]*pgx.Batch batchSizes map[uint64]int + rawBatches map[uint64][]*batchItem batchQueueLimit int batchSizeLimit int } @@ -32,6 +39,7 @@ func NewConn(url string, queueLimit, sizeLimit int) *Conn { c: c, batches: make(map[uint64]*pgx.Batch), batchSizes: make(map[uint64]int), + rawBatches: make(map[uint64][]*batchItem), batchQueueLimit: queueLimit, batchSizeLimit: sizeLimit, } @@ -46,9 +54,17 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) batch, ok := conn.batches[sessionID] if !ok { conn.batches[sessionID] = &pgx.Batch{} + conn.rawBatches[sessionID] = make([]*batchItem, 0) batch = conn.batches[sessionID] } batch.Queue(sql, args...) + // Temp raw batch store + raw := conn.rawBatches[sessionID] + raw = append(raw, &batchItem{ + query: sql, + arguments: args, + }) + conn.rawBatches[sessionID] = raw } func (conn *Conn) CommitBatches() { @@ -58,12 +74,16 @@ func (conn *Conn) CommitBatches() { for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessID, err) + failedSql := conn.rawBatches[sessID][i] + query := strings.ReplaceAll(failedSql.query, "\n", " ") + log.Println("failed sql req:", query, failedSql.arguments) } } br.Close() // returns err } conn.batches = make(map[uint64]*pgx.Batch) conn.batchSizes = make(map[uint64]int) + conn.rawBatches = make(map[uint64][]*batchItem) } func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) { @@ -85,6 +105,9 @@ func (conn *Conn) commitBatch(sessionID uint64) { for i := 0; i < l; i++ { if ct, err := br.Exec(); err != nil { log.Printf("Error in PG batch (command tag %s, session: %d): %v \n", ct.String(), sessionID, err) + failedSql := conn.rawBatches[sessionID][i] + query := strings.ReplaceAll(failedSql.query, "\n", " ") + log.Println("failed sql req:", query, failedSql.arguments) } } br.Close() @@ -92,6 +115,7 @@ func (conn *Conn) commitBatch(sessionID uint64) { // Clean batch info delete(conn.batches, sessionID) delete(conn.batchSizes, sessionID) + delete(conn.rawBatches, sessionID) } func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) { diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index b92e4fd6f..a68d2c814 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -2,6 +2,7 @@ package postgres import ( "fmt" + "log" "strings" "openreplay/backend/pkg/db/types" @@ -31,10 +32,13 @@ func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value str FROM sessions WHERE session_id = $3 ) ON CONFLICT DO NOTHING` - conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID) + if err := conn.exec(sqlRequest, value, tp, sessionID); err != nil { + log.Printf("can't insert autocomplete: %s", err) + } + //conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID) // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8) + //conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8) } func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error { @@ -93,7 +97,7 @@ func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, return dur, nil } -func (conn *Conn) HandleSessionEnd(sessionID uint64, timestamp uint64) error { +func (conn *Conn) HandleSessionEnd(sessionID uint64) error { // TODO: search acceleration? sqlRequest := ` UPDATE sessions diff --git a/backend/pkg/db/postgres/messages-web-stats.go b/backend/pkg/db/postgres/messages-web-stats.go index 396f2e74d..2a5a11750 100644 --- a/backend/pkg/db/postgres/messages-web-stats.go +++ b/backend/pkg/db/postgres/messages-web-stats.go @@ -1,6 +1,7 @@ package postgres import ( + "log" . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/url" ) @@ -27,16 +28,25 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac $10, $11, $12, $13, $14, $15 )` - conn.batchQueue(sessionID, sqlRequest, + //conn.batchQueue(sessionID, sqlRequest, + // sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id + // p.MinFPS, p.AvgFPS, p.MaxFPS, + // p.MinCPU, p.AvgCPU, p.MinCPU, + // p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize, + // p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize, + //) + if err := conn.exec(sqlRequest, sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id p.MinFPS, p.AvgFPS, p.MaxFPS, p.MinCPU, p.AvgCPU, p.MinCPU, p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize, p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize, - ) + ); err != nil { + log.Printf("can't insert perf: %s", err) + } // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+8*15) + //conn.updateBatchSize(sessionID, len(sqlRequest)+8*15) return nil } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index db5b9d80c..e703ee933 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -242,7 +242,7 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet // Record approximate message size conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+ - len(*request)+len(*response)+len(url.EnsureMethod(e.Method))+8*5+1) + len(e.Request)+len(e.Response)+len(url.EnsureMethod(e.Method))+8*5+1) return nil } @@ -269,6 +269,6 @@ func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *G ) // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(*request)+len(*response)+8*3) + conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(e.Variables)+len(e.Response)+8*3) return nil } diff --git a/backend/pkg/sessions/builder.go b/backend/pkg/sessions/builder.go index e764a7f20..eed3d8229 100644 --- a/backend/pkg/sessions/builder.go +++ b/backend/pkg/sessions/builder.go @@ -49,11 +49,16 @@ func (b *builder) handleMessage(message Message, messageID uint64) { } timestamp := GetTimestamp(message) if timestamp == 0 { - log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID()) + switch message.(type) { + case *SessionEnd, *IssueEvent, *PerformanceTrackAggr: + break + default: + log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID()) + } return } if timestamp < b.timestamp { - log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d, msgTS: %d, lastTS: %d", b.sessionID, messageID, message.TypeID(), timestamp, b.timestamp) + //log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d, msgTS: %d, lastTS: %d", b.sessionID, messageID, message.TypeID(), timestamp, b.timestamp) } else { b.timestamp = timestamp }