feat(backend/db): implemented bulk inserts for db service (#619)

This commit is contained in:
Alexander 2022-07-19 15:02:15 +02:00 committed by GitHub
parent 78d7df72a5
commit 4dc82294e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 588 additions and 413 deletions

View file

@ -84,13 +84,5 @@ func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error {
}
func (c *PGCache) InsertIOSIssueEvent(sessionID uint64, issueEvent *IOSIssueEvent) error {
// session, err := c.GetSession(sessionID)
// if err != nil {
// return err
// }
// TODO: unite IssueEvent message for the all platforms
// if err := c.Conn.InsertIssueEvent(sessionID, session.ProjectID, issueEvent); err != nil {
// return err
// }
return nil
}

View file

@ -91,7 +91,7 @@ func (c *PGCache) InsertWebFetchEvent(sessionID uint64, e *FetchEvent) error {
if err != nil {
return err
}
return c.Conn.InsertWebFetchEvent(sessionID, project.SaveRequestPayloads, e)
return c.Conn.InsertWebFetchEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error {
@ -103,5 +103,53 @@ func (c *PGCache) InsertWebGraphQLEvent(sessionID uint64, e *GraphQLEvent) error
if err != nil {
return err
}
return c.Conn.InsertWebGraphQLEvent(sessionID, project.SaveRequestPayloads, e)
return c.Conn.InsertWebGraphQLEvent(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID)
}
func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID)
}
func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error {
session, err := c.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebInputEvent(sessionID, session.ProjectID, e)
}

View file

@ -29,10 +29,9 @@ type PGCache struct {
// TODO: create conn automatically
func NewPGCache(pgConn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache {
return &PGCache{
Conn: pgConn,
sessions: make(map[uint64]*Session),
projects: make(map[uint32]*ProjectMeta),
//projectsByKeys: make(map[string]*ProjectMeta),
Conn: pgConn,
sessions: make(map[uint64]*Session),
projects: make(map[uint32]*ProjectMeta),
projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs),
}
}

View file

@ -0,0 +1,93 @@
package postgres
import (
"bytes"
"errors"
"fmt"
)
const (
insertPrefix = `INSERT INTO `
insertValues = ` VALUES `
insertSuffix = ` ON CONFLICT DO NOTHING;`
)
type Bulk interface {
Append(args ...interface{}) error
Send() error
}
type bulkImpl struct {
conn Pool
table string
columns string
template string
setSize int
sizeLimit int
values []interface{}
}
func (b *bulkImpl) Append(args ...interface{}) error {
if len(args) != b.setSize {
return fmt.Errorf("wrong number of arguments, waited: %d, got: %d", b.setSize, len(args))
}
b.values = append(b.values, args...)
if len(b.values)/b.setSize >= b.sizeLimit {
return b.send()
}
return nil
}
func (b *bulkImpl) Send() error {
if len(b.values) == 0 {
return nil
}
return b.send()
}
func (b *bulkImpl) send() error {
request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues)
args := make([]interface{}, b.setSize)
for i := 0; i < len(b.values)/b.setSize; i++ {
for j := 0; j < b.setSize; j++ {
args[j] = i*b.setSize + j + 1
}
if i > 0 {
request.WriteByte(',')
}
request.WriteString(fmt.Sprintf(b.template, args...))
}
request.WriteString(insertSuffix)
err := b.conn.Exec(request.String(), b.values...)
b.values = make([]interface{}, 0, b.setSize*b.sizeLimit)
if err != nil {
return fmt.Errorf("send bulk err: %s", err)
}
return nil
}
func NewBulk(conn Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
switch {
case conn == nil:
return nil, errors.New("db conn is empty")
case table == "":
return nil, errors.New("table is empty")
case columns == "":
return nil, errors.New("columns is empty")
case template == "":
return nil, errors.New("template is empty")
case setSize <= 0:
return nil, errors.New("set size is wrong")
case sizeLimit <= 0:
return nil, errors.New("size limit is wrong")
}
return &bulkImpl{
conn: conn,
table: table,
columns: columns,
template: template,
setSize: setSize,
sizeLimit: sizeLimit,
values: make([]interface{}, 0, setSize*sizeLimit),
}, nil
}

View file

