feat(backend): minor fixes after prerelease tests

This commit is contained in:
Alexander Zavorotynskiy 2022-06-27 10:35:05 +02:00
parent 179dbd22d5
commit 95c9b6e3f5
7 changed files with 142 additions and 225 deletions

View file

@ -118,10 +118,14 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
}
// Save sessionStart to db
e.services.Database.InsertWebSessionStart(sessionID, sessionStart)
if err := e.services.Database.InsertWebSessionStart(sessionID, sessionStart); err != nil {
log.Printf("can't insert session start: %s", err)
}
// Send sessionStart message to kafka
e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart))
if err := e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(sessionStart)); err != nil {
log.Printf("can't send session start: %s", err)
}
}
ResponseWithJSON(w, &StartSessionResponse{

View file

@ -1,7 +1,6 @@
package postgres
import (
"log"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
@ -28,25 +27,16 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac
$10, $11, $12,
$13, $14, $15
)`
//conn.batchQueue(sessionID, sqlRequest,
// sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
// p.MinFPS, p.AvgFPS, p.MaxFPS,
// p.MinCPU, p.AvgCPU, p.MinCPU,
// p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
// p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
//)
if err := conn.exec(sqlRequest,
conn.batchQueue(sessionID, sqlRequest,
sessionID, timestamp, timestamp, // ??? TODO: primary key by timestamp+session_id
p.MinFPS, p.AvgFPS, p.MaxFPS,
p.MinCPU, p.AvgCPU, p.MinCPU,
p.MinTotalJSHeapSize, p.AvgTotalJSHeapSize, p.MaxTotalJSHeapSize,
p.MinUsedJSHeapSize, p.AvgUsedJSHeapSize, p.MaxUsedJSHeapSize,
); err != nil {
log.Printf("can't insert perf: %s", err)
}
)
// Record approximate message size
//conn.updateBatchSize(sessionID, len(sqlRequest)+8*15)
conn.updateBatchSize(sessionID, len(sqlRequest)+8*15)
return nil
}

View file

@ -40,7 +40,8 @@ func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
// No skipping here for making it easy to encode back the same sequence of message
// continue readLoop
case *SessionStart:
// Save session start timestamp for collecting "empty" sessions
timestamp = int64(m.Timestamp)
case *SessionEnd:
timestamp = int64(m.Timestamp)
}
msg.Meta().Index = index

View file

@ -50,7 +50,7 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if timestamp == 0 {
switch message.(type) {
case *SessionEnd, *IssueEvent, *PerformanceTrackAggr:
case *IssueEvent, *PerformanceTrackAggr:
break
default:
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())

View file

@ -9,23 +9,16 @@ import (
)
type Connector struct {
sessionsIOS *bulk
//viewsIOS *bulk
clicksIOS *bulk
inputsIOS *bulk
crashesIOS *bulk
performanceIOS *bulk
resourcesIOS *bulk
sessions *bulk
metadata *bulk // TODO: join sessions, sessions_metadata & sessions_ios
resources *bulk
pages *bulk
clicks *bulk
inputs *bulk
errors *bulk
performance *bulk
longtasks *bulk
db *sql.DB
sessions *bulk
metadata *bulk // TODO: join sessions, sessions_metadata & sessions_ios
resources *bulk
pages *bulk
clicks *bulk
inputs *bulk
errors *bulk
performance *bulk
longtasks *bulk
db *sql.DB
}
func NewConnector(url string) *Connector {
@ -37,35 +30,6 @@ func NewConnector(url string) *Connector {
}
return &Connector{
db: db,
// sessionsIOS: newBulk(db, `
// INSERT INTO sessions_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, views_count, events_count, crashes_count, metadata_1, metadata_2, metadata_3, metadata_4, metadata_5, metadata_6, metadata_7, metadata_8, metadata_9, metadata_10)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// viewsIOS: newBulk(db, `
// INSERT INTO views_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, url, request_start, response_start, response_end, dom_content_loaded_event_start, dom_content_loaded_event_end, load_event_start, load_event_end, first_paint, first_contentful_paint, speed_index, visually_complete, time_to_interactive)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// clicksIOS: newBulk(db, `
// INSERT INTO clicks_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, label)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// inputsIOS: newBulk(db, `
// INSERT INTO inputs_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, label)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// crashesIOS: newBulk(db, `
// INSERT INTO crashes_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, name, reason, crash_id)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// performanceIOS: newBulk(db, `
// INSERT INTO performance_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, min_fps, avg_fps, max_fps, min_cpu, avg_cpu, max_cpu, min_memory, avg_memory, max_memory)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
// `),
// resourcesIOS: newBulk(db, `
// INSERT INTO resources_ios (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, url, duration, body_size, success, method, status)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, nullIf(?, ''), ?)
// `),
sessions: newBulk(db, `
INSERT INTO sessions (session_id, project_id, tracker_version, rev_id, user_uuid, user_os, user_os_version, user_device, user_device_type, user_country, datetime, duration, pages_count, events_count, errors_count, user_browser, user_browser_version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
@ -107,27 +71,6 @@ func NewConnector(url string) *Connector {
}
func (conn *Connector) Prepare() error {
// if err := conn.sessionsIOS.prepare(); err != nil {
// return err
// }
// // if err := conn.viewsIOS.prepare(); err != nil {
// // return err
// // }
// if err := conn.clicksIOS.prepare(); err != nil {
// return err
// }
// if err := conn.inputsIOS.prepare(); err != nil {
// return err
// }
// if err := conn.crashesIOS.prepare(); err != nil {
// return err
// }
// if err := conn.performanceIOS.prepare(); err != nil {
// return err
// }
// if err := conn.resourcesIOS.prepare(); err != nil {
// return err
// }
if err := conn.sessions.prepare(); err != nil {
return err
}
@ -159,27 +102,6 @@ func (conn *Connector) Prepare() error {
}
func (conn *Connector) Commit() error {
// if err := conn.sessionsIOS.commit(); err != nil {
// return err
// }
// // if err := conn.viewsIOS.commit(); err != nil {
// // return err
// // }
// if err := conn.clicksIOS.commit(); err != nil {
// return err
// }
// if err := conn.inputsIOS.commit(); err != nil {
// return err
// }
// if err := conn.crashesIOS.commit(); err != nil {
// return err
// }
// if err := conn.performanceIOS.commit(); err != nil {
// return err
// }
// if err := conn.resourcesIOS.commit(); err != nil {
// return err
// }
if err := conn.sessions.commit(); err != nil {
return err
}

View file

@ -2,46 +2,41 @@ package clickhouse
import (
"errors"
"openreplay/backend/pkg/hashid"
"openreplay/backend/pkg/url"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
// TODO: join sessions & sessions_ios clcikhouse tables
func (conn *Connector) InsertIOSSession(session *Session) error {
if (session.Duration == nil) {
if session.Duration == nil {
return errors.New("Clickhouse: trying to insert session with ")
}
return conn.sessionsIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(session.Timestamp),
uint32(*session.Duration),
session.PagesCount,
session.EventsCount,
session.ErrorsCount,
session.Metadata1,
session.Metadata2,
session.Metadata3,
session.Metadata4,
session.Metadata5,
session.Metadata6,
session.Metadata7,
session.Metadata8,
session.Metadata9,
session.Metadata10,
)
//return conn.sessionsIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(session.Timestamp),
// uint32(*session.Duration),
// session.PagesCount,
// session.EventsCount,
// session.ErrorsCount,
// session.Metadata1,
// session.Metadata2,
// session.Metadata3,
// session.Metadata4,
// session.Metadata5,
// session.Metadata6,
// session.Metadata7,
// session.Metadata8,
// session.Metadata9,
// session.Metadata10,
//)
return nil
}
// func (conn *Connector) IOSScreenEnter(session *Session, msg *PageEvent) error {
@ -76,105 +71,110 @@ func (conn *Connector) InsertIOSClickEvent(session *Session, msg *IOSClickEvent)
if msg.Label == "" {
return nil
}
return conn.clicksIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
)
//return conn.clicksIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(msg.Timestamp),
// msg.Label,
//)
return nil
}
func (conn *Connector) InsertIOSInputEvent(session *Session, msg *IOSInputEvent) error {
if msg.Label == "" {
return nil
}
return conn.inputsIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Label,
)
//return conn.inputsIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(msg.Timestamp),
// msg.Label,
//)
return nil
}
func (conn *Connector) InsertIOSCrash(session *Session, msg *IOSCrash) error {
return conn.crashesIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
msg.Name,
msg.Reason,
hashid.IOSCrashID(session.ProjectID, msg),
)
//return conn.crashesIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(msg.Timestamp),
// msg.Name,
// msg.Reason,
// hashid.IOSCrashID(session.ProjectID, msg),
//)
return nil
}
func (conn *Connector) InsertIOSNetworkCall(session *Session, msg *IOSNetworkCall) error {
return conn.resourcesIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(msg.Timestamp),
url.DiscardURLQuery(msg.URL),
nullableUint16(uint16(msg.Duration)),
nullableUint32(uint32(len(msg.Body))),
msg.Success,
url.EnsureMethod(msg.Method), // nullableString causes error "unexpected type *string"
nullableUint16(uint16(msg.Status)),
)
//return conn.resourcesIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(msg.Timestamp),
// url.DiscardURLQuery(msg.URL),
// nullableUint16(uint16(msg.Duration)),
// nullableUint32(uint32(len(msg.Body))),
// msg.Success,
// url.EnsureMethod(msg.Method), // nullableString causes error "unexpected type *string"
// nullableUint16(uint16(msg.Status)),
//)
return nil
}
func (conn *Connector) InsertIOSPerformanceAggregated(session *Session, msg *IOSPerformanceAggregated) error {
var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
return conn.performanceIOS.exec(
session.SessionID,
session.ProjectID,
session.TrackerVersion,
nullableString(session.RevID),
session.UserUUID,
session.UserOS,
nullableString(session.UserOSVersion),
nullableString(session.UserDevice),
session.UserDeviceType,
session.UserCountry,
datetime(timestamp),
uint8(msg.MinFPS),
uint8(msg.AvgFPS),
uint8(msg.MaxFPS),
uint8(msg.MinCPU),
uint8(msg.AvgCPU),
uint8(msg.MaxCPU),
msg.MinMemory,
msg.AvgMemory,
msg.MaxMemory,
)
//var timestamp uint64 = (msg.TimestampStart + msg.TimestampEnd) / 2
//return conn.performanceIOS.exec(
// session.SessionID,
// session.ProjectID,
// session.TrackerVersion,
// nullableString(session.RevID),
// session.UserUUID,
// session.UserOS,
// nullableString(session.UserOSVersion),
// nullableString(session.UserDevice),
// session.UserDeviceType,
// session.UserCountry,
// datetime(timestamp),
// uint8(msg.MinFPS),
// uint8(msg.AvgFPS),
// uint8(msg.MaxFPS),
// uint8(msg.MinCPU),
// uint8(msg.AvgCPU),
// uint8(msg.MaxCPU),
// msg.MinMemory,
// msg.AvgMemory,
// msg.MaxMemory,
//)
return nil
}