- fixed alerts worker
- fixed ender worder
- fixed http worker
This commit is contained in:
KRAIEM Taha Yassine 2021-05-12 14:14:16 +02:00
parent 565a3b0a6a
commit 3085153d33
4 changed files with 94 additions and 101 deletions

View file

@ -8,8 +8,8 @@ import (
"syscall"
"time"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
)
func main() {
@ -25,8 +25,7 @@ func main() {
log.Fatal(err)
}
manager := NewManager(NOTIFICATIONS_STRING, pgs, pg)
manager := NewManager(NOTIFICATIONS_STRING, POSTGRES_STRING, pgs, pg)
if err := pg.IterateAlerts(func(a *postgres.Alert, err error) {
if err != nil {
log.Printf("Postgres error: %v\n", err)

View file

@ -16,6 +16,7 @@ const PGParallelLimit = 2
var pgCount int64
type manager struct {
postgresString string
notificationsUrl string
alertsCache map[uint32]*postgres.Alert
cacheMutex sync.Mutex
@ -28,8 +29,9 @@ type manager struct {
notificationsMutex sync.Mutex
}
func NewManager(notificationsUrl string, pg *sql.DB, pg *postgres.Conn) *manager {
func NewManager(notificationsUrl string, postgresString string, pgs *sql.DB, pg *postgres.Conn) *manager {
return &manager{
postgresString: postgresString,
notificationsUrl: notificationsUrl,
alertsCache: make(map[uint32]*postgres.Alert),
cacheMutex: sync.Mutex{},
@ -157,10 +159,10 @@ func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.Ten
if err := m.pg.SaveLastNotification(allIds); err != nil {
log.Printf("Error saving LastNotification time: %v", err)
if err.Error() == "conn closed" {
m.pg, err = postgres.NewPostgres(POSTGRES_STRING)
if err != nil {
panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err))
}
m.pg = postgres.NewConn(m.postgresString)
//if err != nil {
// panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err))
//}
if err := m.pg.SaveLastNotification(allIds); err != nil {
panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err))
}

View file

@ -4,8 +4,6 @@ import (
"net/url"
"strings"
"log"
"openreplay/backend/pkg/intervals"
. "openreplay/backend/pkg/messages"
)

View file

@ -3,36 +3,36 @@ package main
import (
"encoding/json"
"errors"
"time"
"log"
"io"
"io/ioutil"
"net/http"
"log"
"math/rand"
"net/http"
"strconv"
"time"
gzip "github.com/klauspost/pgzip"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/token"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
type request struct {
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset boolean `json:"reset"`
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset bool `json:"reset"`
}
type response struct {
Timestamp int64 `json:"timestamp"`
@ -91,21 +91,21 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
producer.Produce(topicRaw, tokenData.ID, Encode(&SessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
UserDeviceMemorySize: req.DeviceMemory,
UserDeviceHeapSize: req.JsHeapSizeLimit,
UserDeviceHeapSize: req.JsHeapSizeLimit,
}))
}
@ -119,8 +119,6 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
})
}
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT)
//defer body.Close()
@ -132,7 +130,7 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
responseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
return
}
defer reader.Close()
defer reader.Close()
default:
reader = body
}
@ -170,7 +168,7 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
//log.Printf("Sending batch...")
//startTime := time.Now()
// analyticsMessages := make([]Message, 0, 200)
// analyticsMessages := make([]Message, 0, 200)
rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message {
switch m := msg.(type) {
@ -178,63 +176,61 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
if m.Name == "src" || m.Name == "href" {
sendAssetForCache(sessionData.ID, m.BaseURL, m.Value)
msg = &SetNodeAttribute{
ID: m.ID,
Name: m.Name,
ID: m.ID,
Name: m.Name,
Value: rewriter.RewriteURL(sessionData.ID, m.BaseURL, m.Value),
}
} else if m.Name == "style" {
sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Value)
msg = &SetNodeAttribute{
ID: m.ID,
Name: m.Name,
ID: m.ID,
Name: m.Name,
Value: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Value),
}
}
case *SetCSSDataURLBased:
sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Data)
msg = &SetCSSData{
ID: m.ID,
ID: m.ID,
Data: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Data),
}
case *CSSInsertRuleURLBased:
case *CSSInsertRuleURLBased:
sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Rule)
msg = &CSSInsertRule{
ID: m.ID,
ID: m.ID,
Index: m.Index,
Rule: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Rule),
}
Rule: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Rule),
}
}
// switch msg.(type) {
// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
// *SetPageLocation,
// *PageLoadTiming,
// *PageRenderTiming,
// *PerformanceTrack,
// *SetInputTarget,
// *SetInputValue,
// *MouseClick,
// *RawErrorEvent,
// *JSException,
// *ResourceTiming,
// *RawCustomEvent,
// *CustomIssue,
// *Fetch,
// *StateAction,
// *GraphQL,
// *CreateElementNode,
// *CreateTextNode,
// *RemoveNode,
// *CreateDocument,
// *RemoveNodeAttribute,
// *MoveNode,
// *SetCSSData,
// *CSSInsertRule,
// *CSSDeleteRule:
// analyticsMessages = append(analyticsMessages, msg)
//}
// switch msg.(type) {
// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
// *SetPageLocation,
// *PageLoadTiming,
// *PageRenderTiming,
// *PerformanceTrack,
// *SetInputTarget,
// *SetInputValue,
// *MouseClick,
// *RawErrorEvent,
// *JSException,
// *ResourceTiming,
// *RawCustomEvent,
// *CustomIssue,
// *Fetch,
// *StateAction,
// *GraphQL,
// *CreateElementNode,
// *CreateTextNode,
// *RemoveNode,
// *CreateDocument,
// *RemoveNodeAttribute,
// *MoveNode,
// *SetCSSData,
// *CSSInsertRule,
// *CSSDeleteRule:
// analyticsMessages = append(analyticsMessages, msg)
//}
return msg
})
@ -251,9 +247,9 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
func notStartedHandler(w http.ResponseWriter, r *http.Request) {
type request struct {
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
// RevID string `json:"revID"`
}
req := &request{}
@ -274,23 +270,21 @@ func notStartedHandler(w http.ResponseWriter, r *http.Request) {
}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,
Platform: "web",
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,
Platform: "web",
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
})
if err != nil {
log.Printf("Unable to insert Unstarted Session: %v\n", err);
log.Printf("Unable to insert Unstarted Session: %v\n", err)
}
w.WriteHeader(http.StatusOK)
}