@ -13,21 +13,24 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
)
func getTimeoutContext() context.Context {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(time.Second*30))
return ctx
}
type batchItem struct {
query string
arguments []interface{}
}
// Conn contains batches, bulks and cache for all sessions
type Conn struct {
c *pgxpool.Pool // TODO: conditional usage of Pool/Conn (use interface?)
c Pool
batches map[uint64]*pgx.Batch
batchSizes map[uint64]int
rawBatches map[uint64][]*batchItem
autocompletes Bulk
requests Bulk
customEvents Bulk
webPageEvents Bulk
webInputEvents Bulk
webGraphQLEvents Bulk
sessionUpdates map[uint64]*sessionUpdates
batchQueueLimit int
batchSizeLimit int
batchSizeBytes syncfloat64.Histogram
@ -46,14 +49,19 @@ func NewConn(url string, queueLimit, sizeLimit int, metrics *monitoring.Metrics)
log.Fatalln("pgxpool.Connect Error")
}
conn := &Conn{
c: c,
batches: make(map[uint64]*pgx.Batch),
batchSizes: make(map[uint64]int),
rawBatches: make(map[uint64][]*batchItem),
sessionUpdates: make(map[uint64]*sessionUpdates),
batchQueueLimit: queueLimit,
batchSizeLimit: sizeLimit,
}
conn.initMetrics(metrics)
conn.c, err = NewPool(c, conn.sqlRequestTime, conn.sqlRequestCounter)
if err != nil {
log.Fatalf("can't create new pool wrapper: %s", err)
}
conn.initBulks()
return conn
}
@ -82,6 +90,70 @@ func (conn *Conn) initMetrics(metrics *monitoring.Metrics) {
}
}
func (conn *Conn) initBulks() {
var err error
conn.autocompletes, err = NewBulk(conn.c,
"autocomplete",
"(value, type, project_id)",
"($%d, $%d, $%d)",
3, 100)
if err != nil {
log.Fatalf("can't create autocomplete bulk")
}
conn.requests, err = NewBulk(conn.c,
"events_common.requests",
"(session_id, timestamp, seq_index, url, duration, success)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
6, 100)
if err != nil {
log.Fatalf("can't create requests bulk")
}
conn.customEvents, err = NewBulk(conn.c,
"events_common.customs",
"(session_id, timestamp, seq_index, name, payload)",
"($%d, $%d, $%d, left($%d, 2700), $%d)",
5, 100)
if err != nil {
log.Fatalf("can't create customEvents bulk")
}
conn.webPageEvents, err = NewBulk(conn.c,
"events.pages",
"(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+
"load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+
"time_to_interactive, response_time, dom_building_time)",
"($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+
" NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))",
18, 100)
if err != nil {
log.Fatalf("can't create webPageEvents bulk")
}
conn.webInputEvents, err = NewBulk(conn.c,
"events.inputs",
"(session_id, message_id, timestamp, value, label)",
"($%d, $%d, $%d, $%d, NULLIF($%d,''))",
5, 100)
if err != nil {
log.Fatalf("can't create webPageEvents bulk")
}
conn.webGraphQLEvents, err = NewBulk(conn.c,
"events.graphql",
"(session_id, timestamp, message_id, name, request_body, response_body)",
"($%d, $%d, $%d, left($%d, 2700), $%d, $%d)",
6, 100)
if err != nil {
log.Fatalf("can't create webPageEvents bulk")
}
}
func (conn *Conn) insertAutocompleteValue(sessionID uint64, projectID uint32, tp string, value string) {
if len(value) == 0 {
return
}
if err := conn.autocompletes.Append(tp, value, projectID); err != nil {
log.Printf("autocomplete bulk err: %s", err)
}
}
func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{}) {
batch, ok := conn.batches[sessionID]
if !ok {
@ -90,6 +162,10 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{})
batch = conn.batches[sessionID]
}
batch.Queue(sql, args...)
conn.rawBatch(sessionID, sql, args...)
}
func (conn *Conn) rawBatch(sessionID uint64, sql string, args ...interface{}) {
// Temp raw batch store
raw := conn.rawBatches[sessionID]
raw = append(raw, &batchItem{
@ -99,8 +175,45 @@ func (conn *Conn) batchQueue(sessionID uint64, sql string, args ...interface{})
conn.rawBatches[sessionID] = raw
}
func (conn *Conn) updateSessionEvents(sessionID uint64, events, pages int) {
if _, ok := conn.sessionUpdates[sessionID]; !ok {
conn.sessionUpdates[sessionID] = NewSessionUpdates(sessionID)
}
conn.sessionUpdates[sessionID].add(pages, events)
}
func (conn *Conn) sendBulks() {
if err := conn.autocompletes.Send(); err != nil {
log.Printf("autocomplete bulk send err: %s", err)
}
if err := conn.requests.Send(); err != nil {
log.Printf("requests bulk send err: %s", err)
}
if err := conn.customEvents.Send(); err != nil {
log.Printf("customEvents bulk send err: %s", err)
}
if err := conn.webPageEvents.Send(); err != nil {
log.Printf("webPageEvents bulk send err: %s", err)
}
if err := conn.webInputEvents.Send(); err != nil {
log.Printf("webInputEvents bulk send err: %s", err)
}
if err := conn.webGraphQLEvents.Send(); err != nil {
log.Printf("webGraphQLEvents bulk send err: %s", err)
}
}
func (conn *Conn) CommitBatches() {
conn.sendBulks()
for sessID, b := range conn.batches {
// Append session update sql request to the end of batch
if update, ok := conn.sessionUpdates[sessID]; ok {
sql, args := update.request()
if sql != "" {
conn.batchQueue(sessID, sql, args...)
b, _ = conn.batches[sessID]
}
}
// Record batch size in bytes and number of lines
conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessID]))
conn.batchSizeLines.Record(context.Background(), float64(b.Len()))
@ -109,7 +222,7 @@ func (conn *Conn) CommitBatches() {
isFailed := false
// Send batch to db and execute
br := conn.c.SendBatch(getTimeoutContext(), b)
br := conn.c.SendBatch(b)
l := b.Len()
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
@ -125,10 +238,25 @@ func (conn *Conn) CommitBatches() {
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "batch"), attribute.Bool("failed", isFailed))
if !isFailed {
delete(conn.sessionUpdates, sessID)
}
}
conn.batches = make(map[uint64]*pgx.Batch)
conn.batchSizes = make(map[uint64]int)
conn.rawBatches = make(map[uint64][]*batchItem)
// Session updates
for sessID, su := range conn.sessionUpdates {
sql, args := su.request()
if sql == "" {
continue
}
if err := conn.c.Exec(sql, args...); err != nil {
log.Printf("failed session update, sessID: %d, err: %s", sessID, err)
}
}
conn.sessionUpdates = make(map[uint64]*sessionUpdates)
}
func (conn *Conn) updateBatchSize(sessionID uint64, reqSize int) {
@ -145,6 +273,14 @@ func (conn *Conn) commitBatch(sessionID uint64) {
log.Printf("can't find batch for session: %d", sessionID)
return
}
// Append session update sql request to the end of batch
if update, ok := conn.sessionUpdates[sessionID]; ok {
sql, args := update.request()
if sql != "" {
conn.batchQueue(sessionID, sql, args...)
b, _ = conn.batches[sessionID]
}
}
// Record batch size in bytes and number of lines
conn.batchSizeBytes.Record(context.Background(), float64(conn.batchSizes[sessionID]))
conn.batchSizeLines.Record(context.Background(), float64(b.Len()))
@ -153,7 +289,7 @@ func (conn *Conn) commitBatch(sessionID uint64) {
isFailed := false
// Send batch to db and execute
br := conn.c.SendBatch(getTimeoutContext(), b)
br := conn.c.SendBatch(b)
l := b.Len()
for i := 0; i < l; i++ {
if ct, err := br.Exec(); err != nil {
@ -175,117 +311,5 @@ func (conn *Conn) commitBatch(sessionID uint64) {
delete(conn.batches, sessionID)
delete(conn.batchSizes, sessionID)
delete(conn.rawBatches, sessionID)
}
func (conn *Conn) query(sql string, args ...interface{}) (pgx.Rows, error) {
start := time.Now()
res, err := conn.c.Query(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return res, err
}
func (conn *Conn) queryRow(sql string, args ...interface{}) pgx.Row {
start := time.Now()
res := conn.c.QueryRow(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return res
}
func (conn *Conn) exec(sql string, args ...interface{}) error {
start := time.Now()
_, err := conn.c.Exec(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return err
}
type _Tx struct {
pgx.Tx
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
}
func (conn *Conn) begin() (_Tx, error) {
start := time.Now()
tx, err := conn.c.Begin(context.Background())
conn.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "begin"))
conn.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "begin"))
return _Tx{tx, conn.sqlRequestTime, conn.sqlRequestCounter}, err
}
func (tx _Tx) exec(sql string, args ...interface{}) error {
start := time.Now()
_, err := tx.Exec(context.Background(), sql, args...)
method, table := methodName(sql)
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return err
}
func (tx _Tx) rollback() error {
start := time.Now()
err := tx.Rollback(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "rollback"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "rollback"))
return err
}
func (tx _Tx) commit() error {
start := time.Now()
err := tx.Commit(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "commit"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "commit"))
return err
}
func methodName(sql string) (string, string) {
cmd, table := "unknown", "unknown"
// Prepare sql request for parsing
sql = strings.TrimSpace(sql)
sql = strings.ReplaceAll(sql, "\n", " ")
sql = strings.ReplaceAll(sql, "\t", "")
sql = strings.ToLower(sql)
// Get sql command name
parts := strings.Split(sql, " ")
if parts[0] == "" {
return cmd, table
} else {
cmd = strings.TrimSpace(parts[0])
}
// Get table name
switch cmd {
case "select":
for i, p := range parts {
if strings.TrimSpace(p) == "from" {
table = strings.TrimSpace(parts[i+1])
}
}
case "update":
table = strings.TrimSpace(parts[1])
case "insert":
table = strings.TrimSpace(parts[2])
}
return cmd, table
delete(conn.sessionUpdates, sessionID)
}

View file

@ -15,7 +15,7 @@ type Integration struct {
}
func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
rows, err := conn.query(`
rows, err := conn.c.Query(`
SELECT project_id, provider, options, request_data
FROM integrations
`)
@ -40,7 +40,7 @@ func (conn *Conn) IterateIntegrationsOrdered(iter func(integration *Integration,
}
func (conn *Conn) UpdateIntegrationRequestData(i *Integration) error {
return conn.exec(`
return conn.c.Exec(`
UPDATE integrations
SET request_data = $1
WHERE project_id=$2 AND provider=$3`,

View file

@ -18,31 +18,8 @@ func getAutocompleteType(baseType string, platform string) string {
}
func (conn *Conn) insertAutocompleteValue(sessionID uint64, tp string, value string) {
if len(value) == 0 {
return
}
sqlRequest := `
INSERT INTO autocomplete (
value,
type,
project_id
) (SELECT
$1, $2, project_id
FROM sessions
WHERE session_id = $3
) ON CONFLICT DO NOTHING`
if err := conn.exec(sqlRequest, value, tp, sessionID); err != nil {
log.Printf("can't insert autocomplete: %s", err)
}
//conn.batchQueue(sessionID, sqlRequest, value, tp, sessionID)
// Record approximate message size
//conn.updateBatchSize(sessionID, len(sqlRequest)+len(value)+len(tp)+8)
}
func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
return conn.exec(`
return conn.c.Exec(`
INSERT INTO sessions (
session_id, project_id, start_ts,
user_uuid, user_device, user_device_type, user_country,
@ -74,18 +51,18 @@ func (conn *Conn) InsertSessionStart(sessionID uint64, s *types.Session) error {
}
func (conn *Conn) HandleSessionStart(sessionID uint64, s *types.Session) error {
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USEROS", s.Platform), s.UserOS)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry)
conn.insertAutocompleteValue(sessionID, getAutocompleteType("REVID", s.Platform), s.RevID)
conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USEROS", s.Platform), s.UserOS)
conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERDEVICE", s.Platform), s.UserDevice)
conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("USERCOUNTRY", s.Platform), s.UserCountry)
conn.insertAutocompleteValue(sessionID, s.ProjectID, getAutocompleteType("REVID", s.Platform), s.RevID)
// s.Platform == "web"
conn.insertAutocompleteValue(sessionID, "USERBROWSER", s.UserBrowser)
conn.insertAutocompleteValue(sessionID, s.ProjectID, "USERBROWSER", s.UserBrowser)
return nil
}
func (conn *Conn) InsertSessionEnd(sessionID uint64, timestamp uint64) (uint64, error) {
var dur uint64
if err := conn.queryRow(`
if err := conn.c.QueryRow(`
UPDATE sessions SET duration=$2 - start_ts
WHERE session_id=$1
RETURNING duration
@ -119,30 +96,16 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error {
}
func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint64, url string, duration uint64, success bool) error {
sqlRequest := `
INSERT INTO events_common.requests (
session_id, timestamp, seq_index, url, duration, success
) VALUES (
$1, $2, $3, left($4, 2700), $5, $6
)`
conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), url, duration, success)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(url)+8*4)
if err := conn.requests.Append(sessionID, timestamp, getSqIdx(index), url, duration, success); err != nil {
return fmt.Errorf("insert request in bulk err: %s", err)
}
return nil
}
func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint64, name string, payload string) error {
sqlRequest := `
INSERT INTO events_common.customs (
session_id, timestamp, seq_index, name, payload
) VALUES (
$1, $2, $3, left($4, 2700), $5
)`
conn.batchQueue(sessionID, sqlRequest, sessionID, timestamp, getSqIdx(index), name, payload)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(name)+len(payload)+8*3)
if err := conn.customEvents.Append(sessionID, timestamp, getSqIdx(index), name, payload); err != nil {
return fmt.Errorf("insert custom event in bulk err: %s", err)
}
return nil
}
@ -172,15 +135,21 @@ func (conn *Conn) InsertMetadata(sessionID uint64, keyNo uint, value string) err
sqlRequest := `
UPDATE sessions SET metadata_%v = $1
WHERE session_id = $2`
return conn.exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
return conn.c.Exec(fmt.Sprintf(sqlRequest, keyNo), value, sessionID)
}
func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) error {
tx, err := conn.begin()
func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messages.IssueEvent) (err error) {
tx, err := conn.c.Begin()
if err != nil {
return err
}
defer tx.rollback()
defer func() {
if err != nil {
if rollbackErr := tx.rollback(); rollbackErr != nil {
log.Printf("rollback err: %s", rollbackErr)
}
}
}()
issueID := hashid.IssueID(projectID, e)
// TEMP. TODO: nullable & json message field type
@ -237,5 +206,6 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag
return err
}
}
return tx.commit()
err = tx.commit()
return
}

