feat(backend): topic redirections
This commit is contained in:
parent
f3fad20e6e
commit
11c86f555d
12 changed files with 299 additions and 286 deletions
|
|
@ -28,11 +28,11 @@ ENV TZ=UTC \
|
|||
BEACON_SIZE_LIMIT=7000000 \
|
||||
KAFKA_USE_SSL=true \
|
||||
REDIS_STREAMS_MAX_LEN=3000 \
|
||||
TOPIC_RAW=raw \
|
||||
TOPIC_RAW_WEB=raw \
|
||||
TOPIC_RAW_IOS=raw-ios \
|
||||
TOPIC_CACHE=cache \
|
||||
TOPIC_ANALYTICS=analytics \
|
||||
TOPIC_TRIGGER=trigger \
|
||||
TOPIC_EVENTS=events \
|
||||
GROUP_SINK=sink \
|
||||
GROUP_STORAGE=storage \
|
||||
GROUP_DB=db \
|
||||
|
|
|
|||
|
|
@ -29,11 +29,11 @@ ENV TZ=UTC \
|
|||
BEACON_SIZE_LIMIT=1000000 \
|
||||
KAFKA_USE_SSL=true \
|
||||
REDIS_STREAMS_MAX_LEN=3000 \
|
||||
TOPIC_RAW=raw \
|
||||
TOPIC_RAW_WEB=raw \
|
||||
TOPIC_RAW_IOS=raw-ios \
|
||||
TOPIC_CACHE=cache \
|
||||
TOPIC_ANALYTICS=analytics \
|
||||
TOPIC_TRIGGER=trigger \
|
||||
TOPIC_EVENTS=events \
|
||||
GROUP_SINK=sink \
|
||||
GROUP_STORAGE=storage \
|
||||
GROUP_DB=db \
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ func main() {
|
|||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
GROUP_CACHE := env.String("GROUP_CACHE")
|
||||
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
|
||||
TOPIC_CACHE := env.String("TOPIC_CACHE")
|
||||
|
||||
cacher := cacher.NewCacher(
|
||||
env.String("AWS_REGION"),
|
||||
|
|
@ -31,7 +31,7 @@ func main() {
|
|||
|
||||
consumer := queue.NewMessageConsumer(
|
||||
GROUP_CACHE,
|
||||
[]string{ TOPIC_TRIGGER },
|
||||
[]string{ TOPIC_CACHE },
|
||||
func(sessionID uint64, message messages.Message, e *types.Meta) {
|
||||
switch msg := message.(type) {
|
||||
case *messages.AssetCache:
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ func main() {
|
|||
consumer := queue.NewMessageConsumer(
|
||||
env.String("GROUP_DB"),
|
||||
[]string{
|
||||
//env.String("TOPIC_RAW"),
|
||||
env.String("TOPIC_IOS_RAW"),
|
||||
env.String("TOPIC_TRIGGER"),
|
||||
},
|
||||
func(sessionID uint64, msg messages.Message, _ *types.Meta) {
|
||||
|
|
|
|||
|
|
@ -30,7 +30,8 @@ func main() {
|
|||
consumer := queue.NewMessageConsumer(
|
||||
GROUP_EVENTS,
|
||||
[]string{
|
||||
env.String("TOPIC_RAW"),
|
||||
env.String("TOPIC_RAW_WEB"),
|
||||
env.String("TOPIC_RAW_IOS"),
|
||||
},
|
||||
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
|
||||
lastTs = meta.Timestamp
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import (
|
|||
|
||||
func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
|
||||
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(&messages.AssetCache{
|
||||
producer.Produce(TOPIC_CACHE, sessionID, messages.Encode(&messages.AssetCache{
|
||||
URL: fullURL,
|
||||
}))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,126 +1,17 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres"
|
||||
"openreplay/backend/pkg/token"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
|
||||
|
||||
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
Token string `json:"token"`
|
||||
UserUUID *string `json:"userUUID"`
|
||||
RevID string `json:"revID"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
IsSnippet bool `json:"isSnippet"`
|
||||
DeviceMemory uint64 `json:"deviceMemory"`
|
||||
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
Reset bool `json:"reset"`
|
||||
}
|
||||
type response struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Delay int64 `json:"delay"`
|
||||
Token string `json:"token"`
|
||||
UserUUID string `json:"userUUID"`
|
||||
SessionID string `json:"sessionID"`
|
||||
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
req := &request{}
|
||||
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error?
|
||||
//defer body.Close()
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
responseWithError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
if req.ProjectKey == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
|
||||
return
|
||||
}
|
||||
|
||||
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
|
||||
} else {
|
||||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
userUUID := getUUID(req.UserUUID)
|
||||
tokenData, err := tokenizer.Parse(req.Token)
|
||||
if err != nil || req.Reset { // Starting the new one
|
||||
dice := byte(rand.Intn(100)) // [0, 100)
|
||||
if dice >= p.SampleRate {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("cancel"))
|
||||
return
|
||||
}
|
||||
|
||||
ua := uaParser.ParseFromHTTPRequest(r)
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
// TODO: if EXPIRED => send message for two sessions association
|
||||
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
|
||||
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
|
||||
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&SessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
RevID: req.RevID,
|
||||
UserUUID: userUUID,
|
||||
UserAgent: r.Header.Get("User-Agent"),
|
||||
UserOS: ua.OS,
|
||||
UserOSVersion: ua.OSVersion,
|
||||
UserBrowser: ua.Browser,
|
||||
UserBrowserVersion: ua.BrowserVersion,
|
||||
UserDevice: ua.Device,
|
||||
UserDeviceType: ua.DeviceType,
|
||||
UserCountry: country,
|
||||
UserDeviceMemorySize: req.DeviceMemory,
|
||||
UserDeviceHeapSize: req.JsHeapSizeLimit,
|
||||
}))
|
||||
}
|
||||
|
||||
//delayDuration := time.Now().Sub(startTime)
|
||||
responseWithJSON(w, &response{
|
||||
//Timestamp: startTime.UnixNano() / 1e6,
|
||||
//Delay: delayDuration.Nanoseconds() / 1e6,
|
||||
Token: tokenizer.Compose(*tokenData),
|
||||
UserUUID: userUUID,
|
||||
SessionID: strconv.FormatUint(tokenData.ID, 10),
|
||||
BeaconSizeLimit: BEACON_SIZE_LIMIT,
|
||||
})
|
||||
}
|
||||
|
||||
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
|
||||
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
|
||||
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
|
||||
//defer body.Close()
|
||||
var reader io.ReadCloser
|
||||
|
|
@ -145,148 +36,6 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
|
|||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
return
|
||||
}
|
||||
producer.Produce(TOPIC_RAW, sessionID, buf) // What if not able to send?
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func pushMessagesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
pushMessages(w, r, sessionData.ID)
|
||||
}
|
||||
|
||||
func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
|
||||
//defer body.Close()
|
||||
buf, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
return
|
||||
}
|
||||
//log.Printf("Sending batch...")
|
||||
//startTime := time.Now()
|
||||
|
||||
// analyticsMessages := make([]Message, 0, 200)
|
||||
|
||||
rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message {
|
||||
switch m := msg.(type) {
|
||||
case *SetNodeAttributeURLBased:
|
||||
if m.Name == "src" || m.Name == "href" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleURL(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
} else if m.Name == "style" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleCSS(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
}
|
||||
case *SetCSSDataURLBased:
|
||||
msg = &SetCSSData{
|
||||
ID: m.ID,
|
||||
Data: handleCSS(sessionData.ID, m.BaseURL, m.Data),
|
||||
}
|
||||
case *CSSInsertRuleURLBased:
|
||||
msg = &CSSInsertRule{
|
||||
ID: m.ID,
|
||||
Index: m.Index,
|
||||
Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule),
|
||||
}
|
||||
}
|
||||
|
||||
// switch msg.(type) {
|
||||
// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
|
||||
// *SetPageLocation,
|
||||
// *PageLoadTiming,
|
||||
// *PageRenderTiming,
|
||||
// *PerformanceTrack,
|
||||
// *SetInputTarget,
|
||||
// *SetInputValue,
|
||||
// *MouseClick,
|
||||
// *RawErrorEvent,
|
||||
// *JSException,
|
||||
// *ResourceTiming,
|
||||
// *RawCustomEvent,
|
||||
// *CustomIssue,
|
||||
// *Fetch,
|
||||
// *StateAction,
|
||||
// *GraphQL,
|
||||
// *CreateElementNode,
|
||||
// *CreateTextNode,
|
||||
// *RemoveNode,
|
||||
// *CreateDocument,
|
||||
// *RemoveNodeAttribute,
|
||||
// *MoveNode,
|
||||
// *SetCSSData,
|
||||
// *CSSInsertRule,
|
||||
// *CSSDeleteRule:
|
||||
// analyticsMessages = append(analyticsMessages, msg)
|
||||
//}
|
||||
|
||||
return msg
|
||||
})
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusForbidden, err)
|
||||
return
|
||||
}
|
||||
producer.Produce(TOPIC_RAW, sessionData.ID, rewritenBuf)
|
||||
//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
|
||||
//duration := time.Now().Sub(startTime)
|
||||
//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func notStartedHandler(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
DoNotTrack bool `json:"DoNotTrack"`
|
||||
// RevID string `json:"revID"`
|
||||
}
|
||||
req := &request{}
|
||||
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
|
||||
defer body.Close()
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
responseWithError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
if req.ProjectKey == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
|
||||
return
|
||||
}
|
||||
ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
ProjectKey: *req.ProjectKey,
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
DoNotTrack: req.DoNotTrack,
|
||||
Platform: "web",
|
||||
UserAgent: r.Header.Get("User-Agent"),
|
||||
UserOS: ua.OS,
|
||||
UserOSVersion: ua.OSVersion,
|
||||
UserBrowser: ua.Browser,
|
||||
UserBrowserVersion: ua.BrowserVersion,
|
||||
UserDevice: ua.Device,
|
||||
UserDeviceType: ua.DeviceType,
|
||||
UserCountry: country,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Unable to insert Unstarted Session: %v\n", err)
|
||||
}
|
||||
producer.Produce(topicName, sessionID, buf) // What if not able to send?
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
|
||||
// The difference with web is mostly here:
|
||||
producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&IOSSessionStart{
|
||||
producer.Produce(TOPIC_RAW_IOS, tokenData.ID, Encode(&IOSSessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
|
|
@ -127,18 +127,29 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
|
||||
func pushLateMessagesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS)
|
||||
}
|
||||
|
||||
|
||||
|
||||
func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil && err != token.EXPIRED {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
// Check timestamps here?
|
||||
pushMessages(w, r, sessionData.ID)
|
||||
pushMessages(w, r, sessionData.ID,TOPIC_RAW_IOS)
|
||||
}
|
||||
|
||||
|
||||
func iosImagesUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil { // Should accept expired token?
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
|
|
@ -168,13 +179,12 @@ func iosImagesUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
for _, fileHeaderList := range r.MultipartForm.File {
|
||||
for _, fileHeader := range fileHeaderList {
|
||||
file, err := fileHeader.Open() //TODO: mime type from header
|
||||
file, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
continue // TODO: send server error or accumulate successful files
|
||||
}
|
||||
key := prefix + fileHeader.Filename
|
||||
log.Printf("Uploading ios screen: %v", key)
|
||||
go func() {
|
||||
go func() { //TODO: mime type from header
|
||||
if err := s3.Upload(file, key, "image/jpeg", false); err != nil {
|
||||
log.Printf("Upload ios screen error. %v", err)
|
||||
}
|
||||
|
|
|
|||
249
backend/services/http/handlers_web.go
Normal file
249
backend/services/http/handlers_web.go
Normal file
|
|
@ -0,0 +1,249 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/db/postgres"
|
||||
"openreplay/backend/pkg/token"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
Token string `json:"token"`
|
||||
UserUUID *string `json:"userUUID"`
|
||||
RevID string `json:"revID"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
IsSnippet bool `json:"isSnippet"`
|
||||
DeviceMemory uint64 `json:"deviceMemory"`
|
||||
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
Reset bool `json:"reset"`
|
||||
}
|
||||
type response struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Delay int64 `json:"delay"`
|
||||
Token string `json:"token"`
|
||||
UserUUID string `json:"userUUID"`
|
||||
SessionID string `json:"sessionID"`
|
||||
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
req := &request{}
|
||||
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error?
|
||||
//defer body.Close()
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
responseWithError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
if req.ProjectKey == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
|
||||
return
|
||||
}
|
||||
|
||||
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
|
||||
} else {
|
||||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
userUUID := getUUID(req.UserUUID)
|
||||
tokenData, err := tokenizer.Parse(req.Token)
|
||||
if err != nil || req.Reset { // Starting the new one
|
||||
dice := byte(rand.Intn(100)) // [0, 100)
|
||||
if dice >= p.SampleRate {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("cancel"))
|
||||
return
|
||||
}
|
||||
|
||||
ua := uaParser.ParseFromHTTPRequest(r)
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
// TODO: if EXPIRED => send message for two sessions association
|
||||
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
|
||||
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
|
||||
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
RevID: req.RevID,
|
||||
UserUUID: userUUID,
|
||||
UserAgent: r.Header.Get("User-Agent"),
|
||||
UserOS: ua.OS,
|
||||
UserOSVersion: ua.OSVersion,
|
||||
UserBrowser: ua.Browser,
|
||||
UserBrowserVersion: ua.BrowserVersion,
|
||||
UserDevice: ua.Device,
|
||||
UserDeviceType: ua.DeviceType,
|
||||
UserCountry: country,
|
||||
UserDeviceMemorySize: req.DeviceMemory,
|
||||
UserDeviceHeapSize: req.JsHeapSizeLimit,
|
||||
}))
|
||||
}
|
||||
|
||||
//delayDuration := time.Now().Sub(startTime)
|
||||
responseWithJSON(w, &response{
|
||||
//Timestamp: startTime.UnixNano() / 1e6,
|
||||
//Delay: delayDuration.Nanoseconds() / 1e6,
|
||||
Token: tokenizer.Compose(*tokenData),
|
||||
UserUUID: userUUID,
|
||||
SessionID: strconv.FormatUint(tokenData.ID, 10),
|
||||
BeaconSizeLimit: BEACON_SIZE_LIMIT,
|
||||
})
|
||||
}
|
||||
|
||||
func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
|
||||
//defer body.Close()
|
||||
buf, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
return
|
||||
}
|
||||
//log.Printf("Sending batch...")
|
||||
//startTime := time.Now()
|
||||
|
||||
// analyticsMessages := make([]Message, 0, 200)
|
||||
|
||||
rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message {
|
||||
switch m := msg.(type) {
|
||||
case *SetNodeAttributeURLBased:
|
||||
if m.Name == "src" || m.Name == "href" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleURL(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
} else if m.Name == "style" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleCSS(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
}
|
||||
case *SetCSSDataURLBased:
|
||||
msg = &SetCSSData{
|
||||
ID: m.ID,
|
||||
Data: handleCSS(sessionData.ID, m.BaseURL, m.Data),
|
||||
}
|
||||
case *CSSInsertRuleURLBased:
|
||||
msg = &CSSInsertRule{
|
||||
ID: m.ID,
|
||||
Index: m.Index,
|
||||
Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule),
|
||||
}
|
||||
}
|
||||
|
||||
// switch msg.(type) {
|
||||
// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
|
||||
// *SetPageLocation,
|
||||
// *PageLoadTiming,
|
||||
// *PageRenderTiming,
|
||||
// *PerformanceTrack,
|
||||
// *SetInputTarget,
|
||||
// *SetInputValue,
|
||||
// *MouseClick,
|
||||
// *RawErrorEvent,
|
||||
// *JSException,
|
||||
// *ResourceTiming,
|
||||
// *RawCustomEvent,
|
||||
// *CustomIssue,
|
||||
// *Fetch,
|
||||
// *StateAction,
|
||||
// *GraphQL,
|
||||
// *CreateElementNode,
|
||||
// *CreateTextNode,
|
||||
// *RemoveNode,
|
||||
// *CreateDocument,
|
||||
// *RemoveNodeAttribute,
|
||||
// *MoveNode,
|
||||
// *SetCSSData,
|
||||
// *CSSInsertRule,
|
||||
// *CSSDeleteRule:
|
||||
// analyticsMessages = append(analyticsMessages, msg)
|
||||
//}
|
||||
|
||||
return msg
|
||||
})
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusForbidden, err)
|
||||
return
|
||||
}
|
||||
producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf)
|
||||
//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
|
||||
//duration := time.Now().Sub(startTime)
|
||||
//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
DoNotTrack bool `json:"DoNotTrack"`
|
||||
// RevID string `json:"revID"`
|
||||
}
|
||||
req := &request{}
|
||||
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
|
||||
defer body.Close()
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
responseWithError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
if req.ProjectKey == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
|
||||
return
|
||||
}
|
||||
ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
ProjectKey: *req.ProjectKey,
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
DoNotTrack: req.DoNotTrack,
|
||||
Platform: "web",
|
||||
UserAgent: r.Header.Get("User-Agent"),
|
||||
UserOS: ua.OS,
|
||||
UserOSVersion: ua.OSVersion,
|
||||
UserBrowser: ua.Browser,
|
||||
UserBrowserVersion: ua.BrowserVersion,
|
||||
UserDevice: ua.Device,
|
||||
UserDeviceType: ua.DeviceType,
|
||||
UserCountry: country,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Unable to insert Unstarted Session: %v\n", err)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
|
@ -34,10 +34,11 @@ var geoIP *geoip.GeoIP
|
|||
var tokenizer *token.Tokenizer
|
||||
var s3 *storage.S3
|
||||
|
||||
var TOPIC_RAW string
|
||||
var TOPIC_RAW_WEB string
|
||||
var TOPIC_RAW_IOS string
|
||||
var TOPIC_CACHE string
|
||||
var TOPIC_TRIGGER string
|
||||
var TOPIC_ANALYTICS string
|
||||
// var kafkaTopicEvents string
|
||||
//var TOPIC_ANALYTICS string
|
||||
var CACHE_ASSESTS bool
|
||||
var BEACON_SIZE_LIMIT int64
|
||||
|
||||
|
|
@ -46,9 +47,11 @@ func main() {
|
|||
|
||||
producer = queue.NewProducer()
|
||||
defer producer.Close(15000)
|
||||
TOPIC_RAW = env.String("TOPIC_RAW")
|
||||
TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB")
|
||||
TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS")
|
||||
TOPIC_CACHE = env.String("TOPIC_CACHE")
|
||||
TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
|
||||
TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
|
||||
//TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
|
||||
rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
|
||||
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20)
|
||||
defer pgconn.Close()
|
||||
|
|
@ -85,7 +88,7 @@ func main() {
|
|||
case "/v1/web/not-started":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
notStartedHandler(w, r)
|
||||
notStartedHandlerWeb(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
|
@ -99,7 +102,7 @@ func main() {
|
|||
case "/v1/web/i":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushMessagesSeparatelyHandler(w, r)
|
||||
pushMessagesHandlerWeb(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
|
@ -113,21 +116,21 @@ func main() {
|
|||
case "/v1/ios/i":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushMessagesHandler(w, r)
|
||||
pushMessagesHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/late":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushLateMessagesHandler(w, r)
|
||||
pushLateMessagesHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/images":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
iosImagesUploadHandler(w, r)
|
||||
imagesUploadHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
TOPIC_RAW := env.String("TOPIC_RAW")
|
||||
TOPIC_RAW := env.String("TOPIC_RAW_WEB")
|
||||
POSTGRES_STRING := env.String("POSTGRES_STRING")
|
||||
|
||||
pg := postgres.NewConn(POSTGRES_STRING)
|
||||
|
|
|
|||
|
|
@ -10,9 +10,9 @@ import (
|
|||
"syscall"
|
||||
|
||||
"openreplay/backend/pkg/env"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -27,16 +27,17 @@ func main() {
|
|||
consumer := queue.NewMessageConsumer(
|
||||
env.String("GROUP_SINK"),
|
||||
[]string{
|
||||
env.String("TOPIC_RAW"),
|
||||
env.String("TOPIC_RAW_WEB"),
|
||||
env.String("TOPIC_RAW_IOS")
|
||||
},
|
||||
func(sessionID uint64, message messages.Message, _ *types.Meta) {
|
||||
//typeID, err := messages.GetMessageTypeID(value)
|
||||
func(sessionID uint64, message Message, _ *types.Meta) {
|
||||
//typeID, err := GetMessageTypeID(value)
|
||||
// if err != nil {
|
||||
// log.Printf("Message type decoding error: %v", err)
|
||||
// return
|
||||
// }
|
||||
typeID := message.Meta().TypeID
|
||||
if !messages.IsReplayerType(typeID) {
|
||||
if !IsReplayerType(typeID) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -44,7 +45,7 @@ func main() {
|
|||
|
||||
value := message.Encode()
|
||||
var data []byte
|
||||
if messages.IsIOSType(typeID) {
|
||||
if IsIOSType(typeID) {
|
||||
data = value
|
||||
} else {
|
||||
data = make([]byte, len(value)+8)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue