[Integrations] small refactoring (#1752)

* feat(backend): small refactoring in integrations

* feat(backend): ignore context timeout error
This commit is contained in:
Alexander 2023-12-14 11:28:05 +01:00 committed by GitHub
parent ef95bdeca5
commit f8a8dfa459
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 467 additions and 396 deletions

View file

@ -1,17 +1,16 @@
package main
import (
"context"
"log"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/integrations"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v4"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/integrations/clientManager"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/integrations"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/queue"
@ -26,89 +25,38 @@ func main() {
cfg := config.New()
// Init postgres connection
pgConn, err := pool.New(cfg.Postgres.String())
pgConn, err := pgx.Connect(context.Background(), cfg.Postgres.String())
if err != nil {
log.Printf("can't init postgres connection: %s", err)
return
log.Fatalf("can't init postgres connection: %s", err)
}
defer pgConn.Close()
tokenizer := token.NewTokenizer(cfg.TokenSecret)
manager := clientManager.NewManager()
defer pgConn.Close(context.Background())
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
// TODO: rework with integration manager
listener, err := integrations.New(pgConn, cfg.Postgres.String())
storage := integrations.NewStorage(pgConn)
if err := storage.Listen(); err != nil {
log.Fatalf("Listener error: %v", err)
}
listener, err := integrations.New(cfg, storage, producer, integrations.NewManager(), token.NewTokenizer(cfg.TokenSecret))
if err != nil {
log.Printf("Postgres listener error: %v\n", err)
log.Fatalf("Postgres listener error")
log.Fatalf("Listener error: %v", err)
}
defer listener.Close()
listener.IterateIntegrationsOrdered(func(i *integrations.Integration, err error) {
if err != nil {
log.Printf("Postgres error: %v\n", err)
return
}
log.Printf("Integration initialization: %v\n", *i)
err = manager.Update(i)
if err != nil {
log.Printf("Integration parse error: %v | Integration: %v\n", err, *i)
return
}
})
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.INTEGRATIONS_REQUEST_INTERVAL * time.Millisecond)
log.Printf("Integration service started\n")
manager.RequestAll()
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
listener.Close()
pgConn.Close()
os.Exit(0)
case <-tick:
log.Printf("Requesting all...\n")
manager.RequestAll()
case event := <-manager.Events:
log.Printf("New integration event: %+v\n", *event.IntegrationEvent)
sessionID := event.SessionID
if sessionID == 0 {
sessData, err := tokenizer.Parse(event.Token)
if err != nil && err != token.EXPIRED {
log.Printf("Error on token parsing: %v; Token: %v", err, event.Token)
continue
}
sessionID = sessData.ID
}
producer.Produce(cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode())
case err := <-manager.Errors:
log.Printf("Integration error: %v\n", err)
case i := <-manager.RequestDataUpdates:
// log.Printf("Last request integration update: %v || %v\n", i, string(i.RequestData))
if err := listener.UpdateIntegrationRequestData(&i); err != nil {
log.Printf("Postgres Update request_data error: %v\n", err)
}
case err := <-listener.Errors:
log.Printf("Postgres listen error: %v\n", err)
listener.Close()
pgConn.Close()
log.Printf("Listener error: %v", err)
os.Exit(0)
case iPointer := <-listener.Integrations:
log.Printf("Integration update: %v\n", *iPointer)
err := manager.Update(iPointer)
if err != nil {
log.Printf("Integration parse error: %v | Integration: %v\n", err, *iPointer)
}
}
}
}

View file

@ -1,48 +0,0 @@
package clientManager
import (
"openreplay/backend/internal/integrations/integration"
"openreplay/backend/pkg/integrations"
"strconv"
)
type manager struct {
clientMap integration.ClientMap
Events chan *integration.SessionErrorEvent
Errors chan error
RequestDataUpdates chan integrations.Integration // not pointer because it could change in other thread
}
func NewManager() *manager {
return &manager{
clientMap: make(integration.ClientMap),
RequestDataUpdates: make(chan integrations.Integration, 100),
Events: make(chan *integration.SessionErrorEvent, 100),
Errors: make(chan error, 100),
}
}
func (m *manager) Update(i *integrations.Integration) error {
key := strconv.Itoa(int(i.ProjectID)) + i.Provider
if i.Options == nil {
delete(m.clientMap, key)
return nil
}
c, exists := m.clientMap[key]
if !exists {
c, err := integration.NewClient(i, m.RequestDataUpdates, m.Events, m.Errors)
if err != nil {
return err
}
m.clientMap[key] = c
return nil
}
return c.Update(i)
}
func (m *manager) RequestAll() {
for _, c := range m.clientMap {
go c.Request()
}
}

View file

@ -1,153 +0,0 @@
package integration
import (
"encoding/json"
"fmt"
"log"
"openreplay/backend/pkg/integrations"
"sync"
"time"
"openreplay/backend/pkg/messages"
)
const MAX_ATTEMPTS_IN_A_ROW = 4
const MAX_ATTEMPTS = 40
const ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000
type requester interface {
Request(*client) error
}
type requestData struct {
LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"`
LastMessageId string
UnsuccessfullAttemptsCount int
LastAttemptTimestamp int64
}
type client struct {
requestData
requester
integration *integrations.Integration
// TODO: timeout ?
mux sync.Mutex
updateChan chan<- integrations.Integration
evChan chan<- *SessionErrorEvent
errChan chan<- error
}
type SessionErrorEvent struct {
SessionID uint64
Token string
*messages.IntegrationEvent
}
type ClientMap map[string]*client
func NewClient(i *integrations.Integration, updateChan chan<- integrations.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
c := new(client)
if err := c.Update(i); err != nil {
return nil, err
}
if err := json.Unmarshal(i.RequestData, &c.requestData); err != nil {
return nil, err
}
c.evChan = evChan
c.errChan = errChan
c.updateChan = updateChan
// TODO: RequestData manager
if c.requestData.LastMessageTimestamp == 0 {
// ?
c.requestData.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli())
}
return c, nil
}
// from outside
func (c *client) Update(i *integrations.Integration) error {
c.mux.Lock()
defer c.mux.Unlock()
var r requester
switch i.Provider {
case "bugsnag":
r = new(bugsnag)
case "cloudwatch":
r = new(cloudwatch)
case "datadog":
r = new(datadog)
case "elasticsearch":
r = new(elasticsearch)
case "newrelic":
r = new(newrelic)
case "rollbar":
r = new(rollbar)
case "sentry":
r = new(sentry)
case "stackdriver":
r = new(stackdriver)
case "sumologic":
r = new(sumologic)
}
if err := json.Unmarshal(i.Options, r); err != nil {
return err
}
c.integration = i
c.requester = r
return nil
}
// package scope
func (c *client) setLastMessageTimestamp(timestamp uint64) {
if timestamp > c.requestData.LastMessageTimestamp {
c.requestData.LastMessageTimestamp = timestamp
}
}
func (c *client) getLastMessageTimestamp() uint64 {
return c.requestData.LastMessageTimestamp
}
func (c *client) setLastMessageId(timestamp uint64, id string) {
//if timestamp >= c.requestData.LastMessageTimestamp {
c.requestData.LastMessageId = id
c.requestData.LastMessageTimestamp = timestamp
//}
}
func (c *client) getLastMessageId() string {
return c.requestData.LastMessageId
}
func (c *client) handleError(err error) {
c.errChan <- fmt.Errorf("%v | Integration: %v", err, *c.integration)
}
// Thread-safe
func (c *client) Request() {
c.mux.Lock()
defer c.mux.Unlock()
if c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS ||
(c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
time.Now().UnixMilli()-c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
return
}
c.requestData.LastAttemptTimestamp = time.Now().UnixMilli()
err := c.requester.Request(c)
if err != nil {
log.Println("ERRROR L139")
log.Println(err)
c.handleError(err)
c.requestData.UnsuccessfullAttemptsCount++
} else {
c.requestData.UnsuccessfullAttemptsCount = 0
}
rd, err := json.Marshal(c.requestData)
if err != nil {
c.handleError(err)
}
// RequestData is a byte array (pointer-like type), but it's replacement
// won't affect the previous value sent by channel
c.integration.RequestData = rd
c.updateChan <- *c.integration
}

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"encoding/json"
@ -34,8 +34,9 @@ type bugsnagEvent struct {
}
}
// need result chan and lastMessageTs
func (b *bugsnag) Request(c *client) error {
sinceTs := c.getLastMessageTimestamp() + 1000 // From next second
sinceTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
requestURL := fmt.Sprintf("https://api.bugsnag.com/projects/%v/events", b.BugsnagProjectId)
req, err := http.NewRequest("GET", requestURL, nil)
@ -93,7 +94,7 @@ func (b *bugsnag) Request(c *client) error {
continue
}
timestamp := uint64(parsedTime.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
SessionID: sessionID,
Token: token,

View file

@ -0,0 +1,110 @@
package clients
import (
"encoding/json"
"fmt"
"openreplay/backend/pkg/integrations/model"
"openreplay/backend/pkg/messages"
"sync"
)
type requester interface {
Request(*client) error
}
type client struct {
requester
requestData *model.RequestInfo
integration *model.Integration
mux sync.Mutex
updateChan chan<- model.Integration
evChan chan<- *SessionErrorEvent
errChan chan<- error
}
type SessionErrorEvent struct {
SessionID uint64
Token string
*messages.IntegrationEvent
}
type ClientMap map[string]*client
func NewClient(i *model.Integration, updateChan chan<- model.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
ri, err := i.GetRequestInfo()
if err != nil {
return nil, err
}
c := &client{
evChan: evChan,
errChan: errChan,
updateChan: updateChan,
requestData: ri,
}
return c, nil
}
func (c *client) Update(i *model.Integration) error {
var r requester
switch i.Provider {
case "bugsnag":
r = new(bugsnag)
case "cloudwatch":
r = new(cloudwatch)
case "datadog":
r = new(datadog)
case "elasticsearch":
r = new(elasticsearch)
case "newrelic":
r = new(newrelic)
case "rollbar":
r = new(rollbar)
case "sentry":
r = new(sentry)
case "stackdriver":
r = new(stackdriver)
case "sumologic":
r = new(sumologic)
}
if err := json.Unmarshal(i.Options, r); err != nil {
return err
}
c.mux.Lock()
defer c.mux.Unlock()
c.integration = i
c.requester = r
return nil
}
func (c *client) handleError(err error) {
c.errChan <- fmt.Errorf("%v | Integration: %v", err, *c.integration)
}
func (c *client) Request() {
c.mux.Lock()
defer c.mux.Unlock()
if !c.requestData.CanAttempt() {
return
}
c.requestData.UpdateLastAttempt()
err := c.requester.Request(c)
if err != nil {
c.requestData.Inc()
c.handleError(fmt.Errorf("ERRROR L139, err: %s", err))
} else {
c.requestData.Reset()
}
rd, err := c.requestData.Encode()
if err != nil {
c.handleError(err)
}
// RequestData is a byte array (pointer-like type), but it's replacement
// won't affect the previous value sent by channel
c.integration.RequestData = rd
c.updateChan <- *c.integration
}

View file

@ -1,11 +1,10 @@
package integration
package clients
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"openreplay/backend/pkg/messages"
"regexp"
"strings"
@ -21,7 +20,7 @@ type cloudwatch struct {
}
func (cw *cloudwatch) Request(c *client) error {
startTs := int64(c.getLastMessageTimestamp() + 1) // From next millisecond
startTs := int64(c.requestData.GetLastMessageTimestamp() + 1) // From next millisecond
//endTs := utils.CurrentTimestamp()
sess, err := session.NewSession(aws.NewConfig().
WithRegion(cw.Region).
@ -40,7 +39,7 @@ func (cw *cloudwatch) Request(c *client) error {
// SetLimit(10000). // Default 10000
SetLogGroupName(cw.LogGroupName).
SetFilterPattern("openReplaySessionToken")
//SetFilterPattern("asayer_session_id")
//SetFilterPattern("asayer_session_id")
for {
output, err := svc.FilterLogEvents(filterOptions)
@ -64,7 +63,7 @@ func (cw *cloudwatch) Request(c *client) error {
name = *e.LogStreamName
}
timestamp := uint64(*e.Timestamp)
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"bytes"
@ -71,7 +71,7 @@ func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*h
}
func (d *datadog) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond
toTs := uint64(time.Now().UnixMilli())
var nextLogId *string
for {
@ -111,7 +111,7 @@ func (d *datadog) Request(c *client) error {
continue
}
timestamp := uint64(parsedTime.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"bytes"
@ -34,8 +34,6 @@ func (es *elasticsearch) Request(c *client) error {
Addresses: []string{
address,
},
//Username: es.ApiKeyId,
//Password: es.ApiKey,
APIKey: apiKey,
}
esC, err := elasticlib.NewClient(cfg)
@ -46,7 +44,7 @@ func (es *elasticsearch) Request(c *client) error {
return err
}
gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ?
gteTs := c.requestData.GetLastMessageTimestamp() + 1000
log.Printf("gteTs: %v ", gteTs)
var buf bytes.Buffer
query := map[string]interface{}{
@ -164,7 +162,7 @@ func (es *elasticsearch) Request(c *client) error {
continue
}
timestamp := uint64(esLog.Time.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
var sessionID uint64
sessionID, err = strconv.ParseUint(token, 10, 64)

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"encoding/json"
@ -27,18 +27,16 @@ type newrelicResponce struct {
Results []struct {
Events []json.RawMessage
}
// Metadata
}
type newrelicEvent struct {
//AsayerSessionID uint64 `json:"asayer_session_id,string"` // string/int decoder?
OpenReplaySessionToken string `json:"openReplaySessionToken"`
ErrorClass string `json:"error.class"`
Timestamp uint64 `json:"timestamp"`
}
func (nr *newrelic) Request(c *client) error {
sinceTs := c.getLastMessageTimestamp() + 1000 // From next second
sinceTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
// In docs - format "yyyy-mm-dd HH:MM:ss", but time.RFC3339 works fine too
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
// US/EU endpoint ??
@ -86,7 +84,7 @@ func (nr *newrelic) Request(c *client) error {
continue
}
c.setLastMessageTimestamp(e.Timestamp)
c.requestData.SetLastMessageTimestamp(e.Timestamp)
c.evChan <- &SessionErrorEvent{
Token: e.OpenReplaySessionToken,
IntegrationEvent: &messages.IntegrationEvent{

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"encoding/json"
@ -60,12 +60,12 @@ type rollbarJobStatusResponce struct {
type rollbarEvent map[string]string
/*
It is possible to use /api/1/instances (20 per page)
Jobs for the identical requests are hashed
It is possible to use /api/1/instances (20 per page)
Jobs for the identical requests are hashed
*/
func (rb *rollbar) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1000 // From next second
c.setLastMessageTimestamp(fromTs) // anti-job-hashing
fromTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
c.requestData.SetLastMessageTimestamp(fromTs) // anti-job-hashing
fromTsSec := fromTs / 1e3
query := fmt.Sprintf(RB_QUERY, fromTsSec)
jsonBody := fmt.Sprintf(`{
@ -153,7 +153,7 @@ func (rb *rollbar) Request(c *client) error {
continue
}
timestamp := timestampSec * 1000
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
Token: e["body.message.openReplaySessionToken"],
IntegrationEvent: &messages.IntegrationEvent{

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"encoding/json"
@ -46,7 +46,7 @@ func (sn *sentry) Request(c *client) error {
req.Header.Add("Authorization", authHeader)
// by link ?
lastEventId := c.getLastMessageId()
lastEventId := c.requestData.GetLastMessageId()
firstEvent := true
PageLoop:
@ -88,7 +88,7 @@ PageLoop:
timestamp := uint64(parsedTime.UnixMilli())
// TODO: not to receive all the messages (use default integration timestamp)
if firstEvent { // TODO: reverse range?
c.setLastMessageId(timestamp, e.EventID)
c.requestData.SetLastMessageId(timestamp, e.EventID)
firstEvent = false
}

View file

@ -1,10 +1,9 @@
package integration
package clients
import (
"cloud.google.com/go/logging/logadmin"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
//"strconv"
"context"
"encoding/json"
@ -33,7 +32,7 @@ type saCreds struct {
}
func (sd *stackdriver) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond
fromFormatted := time.UnixMilli(int64(fromTs)).Format(time.RFC3339Nano)
ctx := context.Background()
@ -85,7 +84,7 @@ func (sd *stackdriver) Request(c *client) error {
continue
}
timestamp := uint64(e.Timestamp.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,

View file

@ -1,4 +1,4 @@
package integration
package clients
import (
"encoding/json"
@ -13,10 +13,10 @@ import (
)
/*
The maximum value for limit is 10,000 messages or 100 MB in total message size,
which means the query may return less than 10,000 messages if you exceed the size limit.
The maximum value for limit is 10,000 messages or 100 MB in total message size,
which means the query may return less than 10,000 messages if you exceed the size limit.
API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API
API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API
*/
const SL_LIMIT = 10000
@ -67,7 +67,7 @@ func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) {
}
func (sl *sumologic) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond
toTs := time.Now().UnixMilli()
requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs", "eu.") // deployment server??
jsonBody := fmt.Sprintf(`{
@ -189,7 +189,7 @@ func (sl *sumologic) Request(c *client) error {
if len(name) > 20 {
name = name[:20] // not sure about that
}
c.setLastMessageTimestamp(e.Timestamp)
c.requestData.SetLastMessageTimestamp(e.Timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,

View file

@ -1,22 +1,11 @@
package integration
package clients
import (
"fmt"
"regexp"
"strconv"
"strings"
)
var reSessionID = regexp.MustCompile(`(?i)asayer_session_id=([0-9]+)`)
func GetAsayerSessionId(s string) (uint64, error) {
matches := reSessionID.FindStringSubmatch(s)
if len(matches) < 2 {
return 0, fmt.Errorf("'asayer_session_id' not found in '%v' ", s)
}
return strconv.ParseUint(matches[1], 10, 64)
}
func GetLinkFromAngularBrackets(s string) string {
beg := strings.Index(s, "<") + 1
end := strings.Index(s, ">")

View file

@ -1,100 +1,94 @@
package integrations
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/jackc/pgx/v4"
"openreplay/backend/pkg/db/postgres/pool"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/token"
)
type Listener struct {
conn *pgx.Conn
db pool.Pool
Integrations chan *Integration
Errors chan error
cfg *config.Config
storage Storage
producer types.Producer
manager *Manager
tokenizer *token.Tokenizer
Errors chan error
}
type Integration struct {
ProjectID uint32 `json:"project_id"`
Provider string `json:"provider"`
RequestData json.RawMessage `json:"request_data"`
Options json.RawMessage `json:"options"`
}
func New(db pool.Pool, url string) (*Listener, error) {
conn, err := pgx.Connect(context.Background(), url)
func New(cfg *config.Config, storage Storage, producer types.Producer, manager *Manager, tokenizer *token.Tokenizer) (*Listener, error) {
listener := &Listener{
cfg: cfg,
storage: storage,
Errors: make(chan error),
producer: producer,
manager: manager,
tokenizer: tokenizer,
}
ints, err := storage.GetAll()
if err != nil {
return nil, err
}
listener := &Listener{
conn: conn,
db: db,
Errors: make(chan error),
for _, i := range ints {
// Add new integration to manager
if err = manager.Update(i); err != nil {
log.Printf("Integration parse error: %v | Integration: %v\n", err, *i)
}
}
listener.Integrations = make(chan *Integration, 50)
if _, err := conn.Exec(context.Background(), "LISTEN integration"); err != nil {
return nil, err
}
go listener.listen()
manager.RequestAll()
go listener.worker()
return listener, nil
}
func (listener *Listener) listen() {
func (l *Listener) worker() {
clientsCheckTick := time.Tick(intervals.INTEGRATIONS_REQUEST_INTERVAL * time.Millisecond)
for {
notification, err := listener.conn.WaitForNotification(context.Background())
if err != nil {
listener.Errors <- err
continue
}
switch notification.Channel {
case "integration":
integrationP := new(Integration)
if err := json.Unmarshal([]byte(notification.Payload), integrationP); err != nil {
listener.Errors <- fmt.Errorf("%v | Payload: %v", err, notification.Payload)
} else {
listener.Integrations <- integrationP
select {
case <-clientsCheckTick:
l.manager.RequestAll()
case event := <-l.manager.Events:
log.Printf("New integration event: %+v\n", *event.IntegrationEvent)
sessionID := event.SessionID
if sessionID == 0 {
sessData, err := l.tokenizer.Parse(event.Token)
if err != nil && err != token.EXPIRED {
log.Printf("Error on token parsing: %v; Token: %v", err, event.Token)
continue
}
sessionID = sessData.ID
}
// Why do we produce integration events to analytics topic
l.producer.Produce(l.cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode())
case err := <-l.manager.Errors:
log.Printf("Integration error: %v\n", err)
case i := <-l.manager.RequestDataUpdates:
if err := l.storage.Update(&i); err != nil {
log.Printf("Postgres Update request_data error: %v\n", err)
}
default:
newNotification, err := l.storage.CheckNew()
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
l.Errors <- fmt.Errorf("Integration storage error: %v", err)
continue
}
log.Printf("Integration update: %v\n", *newNotification)
err = l.manager.Update(newNotification)
if err != nil {
log.Printf("Integration parse error: %v | Integration: %v\n", err, *newNotification)
}
}
}
}
func (listener *Listener) Close() error {
return listener.conn.Close(context.Background())
}
func (listener *Listener) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error {
rows, err := listener.db.Query(`
SELECT project_id, provider, options, request_data
FROM integrations
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
i := new(Integration)
if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil {
iter(nil, err)
continue
}
iter(i, nil)
}
if err = rows.Err(); err != nil {
return err
}
return nil
}
func (listener *Listener) UpdateIntegrationRequestData(i *Integration) error {
return listener.db.Exec(`
UPDATE integrations
SET request_data = $1
WHERE project_id=$2 AND provider=$3`,
i.RequestData, i.ProjectID, i.Provider,
)
func (l *Listener) Close() error {
return l.storage.UnListen()
}

View file

@ -0,0 +1,48 @@
package integrations
import (
"log"
"openreplay/backend/pkg/integrations/clients"
"openreplay/backend/pkg/integrations/model"
)
type Manager struct {
clientMap clients.ClientMap
Events chan *clients.SessionErrorEvent
Errors chan error
RequestDataUpdates chan model.Integration // not pointer because it could change in other thread
}
func NewManager() *Manager {
return &Manager{
clientMap: make(clients.ClientMap),
RequestDataUpdates: make(chan model.Integration, 100),
Events: make(chan *clients.SessionErrorEvent, 100),
Errors: make(chan error, 100),
}
}
func (m *Manager) Update(i *model.Integration) (err error) {
log.Printf("Integration initialization: %v\n", *i)
key := i.GetKey()
if i.Options == nil {
delete(m.clientMap, key)
return nil
}
c, exists := m.clientMap[key]
if !exists {
c, err = clients.NewClient(i, m.RequestDataUpdates, m.Events, m.Errors)
if err != nil {
return err
}
m.clientMap[key] = c
}
return c.Update(i)
}
func (m *Manager) RequestAll() {
log.Printf("Requesting all...\n")
for _, c := range m.clientMap {
go c.Request()
}
}

View file

@ -0,0 +1,38 @@
package model
import (
"encoding/json"
"fmt"
"time"
)
type Integration struct {
ProjectID uint32 `json:"project_id"`
Provider string `json:"provider"`
RequestData json.RawMessage `json:"request_data"`
Options json.RawMessage `json:"options"`
}
func (i *Integration) Encode() []byte {
b, _ := json.Marshal(i)
return b
}
func (i *Integration) Decode(data []byte) error {
return json.Unmarshal(data, i)
}
func (i *Integration) GetKey() string {
return fmt.Sprintf("%d%s", i.ProjectID, i.Provider)
}
func (i *Integration) GetRequestInfo() (*RequestInfo, error) {
ri := new(RequestInfo)
if err := json.Unmarshal(i.RequestData, ri); err != nil {
return nil, err
}
if ri.LastMessageTimestamp == 0 {
ri.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli())
}
return ri, nil
}

View file

@ -0,0 +1,58 @@
package model
import (
"encoding/json"
"time"
)
const MAX_ATTEMPTS_IN_A_ROW = 4
const MAX_ATTEMPTS = 40
const ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000
type RequestInfo struct {
LastMessageId string
LastMessageTimestamp uint64
LastAttemptTimestamp int64
UnsuccessfullAttemptsCount int
}
func (c *RequestInfo) SetLastMessageTimestamp(timestamp uint64) {
if timestamp > c.LastMessageTimestamp {
c.LastMessageTimestamp = timestamp
}
}
func (c *RequestInfo) GetLastMessageTimestamp() uint64 {
return c.LastMessageTimestamp
}
func (c *RequestInfo) SetLastMessageId(timestamp uint64, id string) {
c.LastMessageId = id
c.LastMessageTimestamp = timestamp
}
func (c *RequestInfo) GetLastMessageId() string {
return c.LastMessageId
}
func (c *RequestInfo) CanAttempt() bool {
if c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS ||
(c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
time.Now().UnixMilli()-c.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
return false
}
return true
}
func (c *RequestInfo) UpdateLastAttempt() {
c.LastAttemptTimestamp = time.Now().UnixMilli()
}
func (c *RequestInfo) Inc() {
c.UnsuccessfullAttemptsCount++
}
func (c *RequestInfo) Reset() {
c.UnsuccessfullAttemptsCount = 0
}
func (c *RequestInfo) Encode() ([]byte, error) {
return json.Marshal(c)
}

View file

@ -0,0 +1,92 @@
package integrations
import (
"context"
"encoding/json"
"fmt"
"log"
"openreplay/backend/pkg/integrations/model"
"time"
"github.com/jackc/pgx/v4"
)
type Storage interface {
Listen() error
UnListen() error
CheckNew() (*model.Integration, error)
GetAll() ([]*model.Integration, error)
Update(i *model.Integration) error
}
type storageImpl struct {
conn *pgx.Conn
}
func NewStorage(conn *pgx.Conn) Storage {
return &storageImpl{
conn: conn,
}
}
func (s *storageImpl) Listen() error {
_, err := s.conn.Exec(context.Background(), "LISTEN integration")
return err
}
func (s *storageImpl) UnListen() error {
_, err := s.conn.Exec(context.Background(), "UNLISTEN integration")
return err
}
func (s *storageImpl) CheckNew() (*model.Integration, error) {
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
notification, err := s.conn.WaitForNotification(ctx)
if err != nil {
return nil, err
}
if notification.Channel == "integration" {
integrationP := new(model.Integration)
if err := json.Unmarshal([]byte(notification.Payload), integrationP); err != nil {
return nil, err
}
return integrationP, nil
}
return nil, fmt.Errorf("unknown notification channel: %s", notification.Channel)
}
func (s *storageImpl) GetAll() ([]*model.Integration, error) {
rows, err := s.conn.Query(context.Background(), `
SELECT project_id, provider, options, request_data
FROM integrations
`)
if err != nil {
return nil, err
}
defer rows.Close()
integrations := make([]*model.Integration, 0)
for rows.Next() {
i := new(model.Integration)
if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil {
log.Printf("Postgres scan error: %v\n", err)
continue
}
integrations = append(integrations, i)
}
if err = rows.Err(); err != nil {
return nil, err
}
return integrations, nil
}
func (s *storageImpl) Update(i *model.Integration) error {
_, err := s.conn.Exec(context.Background(), `
UPDATE integrations
SET request_data = $1
WHERE project_id=$2 AND provider=$3`,
i.RequestData, i.ProjectID, i.Provider,
)
return err
}