diff --git a/backend/services/alerts/main.go b/backend/services/alerts/main.go index 2c0ff09f2..e90df24b8 100644 --- a/backend/services/alerts/main.go +++ b/backend/services/alerts/main.go @@ -8,8 +8,8 @@ import ( "syscall" "time" - "openreplay/backend/pkg/env" "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/env" ) func main() { @@ -25,8 +25,7 @@ func main() { log.Fatal(err) } - - manager := NewManager(NOTIFICATIONS_STRING, pgs, pg) + manager := NewManager(NOTIFICATIONS_STRING, POSTGRES_STRING, pgs, pg) if err := pg.IterateAlerts(func(a *postgres.Alert, err error) { if err != nil { log.Printf("Postgres error: %v\n", err) diff --git a/backend/services/alerts/manager.go b/backend/services/alerts/manager.go index 9e8d9cde2..11ddb9363 100644 --- a/backend/services/alerts/manager.go +++ b/backend/services/alerts/manager.go @@ -16,6 +16,7 @@ const PGParallelLimit = 2 var pgCount int64 type manager struct { + postgresString string notificationsUrl string alertsCache map[uint32]*postgres.Alert cacheMutex sync.Mutex @@ -28,8 +29,9 @@ type manager struct { notificationsMutex sync.Mutex } -func NewManager(notificationsUrl string, pg *sql.DB, pg *postgres.Conn) *manager { +func NewManager(notificationsUrl string, postgresString string, pgs *sql.DB, pg *postgres.Conn) *manager { return &manager{ + postgresString: postgresString, notificationsUrl: notificationsUrl, alertsCache: make(map[uint32]*postgres.Alert), cacheMutex: sync.Mutex{}, @@ -157,10 +159,10 @@ func (m *manager) ProcessNotifications(allNotifications map[uint32]*postgres.Ten if err := m.pg.SaveLastNotification(allIds); err != nil { log.Printf("Error saving LastNotification time: %v", err) if err.Error() == "conn closed" { - m.pg, err = postgres.NewPostgres(POSTGRES_STRING) - if err != nil { - panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err)) - } + m.pg = postgres.NewConn(m.postgresString) + //if err != nil { + // panic(fmt.Sprintf("Postgres renew notifications connection error: %v\n", err)) + //} if err := m.pg.SaveLastNotification(allIds); err != nil { panic(fmt.Sprintf("Error saving LastNotification time, suicide: %v", err)) } diff --git a/backend/services/ender/builder/builder.go b/backend/services/ender/builder/builder.go index 49fbd9443..cccf96bcf 100644 --- a/backend/services/ender/builder/builder.go +++ b/backend/services/ender/builder/builder.go @@ -4,8 +4,6 @@ import ( "net/url" "strings" - "log" - "openreplay/backend/pkg/intervals" . "openreplay/backend/pkg/messages" ) diff --git a/backend/services/http/handlers.go b/backend/services/http/handlers.go index 607f302f9..c17a46286 100644 --- a/backend/services/http/handlers.go +++ b/backend/services/http/handlers.go @@ -3,36 +3,36 @@ package main import ( "encoding/json" "errors" - "time" - "log" "io" "io/ioutil" - "net/http" + "log" "math/rand" + "net/http" "strconv" + "time" gzip "github.com/klauspost/pgzip" - . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/token" + . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/token" ) -const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb -const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb +const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb +const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { type request struct { - Token string `json:"token"` - UserUUID *string `json:"userUUID"` - RevID string `json:"revID"` - Timestamp uint64 `json:"timestamp"` - TrackerVersion string `json:"trackerVersion"` - IsSnippet bool `json:"isSnippet"` - DeviceMemory uint64 `json:"deviceMemory"` - JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` - ProjectKey *string `json:"projectKey"` - Reset boolean `json:"reset"` + Token string `json:"token"` + UserUUID *string `json:"userUUID"` + RevID string `json:"revID"` + Timestamp uint64 `json:"timestamp"` + TrackerVersion string `json:"trackerVersion"` + IsSnippet bool `json:"isSnippet"` + DeviceMemory uint64 `json:"deviceMemory"` + JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` + ProjectKey *string `json:"projectKey"` + Reset bool `json:"reset"` } type response struct { Timestamp int64 `json:"timestamp"` @@ -91,21 +91,21 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { country := geoIP.ExtractISOCodeFromHTTPRequest(r) producer.Produce(topicRaw, tokenData.ID, Encode(&SessionStart{ - Timestamp: req.Timestamp, - ProjectID: uint64(p.ProjectID), - TrackerVersion: req.TrackerVersion, - RevID: req.RevID, - UserUUID: userUUID, - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, - UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: country, + Timestamp: req.Timestamp, + ProjectID: uint64(p.ProjectID), + TrackerVersion: req.TrackerVersion, + RevID: req.RevID, + UserUUID: userUUID, + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, + UserBrowserVersion: ua.BrowserVersion, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: country, UserDeviceMemorySize: req.DeviceMemory, - UserDeviceHeapSize: req.JsHeapSizeLimit, + UserDeviceHeapSize: req.JsHeapSizeLimit, })) } @@ -119,8 +119,6 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { }) } - - func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) { body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT) //defer body.Close() @@ -132,7 +130,7 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) { responseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce return } - defer reader.Close() + defer reader.Close() default: reader = body } @@ -170,7 +168,7 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) { //log.Printf("Sending batch...") //startTime := time.Now() -// analyticsMessages := make([]Message, 0, 200) + // analyticsMessages := make([]Message, 0, 200) rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message { switch m := msg.(type) { @@ -178,63 +176,61 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) { if m.Name == "src" || m.Name == "href" { sendAssetForCache(sessionData.ID, m.BaseURL, m.Value) msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, + ID: m.ID, + Name: m.Name, Value: rewriter.RewriteURL(sessionData.ID, m.BaseURL, m.Value), } } else if m.Name == "style" { sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Value) msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, + ID: m.ID, + Name: m.Name, Value: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Value), } } case *SetCSSDataURLBased: sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Data) msg = &SetCSSData{ - ID: m.ID, + ID: m.ID, Data: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Data), } - case *CSSInsertRuleURLBased: + case *CSSInsertRuleURLBased: sendAssetsForCacheFromCSS(sessionData.ID, m.BaseURL, m.Rule) msg = &CSSInsertRule{ - ID: m.ID, + ID: m.ID, Index: m.Index, - Rule: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Rule), - } + Rule: rewriter.RewriteCSS(sessionData.ID, m.BaseURL, m.Rule), + } } - // switch msg.(type) { - // case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair) - // *SetPageLocation, - // *PageLoadTiming, - // *PageRenderTiming, - // *PerformanceTrack, - // *SetInputTarget, - // *SetInputValue, - // *MouseClick, - // *RawErrorEvent, - // *JSException, - // *ResourceTiming, - // *RawCustomEvent, - // *CustomIssue, - // *Fetch, - // *StateAction, - // *GraphQL, - // *CreateElementNode, - // *CreateTextNode, - // *RemoveNode, - // *CreateDocument, - // *RemoveNodeAttribute, - // *MoveNode, - // *SetCSSData, - // *CSSInsertRule, - // *CSSDeleteRule: - // analyticsMessages = append(analyticsMessages, msg) - //} - - + // switch msg.(type) { + // case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair) + // *SetPageLocation, + // *PageLoadTiming, + // *PageRenderTiming, + // *PerformanceTrack, + // *SetInputTarget, + // *SetInputValue, + // *MouseClick, + // *RawErrorEvent, + // *JSException, + // *ResourceTiming, + // *RawCustomEvent, + // *CustomIssue, + // *Fetch, + // *StateAction, + // *GraphQL, + // *CreateElementNode, + // *CreateTextNode, + // *RemoveNode, + // *CreateDocument, + // *RemoveNodeAttribute, + // *MoveNode, + // *SetCSSData, + // *CSSInsertRule, + // *CSSDeleteRule: + // analyticsMessages = append(analyticsMessages, msg) + //} return msg }) @@ -251,9 +247,9 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) { func notStartedHandler(w http.ResponseWriter, r *http.Request) { type request struct { - ProjectKey *string `json:"projectKey"` - TrackerVersion string `json:"trackerVersion"` - DoNotTrack bool `json:"DoNotTrack"` + ProjectKey *string `json:"projectKey"` + TrackerVersion string `json:"trackerVersion"` + DoNotTrack bool `json:"DoNotTrack"` // RevID string `json:"revID"` } req := &request{} @@ -274,23 +270,21 @@ func notStartedHandler(w http.ResponseWriter, r *http.Request) { } country := geoIP.ExtractISOCodeFromHTTPRequest(r) err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{ - ProjectKey: *req.ProjectKey, - TrackerVersion: req.TrackerVersion, - DoNotTrack: req.DoNotTrack, - Platform: "web", - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, + ProjectKey: *req.ProjectKey, + TrackerVersion: req.TrackerVersion, + DoNotTrack: req.DoNotTrack, + Platform: "web", + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: country, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: country, }) if err != nil { - log.Printf("Unable to insert Unstarted Session: %v\n", err); + log.Printf("Unable to insert Unstarted Session: %v\n", err) } w.WriteHeader(http.StatusOK) } - -