openreplay/backend/pkg/db/postgres/bulk.go

package postgres

import (
	"bytes"
	"errors"
	"fmt"
	"time"

	"openreplay/backend/pkg/db/postgres/pool"
	"openreplay/backend/pkg/metrics/database"
)

// SQL fragments used to assemble one multi-row
// INSERT INTO <table><columns> VALUES (...),(...) ON CONFLICT DO NOTHING; statement.
const (
	insertPrefix = `INSERT INTO `
	insertValues = ` VALUES `
	insertSuffix = ` ON CONFLICT DO NOTHING;`
)

// Bulk accumulates rows for a single table and flushes them to PostgreSQL
// as one multi-row INSERT once the batch is full or Send is called.
type Bulk interface {
	Append(args ...interface{}) error
	Send() error
	Table() string
}

type bulkImpl struct {
	conn      pool.Pool
	table     string        // target table name
	columns   string        // column list as it appears in the INSERT statement
	template  string        // fmt template for one row of placeholders, e.g. `($%d, $%d)` for two columns
	setSize   int           // number of values per row
	sizeLimit int           // number of rows to buffer before an automatic flush
	values    []interface{} // flat buffer of pending values
}

// Append adds one row (setSize values) to the buffer and flushes
// automatically once sizeLimit rows have accumulated.
func (b *bulkImpl) Append(args ...interface{}) error {
	if len(args) != b.setSize {
		return fmt.Errorf("wrong number of arguments, expected: %d, got: %d", b.setSize, len(args))
	}
	b.values = append(b.values, args...)
	if len(b.values)/b.setSize >= b.sizeLimit {
		return b.send()
	}
	return nil
}

// Send flushes any buffered rows immediately; it is a no-op when the buffer is empty.
func (b *bulkImpl) Send() error {
	if len(b.values) == 0 {
		return nil
	}
	return b.send()
}

// Table returns the name of the target table.
func (b *bulkImpl) Table() string {
	return b.table
}

// send builds a single multi-row INSERT statement with numbered placeholders
// for all buffered rows and executes it through the connection pool.
func (b *bulkImpl) send() error {
	start := time.Now()
	size := len(b.values) / b.setSize
	request := bytes.NewBufferString(insertPrefix + b.table + b.columns + insertValues)
	args := make([]interface{}, b.setSize)
	for i := 0; i < size; i++ {
		// Fill the row template with sequential placeholder indexes ($1, $2, ...).
		for j := 0; j < b.setSize; j++ {
			args[j] = i*b.setSize + j + 1
		}
		if i > 0 {
			request.WriteByte(',')
		}
		request.WriteString(fmt.Sprintf(b.template, args...))
	}
	request.WriteString(insertSuffix)
	err := b.conn.Exec(request.String(), b.values...)
	// The buffer is reset before the error check, so a failed batch is dropped rather than retried.
	b.values = make([]interface{}, 0, b.setSize*b.sizeLimit)
	if err != nil {
		return fmt.Errorf("send bulk err: %w", err)
	}
	// Save bulk metrics
	database.RecordBulkElements(float64(size), "pg", b.table)
	database.RecordBulkInsertDuration(float64(time.Since(start).Milliseconds()), "pg", b.table)
	return nil
}
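
// For illustration (the table and columns here are hypothetical, assuming the
// columns string carries its own parentheses): with table "events", columns
// "(session_id, name)", template "($%d, $%d)", setSize 2, and two buffered
// rows, send produces:
//
//	INSERT INTO events(session_id, name) VALUES ($1, $2),($3, $4) ON CONFLICT DO NOTHING;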

// NewBulk validates its arguments and returns a Bulk writer for the given
// table, column list, and per-row placeholder template.
func NewBulk(conn pool.Pool, table, columns, template string, setSize, sizeLimit int) (Bulk, error) {
	switch {
	case conn == nil:
		return nil, errors.New("db conn is empty")
	case table == "":
		return nil, errors.New("table is empty")
	case columns == "":
		return nil, errors.New("columns is empty")
	case template == "":
		return nil, errors.New("template is empty")
	case setSize <= 0:
		return nil, errors.New("set size must be positive")
	case sizeLimit <= 0:
		return nil, errors.New("size limit must be positive")
	}
	return &bulkImpl{
		conn:      conn,
		table:     table,
		columns:   columns,
		template:  template,
		setSize:   setSize,
		sizeLimit: sizeLimit,
		values:    make([]interface{}, 0, setSize*sizeLimit),
	}, nil
}
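
// Usage sketch (illustrative only: the table, columns, and values are
// hypothetical, and conn is assumed to be an initialized pool.Pool):
//
//	bulk, err := NewBulk(conn,
//		"events",             // target table
//		"(session_id, name)", // column list, parentheses included
//		"($%d, $%d)",         // one row of placeholders; %d expands to $1, $2, ...
//		2,                    // setSize: values per row
//		200,                  // sizeLimit: rows buffered before an automatic flush
//	)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := bulk.Append(uint64(42), "click"); err != nil {
//		log.Println(err)
//	}
//	// Flush whatever remains, e.g. on shutdown.
//	if err := bulk.Send(); err != nil {
//		log.Println(err)
//	}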