From 95c9b6e3f56497f516e7ba5aadb20f5822230b28 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Mon, 27 Jun 2022 10:35:05 +0200 Subject: [PATCH] feat(backend): minor fixes after prerelease tests --- backend/internal/http/router/handlers-web.go | 8 +- backend/pkg/db/postgres/messages-web-stats.go | 16 +- backend/pkg/messages/batch.go | 3 +- backend/pkg/sessions/builder.go | 2 +- .../internal/{ => db}/datasaver/stats.go | 0 ee/backend/pkg/db/clickhouse/connector.go | 98 +------ ee/backend/pkg/db/clickhouse/messages-ios.go | 240 +++++++++--------- 7 files changed, 142 insertions(+), 225 deletions(-) rename ee/backend/internal/{ => db}/datasaver/stats.go (100%) diff --git a/backend/internal/http/router/handlers-web.go b/backend/internal/http/router/handlers-web.go index 69cbe4675..a8b47c559 100644 --- a/backend/internal/http/router/handlers-web.go +++ b/backend/internal/http/router/handlers-web.go @@ -118,10 +118,14 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) } // Save sessionStart to db - e.services.Database.InsertWebSessionStart(sessionID, sessionStart) + if err := e.services.Database.InsertWebSessionStart(sessionID, sessionStart); err != nil { + log.Printf("can't insert session start: %s", err) + } // Send sessionStart message to kafka - e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart)) + if err := e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart)); err != nil { + log.Printf("can't send session start: %s", err) + } } ResponseWithJSON(w, &StartSessionResponse{ diff --git a/backend/pkg/db/postgres/messages-web-stats.go b/backend/pkg/db/postgres/messages-web-stats.go index 2a5a11750..396f2e74d 100644 --- a/backend/pkg/db/postgres/messages-web-stats.go +++ b/backend/pkg/db/postgres/messages-web-stats.go @@ -1,7 +1,6 @@ package postgres import ( - "log" . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/url" ) @@ -28,25 +27,16 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac $10, $11, $12, $13, $14, $15 )` - //conn.batchQueue(sessionID, sqlRequest, - // sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id - // p.MinFPS, p.AvgFPS, p.MaxFPS, - // p.MinCPU, p.AvgCPU, p.MinCPU, - // p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize, - // p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize, - //) - if err := conn.exec(sqlRequest, + conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id p.MinFPS, p.AvgFPS, p.MaxFPS, p.MinCPU, p.AvgCPU, p.MinCPU, p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize, p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize, - ); err != nil { - log.Printf("can't insert perf: %s", err) - } + ) // Record approximate message size - //conn.updateBatchSize(sessionID, len(sqlRequest)+8*15) + conn.updateBatchSize(sessionID, len(sqlRequest)+8*15) return nil } diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 2d531c429..84f6cde99 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -40,7 +40,8 @@ func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error { // No skipping here for making it easy to encode back the same sequence of message // continue readLoop case *SessionStart: - // Save session start timestamp for collecting "empty" sessions + timestamp = int64(m.Timestamp) + case *SessionEnd: timestamp = int64(m.Timestamp) } msg.Meta().Index = index diff --git a/backend/pkg/sessions/builder.go b/backend/pkg/sessions/builder.go index eed3d8229..c9cb0b6dd 100644 --- a/backend/pkg/sessions/builder.go +++ b/backend/pkg/sessions/builder.go @@ -50,7 +50,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) { timestamp := GetTimestamp(message) if timestamp == 0 { switch message.(type) { - case *SessionEnd, *IssueEvent, *PerformanceTrackAggr: + case *IssueEvent, *PerformanceTrackAggr: break default: log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID()) diff --git a/ee/backend/internal/datasaver/stats.go b/ee/backend/internal/db/datasaver/stats.go similarity index 100% rename from ee/backend/internal/datasaver/stats.go rename to ee/backend/internal/db/datasaver/stats.go diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index 532389425..cc0d20497 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -9,23 +9,16 @@ import ( ) type Connector struct { - sessionsIOS *bulk - //viewsIOS *bulk - clicksIOS *bulk - inputsIOS *bulk - crashesIOS *bulk - performanceIOS *bulk - resourcesIOS *bulk - sessions *bulk - metadata *bulk // TODO: join sessions, sessions_metadata & sessions_ios - resources *bulk - pages *bulk - clicks *bulk - inputs *bulk - errors *bulk - performance *bulk - longtasks *bulk - db *sql.DB + sessions *bulk + metadata *bulk // TODO: join sessions, sessions_metadata & sessions_ios + resources *bulk + pages *bulk + clicks *bulk + inputs *bulk + errors *bulk + performance *bulk + longtasks *bulk + db *sql.DB } func NewConnector(url string) *Connector { @@ -37,35 +30,6 @@ func NewConnector(url string) *Connector { } return &Connector{ db: db, - // sessionsIOS: newBulk(db, ` - // INSERT INTO sessions_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, views_count, events_count, crashes_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // viewsIOS: newBulk(db, ` - // INSERT INTO views_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // clicksIOS: newBulk(db, ` - // INSERT INTO clicks_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, label) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // inputsIOS: newBulk(db, ` - // INSERT INTO inputs_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, label) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // crashesIOS: newBulk(db, ` - // INSERT INTO crashes_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, name, reason, crash_id) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // performanceIOS: newBulk(db, ` - // INSERT INTO performance_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_memory, avg_memory, max_memory) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - // `), - // resourcesIOS: newBulk(db, ` - // INSERT INTO resources_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, url, duration, body_size, success, method, status) - // VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, nullIf(?, ''), ?) - // `), - sessions: newBulk(db, ` INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) @@ -107,27 +71,6 @@ func NewConnector(url string) *Connector { } func (conn *Connector) Prepare() error { - // if err := conn.sessionsIOS.prepare(); err != nil { - // return err - // } - // // if err := conn.viewsIOS.prepare(); err != nil { - // // return err - // // } - // if err := conn.clicksIOS.prepare(); err != nil { - // return err - // } - // if err := conn.inputsIOS.prepare(); err != nil { - // return err - // } - // if err := conn.crashesIOS.prepare(); err != nil { - // return err - // } - // if err := conn.performanceIOS.prepare(); err != nil { - // return err - // } - // if err := conn.resourcesIOS.prepare(); err != nil { - // return err - // } if err := conn.sessions.prepare(); err != nil { return err } @@ -159,27 +102,6 @@ func (conn *Connector) Prepare() error { } func (conn *Connector) Commit() error { - // if err := conn.sessionsIOS.commit(); err != nil { - // return err - // } - // // if err := conn.viewsIOS.commit(); err != nil { - // // return err - // // } - // if err := conn.clicksIOS.commit(); err != nil { - // return err - // } - // if err := conn.inputsIOS.commit(); err != nil { - // return err - // } - // if err := conn.crashesIOS.commit(); err != nil { - // return err - // } - // if err := conn.performanceIOS.commit(); err != nil { - // return err - // } - // if err := conn.resourcesIOS.commit(); err != nil { - // return err - // } if err := conn.sessions.commit(); err != nil { return err } diff --git a/ee/backend/pkg/db/clickhouse/messages-ios.go b/ee/backend/pkg/db/clickhouse/messages-ios.go index f5ca30495..1c7723b2b 100644 --- a/ee/backend/pkg/db/clickhouse/messages-ios.go +++ b/ee/backend/pkg/db/clickhouse/messages-ios.go @@ -2,46 +2,41 @@ package clickhouse import ( "errors" - - "openreplay/backend/pkg/hashid" - "openreplay/backend/pkg/url" - . "openreplay/backend/pkg/db/types" - . "openreplay/backend/pkg/messages" ) - // TODO: join sessions & sessions_ios clcikhouse tables func (conn *Connector) InsertIOSSession(session *Session) error { - if (session.Duration == nil) { + if session.Duration == nil { return errors.New("Clickhouse: trying to insert session with ") } - return conn.sessionsIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(session.Timestamp), - uint32(*session.Duration), - session.PagesCount, - session.EventsCount, - session.ErrorsCount, - session.Metadata1, - session.Metadata2, - session.Metadata3, - session.Metadata4, - session.Metadata5, - session.Metadata6, - session.Metadata7, - session.Metadata8, - session.Metadata9, - session.Metadata10, - ) + //return conn.sessionsIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(session.Timestamp), + // uint32(*session.Duration), + // session.PagesCount, + // session.EventsCount, + // session.ErrorsCount, + // session.Metadata1, + // session.Metadata2, + // session.Metadata3, + // session.Metadata4, + // session.Metadata5, + // session.Metadata6, + // session.Metadata7, + // session.Metadata8, + // session.Metadata9, + // session.Metadata10, + //) + return nil } // func (conn *Connector) IOSScreenEnter(session *Session, msg *PageEvent) error { @@ -76,105 +71,110 @@ func (conn *Connector) InsertIOSClickEvent(session *Session, msg *IOSClickEvent) if msg.Label == "" { return nil } - return conn.clicksIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Label, - ) + //return conn.clicksIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(msg.Timestamp), + // msg.Label, + //) + return nil } func (conn *Connector) InsertIOSInputEvent(session *Session, msg *IOSInputEvent) error { if msg.Label == "" { return nil } - return conn.inputsIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Label, - ) + //return conn.inputsIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(msg.Timestamp), + // msg.Label, + //) + return nil } func (conn *Connector) InsertIOSCrash(session *Session, msg *IOSCrash) error { - return conn.crashesIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - msg.Name, - msg.Reason, - hashid.IOSCrashID(session.ProjectID, msg), - ) + //return conn.crashesIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(msg.Timestamp), + // msg.Name, + // msg.Reason, + // hashid.IOSCrashID(session.ProjectID, msg), + //) + return nil } func (conn *Connector) InsertIOSNetworkCall(session *Session, msg *IOSNetworkCall) error { - return conn.resourcesIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(msg.Timestamp), - url.DiscardURLQuery(msg.URL), - nullableUint16(uint16(msg.Duration)), - nullableUint32(uint32(len(msg.Body))), - msg.Success, - url.EnsureMethod(msg.Method), // nullableString causes error "unexpected type *string" - nullableUint16(uint16(msg.Status)), - ) + //return conn.resourcesIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(msg.Timestamp), + // url.DiscardURLQuery(msg.URL), + // nullableUint16(uint16(msg.Duration)), + // nullableUint32(uint32(len(msg.Body))), + // msg.Success, + // url.EnsureMethod(msg.Method), // nullableString causes error "unexpected type *string" + // nullableUint16(uint16(msg.Status)), + //) + return nil } func (conn *Connector) InsertIOSPerformanceAggregated(session *Session, msg *IOSPerformanceAggregated) error { - var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 - return conn.performanceIOS.exec( - session.SessionID, - session.ProjectID, - session.TrackerVersion, - nullableString(session.RevID), - session.UserUUID, - session.UserOS, - nullableString(session.UserOSVersion), - nullableString(session.UserDevice), - session.UserDeviceType, - session.UserCountry, - datetime(timestamp), - uint8(msg.MinFPS), - uint8(msg.AvgFPS), - uint8(msg.MaxFPS), - uint8(msg.MinCPU), - uint8(msg.AvgCPU), - uint8(msg.MaxCPU), - msg.MinMemory, - msg.AvgMemory, - msg.MaxMemory, - ) + //var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2 + //return conn.performanceIOS.exec( + // session.SessionID, + // session.ProjectID, + // session.TrackerVersion, + // nullableString(session.RevID), + // session.UserUUID, + // session.UserOS, + // nullableString(session.UserOSVersion), + // nullableString(session.UserDevice), + // session.UserDeviceType, + // session.UserCountry, + // datetime(timestamp), + // uint8(msg.MinFPS), + // uint8(msg.AvgFPS), + // uint8(msg.MaxFPS), + // uint8(msg.MinCPU), + // uint8(msg.AvgCPU), + // uint8(msg.MaxCPU), + // msg.MinMemory, + // msg.AvgMemory, + // msg.MaxMemory, + //) + return nil }