View file

@ -9,7 +9,7 @@ import (
func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEvent) error {
err := conn.InsertCustomEvent(sessionID, e.Timestamp, e.Index, e.Name, e.Payload)
if err == nil {
conn.insertAutocompleteValue(sessionID, "CUSTOM_IOS", e.Name)
conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name)
}
return err
}
@ -17,7 +17,7 @@ func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEv
func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) error {
err := conn.InsertUserID(sessionID, userID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, "USERID_IOS", userID.Value)
conn.insertAutocompleteValue(sessionID, 0, "USERID_IOS", userID.Value)
}
return err
}
@ -25,7 +25,7 @@ func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID)
func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *messages.IOSUserAnonymousID) error {
err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID_IOS", userAnonymousID.Value)
conn.insertAutocompleteValue(sessionID, 0, "USERANONYMOUSID_IOS", userAnonymousID.Value)
}
return err
}
@ -33,13 +33,13 @@ func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *me
func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkCall) error {
err := conn.InsertRequest(sessionID, e.Timestamp, e.Index, e.URL, e.Duration, e.Success)
if err == nil {
conn.insertAutocompleteValue(sessionID, "REQUEST_IOS", url.DiscardURLQuery(e.URL))
conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL))
}
return err
}
func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.IOSScreenEnter) error {
tx, err := conn.begin()
tx, err := conn.c.Begin()
if err != nil {
return err
}
@ -65,12 +65,12 @@ func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.I
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "VIEW_IOS", screenEnter.ViewName)
conn.insertAutocompleteValue(sessionID, 0, "VIEW_IOS", screenEnter.ViewName)
return nil
}
func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOSClickEvent) error {
tx, err := conn.begin()
tx, err := conn.c.Begin()
if err != nil {
return err
}
@ -96,12 +96,12 @@ func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOS
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "CLICK_IOS", clickEvent.Label)
conn.insertAutocompleteValue(sessionID, 0, "CLICK_IOS", clickEvent.Label)
return nil
}
func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOSInputEvent) error {
tx, err := conn.begin()
tx, err := conn.c.Begin()
if err != nil {
return err
}
@ -132,13 +132,13 @@ func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOS
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "INPUT_IOS", inputEvent.Label)
conn.insertAutocompleteValue(sessionID, 0, "INPUT_IOS", inputEvent.Label)
// conn.insertAutocompleteValue(sessionID, "INPUT_VALUE", inputEvent.Label)
return nil
}
func (conn *Conn) InsertIOSCrash(sessionID uint64, projectID uint32, crash *messages.IOSCrash) error {
tx, err := conn.begin()
tx, err := conn.c.Begin()
if err != nil {
return err
}

View file

@ -1,6 +1,7 @@
package postgres
import (
"log"
"math"
"openreplay/backend/pkg/hashid"
@ -13,104 +14,54 @@ func getSqIdx(messageID uint64) uint {
return uint(messageID % math.MaxInt32)
}
func (conn *Conn) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *CustomEvent) error {
err := conn.InsertCustomEvent(sessionID, e.Timestamp,
e.MessageID,
e.Name, e.Payload)
if err == nil {
conn.insertAutocompleteValue(sessionID, "CUSTOM", e.Name)
conn.insertAutocompleteValue(sessionID, projectID, "CUSTOM", e.Name)
}
return err
}
func (conn *Conn) InsertWebUserID(sessionID uint64, userID *UserID) error {
func (conn *Conn) InsertWebUserID(sessionID uint64, projectID uint32, userID *UserID) error {
err := conn.InsertUserID(sessionID, userID.ID)
if err == nil {
conn.insertAutocompleteValue(sessionID, "USERID", userID.ID)
conn.insertAutocompleteValue(sessionID, projectID, "USERID", userID.ID)
}
return err
}
func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error {
func (conn *Conn) InsertWebUserAnonymousID(sessionID uint64, projectID uint32, userAnonymousID *UserAnonymousID) error {
err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.ID)
if err == nil {
conn.insertAutocompleteValue(sessionID, "USERANONYMOUSID", userAnonymousID.ID)
conn.insertAutocompleteValue(sessionID, projectID, "USERANONYMOUSID", userAnonymousID.ID)
}
return err
}
// func (conn *Conn) InsertWebResourceEvent(sessionID uint64, e *ResourceEvent) error {
// if e.Type != "fetch" {
// return nil
// }
// err := conn.InsertRequest(sessionID, e.Timestamp,
// e.MessageID,
// e.URL, e.Duration, e.Success,
// )
// if err == nil {
// conn.insertAutocompleteValue(sessionID, "REQUEST", url.DiscardURLQuery(e.URL))
// }
// return err
// }
// TODO: fix column "dom_content_loaded_event_end" of relation "pages"
func (conn *Conn) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *PageEvent) error {
host, path, query, err := url.GetURLParts(e.URL)
if err != nil {
return err
}
tx, err := conn.begin()
if err != nil {
return err
// base_path is deprecated
if err = conn.webPageEvents.Append(sessionID, e.MessageID, e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer),
host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint,
e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil {
log.Printf("insert web page event in bulk err: %s", err)
}
defer tx.rollback()
// base_path is depricated
if err := tx.exec(`
INSERT INTO events.pages (
session_id, message_id, timestamp, referrer, base_referrer, host, path, query,
dom_content_loaded_time, load_time, response_end, first_paint_time, first_contentful_paint_time,
speed_index, visually_complete, time_to_interactive,
response_time, dom_building_time
) VALUES (
$1, $2, $3,
$4, $5,
$6, $7, $8,
NULLIF($9, 0), NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0),
NULLIF($14, 0), NULLIF($15, 0), NULLIF($16, 0),
NULLIF($17, 0), NULLIF($18, 0)
)
`,
sessionID, e.MessageID, e.Timestamp,
e.Referrer, url.DiscardURLQuery(e.Referrer),
host, path, query,
e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint,
e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive,
calcResponseTime(e), calcDomBuildingTime(e),
); err != nil {
return err
}
if err = tx.exec(`
UPDATE sessions SET (pages_count, events_count) = (pages_count + 1, events_count + 1)
WHERE session_id = $1`,
sessionID,
); err != nil {
return err
}
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "LOCATION", url.DiscardURLQuery(path))
conn.insertAutocompleteValue(sessionID, "REFERRER", url.DiscardURLQuery(e.Referrer))
// Accumulate session updates and exec inside batch with another sql commands
conn.updateSessionEvents(sessionID, 1, 1)
// Add new value set to autocomplete bulk
conn.insertAutocompleteValue(sessionID, projectID, "LOCATION", url.DiscardURLQuery(path))
conn.insertAutocompleteValue(sessionID, projectID, "REFERRER", url.DiscardURLQuery(e.Referrer))
return nil
}
func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
tx, err := conn.begin()
if err != nil {
return err
}
defer tx.rollback()
if err = tx.exec(`
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error {
sqlRequest := `
INSERT INTO events.clicks
(session_id, message_id, timestamp, label, selector, url)
(SELECT
@ -118,65 +69,40 @@ func (conn *Conn) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
FROM events.pages
WHERE session_id = $1 AND timestamp <= $3 ORDER BY timestamp DESC LIMIT 1
)
`,
sessionID, e.MessageID, e.Timestamp, e.Label, e.Selector,
); err != nil {
return err
}
if err = tx.exec(`
UPDATE sessions SET events_count = events_count + 1
WHERE session_id = $1`,
sessionID,
); err != nil {
return err
}
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "CLICK", e.Label)
`
conn.batchQueue(sessionID, sqlRequest, sessionID, e.MessageID, e.Timestamp, e.Label, e.Selector)
// Accumulate session updates and exec inside batch with another sql commands
conn.updateSessionEvents(sessionID, 1, 0)
// Add new value set to autocomplete bulk
conn.insertAutocompleteValue(sessionID, projectID, "CLICK", e.Label)
return nil
}
func (conn *Conn) InsertWebInputEvent(sessionID uint64, e *InputEvent) error {
tx, err := conn.begin()
if err != nil {
return err
}
defer tx.rollback()
func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *InputEvent) error {
value := &e.Value
if e.ValueMasked {
value = nil
}
if err = tx.exec(`
INSERT INTO events.inputs
(session_id, message_id, timestamp, value, label)
VALUES
($1, $2, $3, $4, NULLIF($5,''))
`,
sessionID, e.MessageID, e.Timestamp, value, e.Label,
); err != nil {
return err
if err := conn.webInputEvents.Append(sessionID, e.MessageID, e.Timestamp, value, e.Label); err != nil {
log.Printf("insert web input event err: %s", err)
}
if err = tx.exec(`
UPDATE sessions SET events_count = events_count + 1
WHERE session_id = $1`,
sessionID,
); err != nil {
return err
}
if err = tx.commit(); err != nil {
return err
}
conn.insertAutocompleteValue(sessionID, "INPUT", e.Label)
conn.updateSessionEvents(sessionID, 1, 0)
conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label)
return nil
}
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) error {
tx, err := conn.begin()
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *ErrorEvent) (err error) {
tx, err := conn.c.Begin()
if err != nil {
return err
}
defer tx.rollback()
defer func() {
if err != nil {
if rollbackErr := tx.rollback(); rollbackErr != nil {
log.Printf("rollback err: %s", rollbackErr)
}
}
}()
errorID := hashid.WebErrorID(projectID, e)
if err = tx.exec(`
@ -206,17 +132,18 @@ func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *Err
); err != nil {
return err
}
return tx.commit()
err = tx.commit()
return
}
func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *FetchEvent) error {
func (conn *Conn) InsertWebFetchEvent(sessionID uint64, projectID uint32, savePayload bool, e *FetchEvent) error {
var request, response *string
if savePayload {
request = &e.Request
response = &e.Response
}
host, path, query, err := url.GetURLParts(e.URL)
conn.insertAutocompleteValue(sessionID, "REQUEST", path)
conn.insertAutocompleteValue(sessionID, projectID, "REQUEST", path)
if err != nil {
return err
}
@ -246,29 +173,15 @@ func (conn *Conn) InsertWebFetchEvent(sessionID uint64, savePayload bool, e *Fet
return nil
}
func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, savePayload bool, e *GraphQLEvent) error {
func (conn *Conn) InsertWebGraphQLEvent(sessionID uint64, projectID uint32, savePayload bool, e *GraphQLEvent) error {
var request, response *string
if savePayload {
request = &e.Variables
response = &e.Response
}
conn.insertAutocompleteValue(sessionID, "GRAPHQL", e.OperationName)
sqlRequest := `
INSERT INTO events.graphql (
session_id, timestamp, message_id,
name,
request_body, response_body
) VALUES (
$1, $2, $3,
left($4, 2700),
$5, $6
) ON CONFLICT DO NOTHING`
conn.batchQueue(sessionID, sqlRequest, sessionID, e.Timestamp, e.MessageID,
e.OperationName, request, response,
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.OperationName)+len(e.Variables)+len(e.Response)+8*3)
if err := conn.webGraphQLEvents.Append(sessionID, e.Timestamp, e.MessageID, e.OperationName, request, response); err != nil {
log.Printf("insert web graphQL event err: %s", err)
}
conn.insertAutocompleteValue(sessionID, projectID, "GRAPHQL", e.OperationName)
return nil
}

View file

@ -0,0 +1,175 @@
package postgres
import (
"context"
"errors"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"strings"
"time"
)
// Pool is a pgx.Pool wrapper with metrics integration
type Pool interface {
Query(sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(sql string, args ...interface{}) pgx.Row
Exec(sql string, arguments ...interface{}) error
SendBatch(b *pgx.Batch) pgx.BatchResults
Begin() (*_Tx, error)
Close()
}
type poolImpl struct {
conn *pgxpool.Pool
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
}
func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) {
start := time.Now()
res, err := p.conn.Query(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return res, err
}
func (p *poolImpl) QueryRow(sql string, args ...interface{}) pgx.Row {
start := time.Now()
res := p.conn.QueryRow(getTimeoutContext(), sql, args...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return res
}
func (p *poolImpl) Exec(sql string, arguments ...interface{}) error {
start := time.Now()
_, err := p.conn.Exec(getTimeoutContext(), sql, arguments...)
method, table := methodName(sql)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return err
}
func (p *poolImpl) SendBatch(b *pgx.Batch) pgx.BatchResults {
start := time.Now()
res := p.conn.SendBatch(getTimeoutContext(), b)
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "sendBatch"))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "sendBatch"))
return res
}
func (p *poolImpl) Begin() (*_Tx, error) {
start := time.Now()
tx, err := p.conn.Begin(context.Background())
p.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "begin"))
p.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "begin"))
return &_Tx{tx, p.sqlRequestTime, p.sqlRequestCounter}, err
}
func (p *poolImpl) Close() {
p.conn.Close()
}
func NewPool(conn *pgxpool.Pool, sqlRequestTime syncfloat64.Histogram, sqlRequestCounter syncfloat64.Counter) (Pool, error) {
if conn == nil {
return nil, errors.New("conn is empty")
}
return &poolImpl{
conn: conn,
sqlRequestTime: sqlRequestTime,
sqlRequestCounter: sqlRequestCounter,
}, nil
}
// TX - start
type _Tx struct {
pgx.Tx
sqlRequestTime syncfloat64.Histogram
sqlRequestCounter syncfloat64.Counter
}
func (tx *_Tx) exec(sql string, args ...interface{}) error {
start := time.Now()
_, err := tx.Exec(context.Background(), sql, args...)
method, table := methodName(sql)
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", method), attribute.String("table", table))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", method), attribute.String("table", table))
return err
}
func (tx *_Tx) rollback() error {
start := time.Now()
err := tx.Rollback(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "rollback"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "rollback"))
return err
}
func (tx *_Tx) commit() error {
start := time.Now()
err := tx.Commit(context.Background())
tx.sqlRequestTime.Record(context.Background(), float64(time.Now().Sub(start).Milliseconds()),
attribute.String("method", "commit"))
tx.sqlRequestCounter.Add(context.Background(), 1,
attribute.String("method", "commit"))
return err
}
// TX - end
func getTimeoutContext() context.Context {
ctx, _ := context.WithTimeout(context.Background(), time.Second*30)
return ctx
}
func methodName(sql string) (string, string) {
cmd, table := "unknown", "unknown"
// Prepare sql request for parsing
sql = strings.TrimSpace(sql)
sql = strings.ReplaceAll(sql, "\n", " ")
sql = strings.ReplaceAll(sql, "\t", "")
sql = strings.ToLower(sql)
// Get sql command name
parts := strings.Split(sql, " ")
if parts[0] == "" {
return cmd, table
} else {
cmd = strings.TrimSpace(parts[0])
}
// Get table name
switch cmd {
case "select":
for i, p := range parts {
if strings.TrimSpace(p) == "from" {
table = strings.TrimSpace(parts[i+1])
}
}
case "update":
table = strings.TrimSpace(parts[1])
case "insert":
table = strings.TrimSpace(parts[2])
}
return cmd, table
}

View file

@ -6,7 +6,7 @@ import (
func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) {
p := &Project{ProjectKey: projectKey}
if err := conn.queryRow(`
if err := conn.c.QueryRow(`
SELECT max_session_duration, sample_rate, project_id
FROM projects
WHERE project_key=$1 AND active = true
@ -21,7 +21,7 @@ func (conn *Conn) GetProjectByKey(projectKey string) (*Project, error) {
// TODO: logical separation of metadata
func (conn *Conn) GetProject(projectID uint32) (*Project, error) {
p := &Project{ProjectID: projectID}
if err := conn.queryRow(`
if err := conn.c.QueryRow(`
SELECT project_key, max_session_duration, save_request_payloads,
metadata_1, metadata_2, metadata_3, metadata_4, metadata_5,
metadata_6, metadata_7, metadata_8, metadata_9, metadata_10

View file

@ -0,0 +1,30 @@
package postgres
// Mechanism of combination several session updates into one
const sessionUpdateReq = `UPDATE sessions SET (pages_count, events_count) = (pages_count + $1, events_count + $2) WHERE session_id = $3`
type sessionUpdates struct {
sessionID uint64
pages int
events int
}
func NewSessionUpdates(sessionID uint64) *sessionUpdates {
return &sessionUpdates{
sessionID: sessionID,
pages: 0,
events: 0,
}
}
func (su *sessionUpdates) add(pages, events int) {
su.pages += pages
su.events += events
}
func (su *sessionUpdates) request() (string, []interface{}) {
if su.pages == 0 && su.events == 0 {
return "", nil
}
return sessionUpdateReq, []interface{}{su.pages, su.events, su.sessionID}
}

View file

@ -1,14 +1,11 @@
package postgres
//import . "openreplay/backend/pkg/messages"
import . "openreplay/backend/pkg/db/types"
//import "log"
func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
s := &Session{SessionID: sessionID}
var revID, userOSVersion *string
if err := conn.queryRow(`
if err := conn.c.QueryRow(`
SELECT platform,
duration, project_id, start_ts,
user_uuid, user_os, user_os_version,
@ -39,69 +36,3 @@ func (conn *Conn) GetSession(sessionID uint64) (*Session, error) {
}
return s, nil
}
// func (conn *Conn) GetSessionClickEvents(sessionID uint64) (list []IOSClickEvent, err error) {
// rows, err := conn.query(`
// SELECT
// timestamp, seq_index, label
// FROM events_ios.clicks
// WHERE session_id=$1
// `, sessionID)
// if err != nil {
// return err
// }
// defer rows.Close()
// for rows.Next() {
// e := new(IOSClickEvent)
// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label); err != nil {
// log.Printf("Error while scanning click events: %v", err)
// } else {
// list = append(list, e)
// }
// }
// return list
// }
// func (conn *Conn) GetSessionInputEvents(sessionID uint64) (list []IOSInputEvent, err error) {
// rows, err := conn.query(`
// SELECT
// timestamp, seq_index, label, value
// FROM events_ios.inputs
// WHERE session_id=$1
// `, sessionID)
// if err != nil {
// return err
// }
// defer rows.Close()
// for rows.Next() {
// e := new(IOSInputEvent)
// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil {
// log.Printf("Error while scanning click events: %v", err)
// } else {
// list = append(list, e)
// }
// }
// return list
// }
// func (conn *Conn) GetSessionCrashEvents(sessionID uint64) (list []IOSCrash, err error) {
// rows, err := conn.query(`
// SELECT
// timestamp, seq_index
// FROM events_ios.crashes
// WHERE session_id=$1
// `, sessionID)
// if err != nil {
// return err
// }
// defer rows.Close()
// for rows.Next() {
// e := new(IOSCrash)
// if err = rows.Scan(&e.Timestamp, &e.Index, &e.Label, &e.Value); err != nil {
// log.Printf("Error while scanning click events: %v", err)
// } else {
// list = append(list, e)
// }
// }
// return list
// }

View file

@ -16,7 +16,7 @@ type UnstartedSession struct {
}
func (conn *Conn) InsertUnstartedSession(s UnstartedSession) error {
return conn.exec(`
return conn.c.Exec(`
INSERT INTO unstarted_sessions (
project_id,
tracker_version, do_not_track,