openreplay/backend/internal/integrations/integration/client.go
Alexander 4b8f3bee25
Sessions refactoring (#1371)
* feat(backend): moved sql requests related to sessions table to one place

* feat(backend): refactoring in db.Saver handler

* feat(backend): huge refactoring in db/postgres module

* fix(backend): workable feature flags

* fix(backend): workable integrations

* fix(backend): workable sessions and projects modules

* fix(backend): added missed projects module to sessions

* feat(backend): renaming

* feat(backend): moved session struct to sessions module and split methods into interface, cache and storage levels

* feat(backend): moved project struct to projects module

* feat(backend): added projects model

* feat(backend): implemented new in memory cache for sessions and projects

* feat(backend): implemented new cache in projects

* feat(backend): there are 2 methods in cache module now: Get() and GetAndRefresh()

* feat(backend): added cache update operations

* fix(backend): fixed import cycle

* fix(backend): fixed panic in db message handler

* fix(backend): fixed panic in projects module

* fix(backend): fixed panic in sessions.GetDuration

* feat(backend): added direct call to get session duration if session is already in cache

* feat(backend): used pg pool everywhere except db service

* fix(backend): added missing part after rebase

* fix(backend): removed old sessions file

* feat(backend): added refactored redis client with produce/consume options

* feat(backend): added cache layer for projects

* fix(backend): added missing redis config

* fix(backend): added missing method for producer

* feat(backend): cache integration for sessions

* feat(backend): temporary method to get session directly from db

* feat(backend): adapt EE version of message handler

* fix(backend): fixed issue in fts realisation

* fix(backend): added redis cache to sessions module

* fix(backend): set 0 duration or hesitation time for inputs without focus event

* feat(backend): added cache for session updates and failover mechanism for batch.Insert() operation

* feat(backend): debug log

* feat(backend): more debug log

* feat(backend): removed debug log

* fix(backend): fixed an issue of tracking input events with empty label

* fix(backend): disabled debug log in projects cache

* fix(backend): renamed session updater

* fix(backend): fixed closed pool issue in DB service

* fix(backend): fixed dead lock in db Stop() method

* fix(backend): fixed panic in heuristics service

* feat(backend): enabled redis cache in projects

* feat(backend): clear cache on each update operation

* feat(backend): fully integrated cache layer with auto switch

* feat(backend): small refactoring in session updates

* fix(backend): fixed wrong events counter issue

* feat(backend): enabled full cache support in ender and http services

* fix(backend/ee): added missed import

* feat(backend): added second cache layer for db to speed up the service

* feat(backend): disable redis cache

* feat(backend): moved redis cache to ee
2023-07-06 10:55:43 +02:00

153 lines
3.6 KiB
Go

package integration
import (
"encoding/json"
"fmt"
"log"
"openreplay/backend/pkg/integrations"
"sync"
"time"
"openreplay/backend/pkg/messages"
)
// Retry policy applied by client.Request.
const (
	// MAX_ATTEMPTS_IN_A_ROW is the number of consecutive failed requests after
	// which further attempts are only made once ATTEMPTS_INTERVAL has elapsed
	// since the previous attempt.
	MAX_ATTEMPTS_IN_A_ROW = 4

	// MAX_ATTEMPTS is the number of consecutive failed requests after which
	// client.Request stops calling the provider.
	MAX_ATTEMPTS = 40

	// ATTEMPTS_INTERVAL is the back-off window in milliseconds (3 hours); it is
	// compared against differences of time.Now().UnixMilli() values.
	ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000
)
// requester is the provider-specific part of a client. client.Request calls
// Request with the owning client as its argument and treats a non-nil error
// as a failed attempt.
type requester interface {
	Request(c *client) error
}
// requestData is the per-integration polling state. NewClient decodes it from
// Integration.RequestData with encoding/json and client.Request encodes it
// back, so the exported field names below are the JSON keys.
type requestData struct {
	// Timestamp of the newest message seen so far. NewClient defaults it to
	// "24 hours ago" expressed via UnixMilli when it is zero, so the unit is
	// presumably Unix milliseconds — confirm against the providers.
	// A custom tag was drafted but is disabled: `json:"lastMessageTimestamp, string"`
	LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"`
	// Identifier of the last message, as stored by setLastMessageId.
	LastMessageId string
	// Number of consecutive failed requests; incremented on error and reset
	// to 0 on success by client.Request.
	UnsuccessfullAttemptsCount int
	// time.Now().UnixMilli() of the most recent attempt made by client.Request.
	LastAttemptTimestamp int64
}
// client couples one integration record with its provider-specific requester
// and the persisted polling state. requestData and requester are embedded;
// client declares its own Request method, which calls requester.Request.
type client struct {
	requestData
	requester
	// integration is the record last accepted by Update.
	integration *integrations.Integration
	// TODO: timeout ?
	// mux is held for the full duration of Update and Request.
	mux sync.Mutex
	// updateChan receives a copy of the integration (with refreshed
	// RequestData) at the end of each attempt made by Request.
	updateChan chan<- integrations.Integration
	// evChan is stored by NewClient but not written to in this file;
	// presumably the requester implementations send events on it — verify.
	evChan chan<- *SessionErrorEvent
	// errChan receives the errors reported through handleError.
	errChan chan<- error
}
// SessionErrorEvent is the element type of the client's evChan. It embeds
// *messages.IntegrationEvent, so that type's fields and methods are promoted.
type SessionErrorEvent struct {
	// NOTE(review): presumably the id of the session the event is attached
	// to — confirm with the producers/consumers of evChan.
	SessionID uint64
	// NOTE(review): meaning is not established in this file; presumably an
	// alternative way to identify the session — confirm.
	Token string
	*messages.IntegrationEvent
}
// ClientMap holds clients under a string key.
// NOTE(review): what the key represents is decided by callers outside this
// file — confirm before relying on it.
type ClientMap map[string]*client
// NewClient creates a client for integration i.
//
// It selects and configures the provider through Update, restores the polling
// state from i.RequestData (which must be valid JSON) and stores the three
// output channels. When the restored state has no last-message timestamp, the
// timestamp is initialised to 24 hours before now, in Unix milliseconds.
//
// A nil client and the underlying error are returned if either the provider
// options or the request data cannot be decoded.
func NewClient(i *integrations.Integration, updateChan chan<- integrations.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
	cl := &client{}
	if err := cl.Update(i); err != nil {
		return nil, err
	}

	err := json.Unmarshal(i.RequestData, &cl.requestData)
	if err != nil {
		return nil, err
	}

	cl.updateChan, cl.evChan, cl.errChan = updateChan, evChan, errChan

	// TODO: RequestData manager
	if cl.requestData.LastMessageTimestamp == 0 {
		// No stored position yet: start one day back.
		dayAgo := time.Now().Add(-24 * time.Hour)
		cl.requestData.LastMessageTimestamp = uint64(dayAgo.UnixMilli())
	}
	return cl, nil
}
// Update replaces the client's integration record and rebuilds its requester
// from i.Provider and i.Options. It is the entry point used from outside the
// package and takes c.mux, so it is safe to call concurrently with Request.
//
// An error is returned, and the client is left unchanged, when i is nil, when
// i.Provider is not one of the supported providers, or when i.Options cannot
// be decoded into the provider's configuration struct.
func (c *client) Update(i *integrations.Integration) error {
	if i == nil {
		return fmt.Errorf("integration is nil")
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	var r requester
	switch i.Provider {
	case "bugsnag":
		r = new(bugsnag)
	case "cloudwatch":
		r = new(cloudwatch)
	case "datadog":
		r = new(datadog)
	case "elasticsearch":
		r = new(elasticsearch)
	case "newrelic":
		r = new(newrelic)
	case "rollbar":
		r = new(rollbar)
	case "sentry":
		r = new(sentry)
	case "stackdriver":
		r = new(stackdriver)
	case "sumologic":
		r = new(sumologic)
	default:
		// Without this branch r stays nil and json.Unmarshal reports the
		// opaque "json: Unmarshal(nil)" instead of naming the real problem.
		return fmt.Errorf("unknown integration provider: %q", i.Provider)
	}

	if err := json.Unmarshal(i.Options, r); err != nil {
		return fmt.Errorf("parsing %s integration options: %w", i.Provider, err)
	}

	// Commit only after the options decoded successfully.
	c.integration = i
	c.requester = r
	return nil
}
// Package-scope helpers. They do no locking of their own; Request holds c.mux
// while it invokes the requester, so calls made from inside a requester run
// under that lock.

// setLastMessageTimestamp moves the stored last-message timestamp forward to
// timestamp. Values that are not strictly greater than the stored one are
// ignored, so the timestamp never goes backwards through this method.
func (c *client) setLastMessageTimestamp(timestamp uint64) {
	if timestamp <= c.requestData.LastMessageTimestamp {
		return
	}
	c.requestData.LastMessageTimestamp = timestamp
}
// getLastMessageTimestamp returns the stored last-message timestamp.
func (c *client) getLastMessageTimestamp() uint64 {
	state := &c.requestData
	return state.LastMessageTimestamp
}
// setLastMessageId stores id as the last message id together with its
// timestamp. Both values are overwritten unconditionally.
//
// NOTE: a `timestamp >= LastMessageTimestamp` guard used to wrap these
// assignments and is currently disabled, so unlike setLastMessageTimestamp
// this method can move the timestamp backwards.
func (c *client) setLastMessageId(timestamp uint64, id string) {
	state := &c.requestData
	state.LastMessageTimestamp = timestamp
	state.LastMessageId = id
}
// getLastMessageId returns the stored id of the last message.
func (c *client) getLastMessageId() string {
	state := &c.requestData
	return state.LastMessageId
}
// handleError reports err on errChan, annotated with a printout of the
// client's current integration record. The channel send is performed
// directly, so the call does not return until errChan accepts the value.
func (c *client) handleError(err error) {
	annotated := fmt.Errorf("%v | Integration: %v", err, *c.integration)
	c.errChan <- annotated
}
// Request performs one polling attempt against the provider and publishes the
// resulting state. Thread-safe: c.mux is held for the whole call, including
// the channel sends, so a send that blocks keeps the lock held.
//
// Back-off: nothing is done once MAX_ATTEMPTS consecutive failures have been
// recorded, or when at least MAX_ATTEMPTS_IN_A_ROW consecutive failures have
// been recorded and less than ATTEMPTS_INTERVAL milliseconds have passed since
// the previous attempt.
//
// Otherwise the requester is invoked. A failure is logged, reported on errChan
// and counted; a success resets the failure counter. The updated requestData
// is then marshalled into the integration's RequestData and a copy of the
// integration is sent on updateChan. If marshalling fails, the error is
// reported on errChan and no update is sent, so the previously stored
// RequestData is not replaced by an empty value.
func (c *client) Request() {
	c.mux.Lock()
	defer c.mux.Unlock()

	now := time.Now().UnixMilli()
	failures := c.requestData.UnsuccessfullAttemptsCount
	if failures >= MAX_ATTEMPTS {
		return
	}
	if failures >= MAX_ATTEMPTS_IN_A_ROW && now-c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL {
		return
	}
	c.requestData.LastAttemptTimestamp = now

	if err := c.requester.Request(c); err != nil {
		log.Printf("integration request failed: %v", err)
		c.handleError(err)
		c.requestData.UnsuccessfullAttemptsCount++
	} else {
		c.requestData.UnsuccessfullAttemptsCount = 0
	}

	rd, err := json.Marshal(c.requestData)
	if err != nil {
		c.handleError(err)
		// Do not publish an update carrying a nil RequestData.
		return
	}
	// RequestData is a byte slice (pointer-like), but it is replaced with a
	// fresh slice rather than mutated, so integration copies already sent on
	// updateChan are not affected.
	c.integration.RequestData = rd
	c.updateChan <- *c.integration
}