From 11c86f555d749965604af8bf4335f12fe8132ce9 Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Wed, 10 Nov 2021 17:04:45 +0100 Subject: [PATCH] feat(backend): topic redirections --- backend/Dockerfile | 4 +- backend/Dockerfile.bundle | 4 +- backend/services/assets/main.go | 4 +- backend/services/db/main.go | 2 +- backend/services/ender/main.go | 3 +- backend/services/http/assets.go | 2 +- backend/services/http/handlers.go | 255 +------------------------- backend/services/http/handlers_ios.go | 24 ++- backend/services/http/handlers_web.go | 249 +++++++++++++++++++++++++ backend/services/http/main.go | 23 ++- backend/services/integrations/main.go | 2 +- backend/services/sink/main.go | 13 +- 12 files changed, 299 insertions(+), 286 deletions(-) create mode 100644 backend/services/http/handlers_web.go diff --git a/backend/Dockerfile b/backend/Dockerfile index 8353b8f63..6ca305ca1 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -28,11 +28,11 @@ ENV TZ=UTC \ BEACON_SIZE_LIMIT=7000000 \ KAFKA_USE_SSL=true \ REDIS_STREAMS_MAX_LEN=3000 \ - TOPIC_RAW=raw \ + TOPIC_RAW_WEB=raw \ + TOPIC_RAW_IOS=raw-ios \ TOPIC_CACHE=cache \ TOPIC_ANALYTICS=analytics \ TOPIC_TRIGGER=trigger \ - TOPIC_EVENTS=events \ GROUP_SINK=sink \ GROUP_STORAGE=storage \ GROUP_DB=db \ diff --git a/backend/Dockerfile.bundle b/backend/Dockerfile.bundle index 904bb45f4..efbcb2684 100644 --- a/backend/Dockerfile.bundle +++ b/backend/Dockerfile.bundle @@ -29,11 +29,11 @@ ENV TZ=UTC \ BEACON_SIZE_LIMIT=1000000 \ KAFKA_USE_SSL=true \ REDIS_STREAMS_MAX_LEN=3000 \ - TOPIC_RAW=raw \ + TOPIC_RAW_WEB=raw \ + TOPIC_RAW_IOS=raw-ios \ TOPIC_CACHE=cache \ TOPIC_ANALYTICS=analytics \ TOPIC_TRIGGER=trigger \ - TOPIC_EVENTS=events \ GROUP_SINK=sink \ GROUP_STORAGE=storage \ GROUP_DB=db \ diff --git a/backend/services/assets/main.go b/backend/services/assets/main.go index 05c779bbb..34f4558b9 100644 --- a/backend/services/assets/main.go +++ b/backend/services/assets/main.go @@ -20,7 +20,7 @@ func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) GROUP_CACHE := env.String("GROUP_CACHE") - TOPIC_TRIGGER := env.String("TOPIC_TRIGGER") + TOPIC_CACHE := env.String("TOPIC_CACHE") cacher := cacher.NewCacher( env.String("AWS_REGION"), @@ -31,7 +31,7 @@ func main() { consumer := queue.NewMessageConsumer( GROUP_CACHE, - []string{ TOPIC_TRIGGER }, + []string{ TOPIC_CACHE }, func(sessionID uint64, message messages.Message, e *types.Meta) { switch msg := message.(type) { case *messages.AssetCache: diff --git a/backend/services/db/main.go b/backend/services/db/main.go index a2cef41b3..c0f48aaff 100644 --- a/backend/services/db/main.go +++ b/backend/services/db/main.go @@ -32,7 +32,7 @@ func main() { consumer := queue.NewMessageConsumer( env.String("GROUP_DB"), []string{ - //env.String("TOPIC_RAW"), + env.String("TOPIC_IOS_RAW"), env.String("TOPIC_TRIGGER"), }, func(sessionID uint64, msg messages.Message, _ *types.Meta) { diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index e99c6866c..0fbd1010c 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -30,7 +30,8 @@ func main() { consumer := queue.NewMessageConsumer( GROUP_EVENTS, []string{ - env.String("TOPIC_RAW"), + env.String("TOPIC_RAW_WEB"), + env.String("TOPIC_RAW_IOS"), }, func(sessionID uint64, msg messages.Message, meta *types.Meta) { lastTs = meta.Timestamp diff --git a/backend/services/http/assets.go b/backend/services/http/assets.go index 69fb7f53d..cc055087a 100644 --- a/backend/services/http/assets.go +++ b/backend/services/http/assets.go @@ -7,7 +7,7 @@ import ( func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) { if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable { - producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(&messages.AssetCache{ + producer.Produce(TOPIC_CACHE, sessionID, messages.Encode(&messages.AssetCache{ URL: fullURL, })) } diff --git a/backend/services/http/handlers.go b/backend/services/http/handlers.go index 2ac2852a2..e45e84e64 100644 --- a/backend/services/http/handlers.go +++ b/backend/services/http/handlers.go @@ -1,126 +1,17 @@ package main import ( - "encoding/json" - "errors" "io" "io/ioutil" "log" - "math/rand" "net/http" - "strconv" - "time" gzip "github.com/klauspost/pgzip" - - "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/token" - . "openreplay/backend/pkg/messages" ) const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb -func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { - type request struct { - Token string `json:"token"` - UserUUID *string `json:"userUUID"` - RevID string `json:"revID"` - Timestamp uint64 `json:"timestamp"` - TrackerVersion string `json:"trackerVersion"` - IsSnippet bool `json:"isSnippet"` - DeviceMemory uint64 `json:"deviceMemory"` - JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` - ProjectKey *string `json:"projectKey"` - Reset bool `json:"reset"` - } - type response struct { - Timestamp int64 `json:"timestamp"` - Delay int64 `json:"delay"` - Token string `json:"token"` - UserUUID string `json:"userUUID"` - SessionID string `json:"sessionID"` - BeaconSizeLimit int64 `json:"beaconSizeLimit"` - } - - startTime := time.Now() - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error? - //defer body.Close() - if err := json.NewDecoder(body).Decode(req); err != nil { - responseWithError(w, http.StatusBadRequest, err) - return - } - - if req.ProjectKey == nil { - responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) - return - } - - p, err := pgconn.GetProjectByKey(*req.ProjectKey) - if err != nil { - if postgres.IsNoRowsErr(err) { - responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active")) - } else { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - } - return - } - - userUUID := getUUID(req.UserUUID) - tokenData, err := tokenizer.Parse(req.Token) - if err != nil || req.Reset { // Starting the new one - dice := byte(rand.Intn(100)) // [0, 100) - if dice >= p.SampleRate { - responseWithError(w, http.StatusForbidden, errors.New("cancel")) - return - } - - ua := uaParser.ParseFromHTTPRequest(r) - if ua == nil { - responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) - return - } - sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) - if err != nil { - responseWithError(w, http.StatusInternalServerError, err) - return - } - // TODO: if EXPIRED => send message for two sessions association - expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} - - country := geoIP.ExtractISOCodeFromHTTPRequest(r) - producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&SessionStart{ - Timestamp: req.Timestamp, - ProjectID: uint64(p.ProjectID), - TrackerVersion: req.TrackerVersion, - RevID: req.RevID, - UserUUID: userUUID, - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, - UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: country, - UserDeviceMemorySize: req.DeviceMemory, - UserDeviceHeapSize: req.JsHeapSizeLimit, - })) - } - - //delayDuration := time.Now().Sub(startTime) - responseWithJSON(w, &response{ - //Timestamp: startTime.UnixNano() / 1e6, - //Delay: delayDuration.Nanoseconds() / 1e6, - Token: tokenizer.Compose(*tokenData), - UserUUID: userUUID, - SessionID: strconv.FormatUint(tokenData.ID, 10), - BeaconSizeLimit: BEACON_SIZE_LIMIT, - }) -} - -func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) { +func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) { body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) //defer body.Close() var reader io.ReadCloser @@ -145,148 +36,6 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) { responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging return } - producer.Produce(TOPIC_RAW, sessionID, buf) // What if not able to send? - w.WriteHeader(http.StatusOK) -} - -func pushMessagesHandler(w http.ResponseWriter, r *http.Request) { - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil { - responseWithError(w, http.StatusUnauthorized, err) - return - } - pushMessages(w, r, sessionData.ID) -} - -func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) { - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil { - responseWithError(w, http.StatusUnauthorized, err) - return - } - body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) - //defer body.Close() - buf, err := ioutil.ReadAll(body) - if err != nil { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - return - } - //log.Printf("Sending batch...") - //startTime := time.Now() - - // analyticsMessages := make([]Message, 0, 200) - - rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message { - switch m := msg.(type) { - case *SetNodeAttributeURLBased: - if m.Name == "src" || m.Name == "href" { - msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, - Value: handleURL(sessionData.ID, m.BaseURL, m.Value), - } - } else if m.Name == "style" { - msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, - Value: handleCSS(sessionData.ID, m.BaseURL, m.Value), - } - } - case *SetCSSDataURLBased: - msg = &SetCSSData{ - ID: m.ID, - Data: handleCSS(sessionData.ID, m.BaseURL, m.Data), - } - case *CSSInsertRuleURLBased: - msg = &CSSInsertRule{ - ID: m.ID, - Index: m.Index, - Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule), - } - } - - // switch msg.(type) { - // case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair) - // *SetPageLocation, - // *PageLoadTiming, - // *PageRenderTiming, - // *PerformanceTrack, - // *SetInputTarget, - // *SetInputValue, - // *MouseClick, - // *RawErrorEvent, - // *JSException, - // *ResourceTiming, - // *RawCustomEvent, - // *CustomIssue, - // *Fetch, - // *StateAction, - // *GraphQL, - // *CreateElementNode, - // *CreateTextNode, - // *RemoveNode, - // *CreateDocument, - // *RemoveNodeAttribute, - // *MoveNode, - // *SetCSSData, - // *CSSInsertRule, - // *CSSDeleteRule: - // analyticsMessages = append(analyticsMessages, msg) - //} - - return msg - }) - if err != nil { - responseWithError(w, http.StatusForbidden, err) - return - } - producer.Produce(TOPIC_RAW, sessionData.ID, rewritenBuf) - //producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages)) - //duration := time.Now().Sub(startTime) - //log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf))) - w.WriteHeader(http.StatusOK) -} - -func notStartedHandler(w http.ResponseWriter, r *http.Request) { - type request struct { - ProjectKey *string `json:"projectKey"` - TrackerVersion string `json:"trackerVersion"` - DoNotTrack bool `json:"DoNotTrack"` - // RevID string `json:"revID"` - } - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) - defer body.Close() - if err := json.NewDecoder(body).Decode(req); err != nil { - responseWithError(w, http.StatusBadRequest, err) - return - } - if req.ProjectKey == nil { - responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) - return - } - ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway - if ua == nil { - responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) - return - } - country := geoIP.ExtractISOCodeFromHTTPRequest(r) - err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{ - ProjectKey: *req.ProjectKey, - TrackerVersion: req.TrackerVersion, - DoNotTrack: req.DoNotTrack, - Platform: "web", - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, - UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: country, - }) - if err != nil { - log.Printf("Unable to insert Unstarted Session: %v\n", err) - } + producer.Produce(topicName, sessionID, buf) // What if not able to send? w.WriteHeader(http.StatusOK) } diff --git a/backend/services/http/handlers_ios.go b/backend/services/http/handlers_ios.go index 3d2bea213..3bc70e9d3 100644 --- a/backend/services/http/handlers_ios.go +++ b/backend/services/http/handlers_ios.go @@ -97,7 +97,7 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) { country := geoIP.ExtractISOCodeFromHTTPRequest(r) // The difference with web is mostly here: - producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&IOSSessionStart{ + producer.Produce(TOPIC_RAW_IOS, tokenData.ID, Encode(&IOSSessionStart{ Timestamp: req.Timestamp, ProjectID: uint64(p.ProjectID), TrackerVersion: req.TrackerVersion, @@ -127,18 +127,29 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) { } -func pushLateMessagesHandler(w http.ResponseWriter, r *http.Request) { +func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { + sessionData, err := tokenizer.ParseFromHTTPRequest(r) + if err != nil { + responseWithError(w, http.StatusUnauthorized, err) + return + } + pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS) +} + + + +func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { sessionData, err := tokenizer.ParseFromHTTPRequest(r) if err != nil && err != token.EXPIRED { responseWithError(w, http.StatusUnauthorized, err) return } // Check timestamps here? - pushMessages(w, r, sessionData.ID) + pushMessages(w, r, sessionData.ID,TOPIC_RAW_IOS) } -func iosImagesUploadHandler(w http.ResponseWriter, r *http.Request) { +func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) { sessionData, err := tokenizer.ParseFromHTTPRequest(r) if err != nil { // Should accept expired token? responseWithError(w, http.StatusUnauthorized, err) @@ -168,13 +179,12 @@ func iosImagesUploadHandler(w http.ResponseWriter, r *http.Request) { for _, fileHeaderList := range r.MultipartForm.File { for _, fileHeader := range fileHeaderList { - file, err := fileHeader.Open() //TODO: mime type from header + file, err := fileHeader.Open() if err != nil { continue // TODO: send server error or accumulate successful files } key := prefix + fileHeader.Filename - log.Printf("Uploading ios screen: %v", key) - go func() { + go func() { //TODO: mime type from header if err := s3.Upload(file, key, "image/jpeg", false); err != nil { log.Printf("Upload ios screen error. %v", err) } diff --git a/backend/services/http/handlers_web.go b/backend/services/http/handlers_web.go new file mode 100644 index 000000000..5e144f1cc --- /dev/null +++ b/backend/services/http/handlers_web.go @@ -0,0 +1,249 @@ +package main + +import ( + "encoding/json" + "errors" + "io/ioutil" + "log" + "math/rand" + "net/http" + "strconv" + "time" + + "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/token" + . "openreplay/backend/pkg/messages" +) + +func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { + type request struct { + Token string `json:"token"` + UserUUID *string `json:"userUUID"` + RevID string `json:"revID"` + Timestamp uint64 `json:"timestamp"` + TrackerVersion string `json:"trackerVersion"` + IsSnippet bool `json:"isSnippet"` + DeviceMemory uint64 `json:"deviceMemory"` + JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` + ProjectKey *string `json:"projectKey"` + Reset bool `json:"reset"` + } + type response struct { + Timestamp int64 `json:"timestamp"` + Delay int64 `json:"delay"` + Token string `json:"token"` + UserUUID string `json:"userUUID"` + SessionID string `json:"sessionID"` + BeaconSizeLimit int64 `json:"beaconSizeLimit"` + } + + startTime := time.Now() + req := &request{} + body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error? + //defer body.Close() + if err := json.NewDecoder(body).Decode(req); err != nil { + responseWithError(w, http.StatusBadRequest, err) + return + } + + if req.ProjectKey == nil { + responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + return + } + + p, err := pgconn.GetProjectByKey(*req.ProjectKey) + if err != nil { + if postgres.IsNoRowsErr(err) { + responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached")) + } else { + responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + } + return + } + + userUUID := getUUID(req.UserUUID) + tokenData, err := tokenizer.Parse(req.Token) + if err != nil || req.Reset { // Starting the new one + dice := byte(rand.Intn(100)) // [0, 100) + if dice >= p.SampleRate { + responseWithError(w, http.StatusForbidden, errors.New("cancel")) + return + } + + ua := uaParser.ParseFromHTTPRequest(r) + if ua == nil { + responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + return + } + sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) + if err != nil { + responseWithError(w, http.StatusInternalServerError, err) + return + } + // TODO: if EXPIRED => send message for two sessions association + expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) + tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} + + country := geoIP.ExtractISOCodeFromHTTPRequest(r) + producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{ + Timestamp: req.Timestamp, + ProjectID: uint64(p.ProjectID), + TrackerVersion: req.TrackerVersion, + RevID: req.RevID, + UserUUID: userUUID, + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, + UserBrowserVersion: ua.BrowserVersion, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: country, + UserDeviceMemorySize: req.DeviceMemory, + UserDeviceHeapSize: req.JsHeapSizeLimit, + })) + } + + //delayDuration := time.Now().Sub(startTime) + responseWithJSON(w, &response{ + //Timestamp: startTime.UnixNano() / 1e6, + //Delay: delayDuration.Nanoseconds() / 1e6, + Token: tokenizer.Compose(*tokenData), + UserUUID: userUUID, + SessionID: strconv.FormatUint(tokenData.ID, 10), + BeaconSizeLimit: BEACON_SIZE_LIMIT, + }) +} + +func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { + sessionData, err := tokenizer.ParseFromHTTPRequest(r) + if err != nil { + responseWithError(w, http.StatusUnauthorized, err) + return + } + body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) + //defer body.Close() + buf, err := ioutil.ReadAll(body) + if err != nil { + responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + return + } + //log.Printf("Sending batch...") + //startTime := time.Now() + + // analyticsMessages := make([]Message, 0, 200) + + rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message { + switch m := msg.(type) { + case *SetNodeAttributeURLBased: + if m.Name == "src" || m.Name == "href" { + msg = &SetNodeAttribute{ + ID: m.ID, + Name: m.Name, + Value: handleURL(sessionData.ID, m.BaseURL, m.Value), + } + } else if m.Name == "style" { + msg = &SetNodeAttribute{ + ID: m.ID, + Name: m.Name, + Value: handleCSS(sessionData.ID, m.BaseURL, m.Value), + } + } + case *SetCSSDataURLBased: + msg = &SetCSSData{ + ID: m.ID, + Data: handleCSS(sessionData.ID, m.BaseURL, m.Data), + } + case *CSSInsertRuleURLBased: + msg = &CSSInsertRule{ + ID: m.ID, + Index: m.Index, + Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule), + } + } + + // switch msg.(type) { + // case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair) + // *SetPageLocation, + // *PageLoadTiming, + // *PageRenderTiming, + // *PerformanceTrack, + // *SetInputTarget, + // *SetInputValue, + // *MouseClick, + // *RawErrorEvent, + // *JSException, + // *ResourceTiming, + // *RawCustomEvent, + // *CustomIssue, + // *Fetch, + // *StateAction, + // *GraphQL, + // *CreateElementNode, + // *CreateTextNode, + // *RemoveNode, + // *CreateDocument, + // *RemoveNodeAttribute, + // *MoveNode, + // *SetCSSData, + // *CSSInsertRule, + // *CSSDeleteRule: + // analyticsMessages = append(analyticsMessages, msg) + //} + + return msg + }) + if err != nil { + responseWithError(w, http.StatusForbidden, err) + return + } + producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf) + //producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages)) + //duration := time.Now().Sub(startTime) + //log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf))) + w.WriteHeader(http.StatusOK) +} + +func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { + type request struct { + ProjectKey *string `json:"projectKey"` + TrackerVersion string `json:"trackerVersion"` + DoNotTrack bool `json:"DoNotTrack"` + // RevID string `json:"revID"` + } + req := &request{} + body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) + defer body.Close() + if err := json.NewDecoder(body).Decode(req); err != nil { + responseWithError(w, http.StatusBadRequest, err) + return + } + if req.ProjectKey == nil { + responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + return + } + ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway + if ua == nil { + responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + return + } + country := geoIP.ExtractISOCodeFromHTTPRequest(r) + err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{ + ProjectKey: *req.ProjectKey, + TrackerVersion: req.TrackerVersion, + DoNotTrack: req.DoNotTrack, + Platform: "web", + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, + UserBrowserVersion: ua.BrowserVersion, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: country, + }) + if err != nil { + log.Printf("Unable to insert Unstarted Session: %v\n", err) + } + w.WriteHeader(http.StatusOK) +} \ No newline at end of file diff --git a/backend/services/http/main.go b/backend/services/http/main.go index ccd755ed2..9d82139d5 100644 --- a/backend/services/http/main.go +++ b/backend/services/http/main.go @@ -34,10 +34,11 @@ var geoIP *geoip.GeoIP var tokenizer *token.Tokenizer var s3 *storage.S3 -var TOPIC_RAW string +var TOPIC_RAW_WEB string +var TOPIC_RAW_IOS string +var TOPIC_CACHE string var TOPIC_TRIGGER string -var TOPIC_ANALYTICS string -// var kafkaTopicEvents string +//var TOPIC_ANALYTICS string var CACHE_ASSESTS bool var BEACON_SIZE_LIMIT int64 @@ -46,9 +47,11 @@ func main() { producer = queue.NewProducer() defer producer.Close(15000) - TOPIC_RAW = env.String("TOPIC_RAW") + TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB") + TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS") + TOPIC_CACHE = env.String("TOPIC_CACHE") TOPIC_TRIGGER = env.String("TOPIC_TRIGGER") - TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS") + //TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS") rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN")) pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20) defer pgconn.Close() @@ -85,7 +88,7 @@ func main() { case "/v1/web/not-started": switch r.Method { case http.MethodPost: - notStartedHandler(w, r) + notStartedHandlerWeb(w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) } @@ -99,7 +102,7 @@ func main() { case "/v1/web/i": switch r.Method { case http.MethodPost: - pushMessagesSeparatelyHandler(w, r) + pushMessagesHandlerWeb(w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) } @@ -113,21 +116,21 @@ func main() { case "/v1/ios/i": switch r.Method { case http.MethodPost: - pushMessagesHandler(w, r) + pushMessagesHandlerIOS(w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) } case "/v1/ios/late": switch r.Method { case http.MethodPost: - pushLateMessagesHandler(w, r) + pushLateMessagesHandlerIOS(w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) } case "/v1/ios/images": switch r.Method { case http.MethodPost: - iosImagesUploadHandler(w, r) + imagesUploadHandlerIOS(w, r) default: w.WriteHeader(http.StatusMethodNotAllowed) } diff --git a/backend/services/integrations/main.go b/backend/services/integrations/main.go index e1ea58ebd..e535dd6cc 100644 --- a/backend/services/integrations/main.go +++ b/backend/services/integrations/main.go @@ -19,7 +19,7 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - TOPIC_RAW := env.String("TOPIC_RAW") + TOPIC_RAW := env.String("TOPIC_RAW_WEB") POSTGRES_STRING := env.String("POSTGRES_STRING") pg := postgres.NewConn(POSTGRES_STRING) diff --git a/backend/services/sink/main.go b/backend/services/sink/main.go index 4a6ac189d..8d3e5ab02 100644 --- a/backend/services/sink/main.go +++ b/backend/services/sink/main.go @@ -10,9 +10,9 @@ import ( "syscall" "openreplay/backend/pkg/env" - "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + . "openreplay/backend/pkg/messages" ) @@ -27,16 +27,17 @@ func main() { consumer := queue.NewMessageConsumer( env.String("GROUP_SINK"), []string{ - env.String("TOPIC_RAW"), + env.String("TOPIC_RAW_WEB"), + env.String("TOPIC_RAW_IOS") }, - func(sessionID uint64, message messages.Message, _ *types.Meta) { - //typeID, err := messages.GetMessageTypeID(value) + func(sessionID uint64, message Message, _ *types.Meta) { + //typeID, err := GetMessageTypeID(value) // if err != nil { // log.Printf("Message type decoding error: %v", err) // return // } typeID := message.Meta().TypeID - if !messages.IsReplayerType(typeID) { + if !IsReplayerType(typeID) { return } @@ -44,7 +45,7 @@ func main() { value := message.Encode() var data []byte - if messages.IsIOSType(typeID) { + if IsIOSType(typeID) { data = value } else { data = make([]byte, len(value)+8)