diff --git a/.github/workflows/workers.yaml b/.github/workflows/workers.yaml index 49fd0948b..7ce78ad6f 100644 --- a/.github/workflows/workers.yaml +++ b/.github/workflows/workers.yaml @@ -49,7 +49,7 @@ jobs: # { - git diff --name-only HEAD HEAD~1 | grep backend/services | grep -vE ^ee/ | cut -d '/' -f3 + git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3 git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3 diff --git a/backend/build.sh b/backend/build.sh old mode 100644 new mode 100755 index c3c40dd33..8e3bea86a --- a/backend/build.sh +++ b/backend/build.sh @@ -18,6 +18,28 @@ check_prereq() { return } + +function build_service() { + image="$1" + echo "BUILDING $image" + case "$image" in + http) + echo build http + docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . + [[ $PUSH_IMAGE -eq 1 ]] && { + docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1} + } + ;; + *) + docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image . + [[ $PUSH_IMAGE -eq 1 ]] && { + docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1} + } + ;; + esac + return +} + function build_api(){ # Copy enterprise code [[ $1 == "ee" ]] && { @@ -25,20 +47,12 @@ function build_api(){ ee="true" } [[ $2 != "" ]] && { - image="$2" - docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image . - [[ $PUSH_IMAGE -eq 1 ]] && { - docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1} - } - echo "build completed for http" + build_service $2 return } for image in $(ls services); do - docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image . 
- [[ $PUSH_IMAGE -eq 1 ]] && { - docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1} - } + build_service $image echo "::set-output name=image::${DOCKER_REPO:-'local'}/$image:${git_sha1}" done echo "backend build completed" diff --git a/backend/cmd/Dockerfile b/backend/cmd/Dockerfile new file mode 100644 index 000000000..e2f611afa --- /dev/null +++ b/backend/cmd/Dockerfile @@ -0,0 +1,60 @@ +FROM golang:1.18-alpine3.15 AS prepare + +RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash + +WORKDIR /root + +COPY go.mod . +COPY go.sum . +RUN go mod download + + +FROM prepare AS build +COPY pkg pkg +COPY services services +COPY internal internal +COPY cmd cmd + +ARG SERVICE_NAME +RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME + +FROM alpine +RUN apk add --no-cache ca-certificates + +ENV TZ=UTC \ + FS_ULIMIT=1000 \ + FS_DIR=/mnt/efs \ + MAXMINDDB_FILE=/root/geoip.mmdb \ + UAPARSER_FILE=/root/regexes.yaml \ + HTTP_PORT=80 \ + BEACON_SIZE_LIMIT=7000000 \ + KAFKA_USE_SSL=true \ + KAFKA_MAX_POLL_INTERVAL_MS=400000 \ + REDIS_STREAMS_MAX_LEN=3000 \ + TOPIC_RAW_WEB=raw \ + TOPIC_RAW_IOS=raw-ios \ + TOPIC_CACHE=cache \ + TOPIC_ANALYTICS=analytics \ + TOPIC_TRIGGER=trigger \ + GROUP_SINK=sink \ + GROUP_STORAGE=storage \ + GROUP_DB=db \ + GROUP_ENDER=ender \ + GROUP_CACHE=cache \ + AWS_REGION_WEB=eu-central-1 \ + AWS_REGION_IOS=eu-west-1 \ + AWS_REGION_ASSETS=eu-central-1 \ + CACHE_ASSETS=true \ + ASSETS_SIZE_LIMIT=6291456 \ + FS_CLEAN_HRS=72 \ + LOG_QUEUE_STATS_INTERVAL_SEC=60 + + +ARG SERVICE_NAME +RUN if [ "$SERVICE_NAME" = "http" ]; then \ + wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\ + wget https://static.openreplay.com/geoip/GeoLite2-Country.mmdb -O "$MAXMINDDB_FILE"; fi + + +COPY --from=build /root/service /root/service +ENTRYPOINT /root/service diff --git a/backend/services/http/README.md b/backend/cmd/http/README.md 
similarity index 100% rename from backend/services/http/README.md rename to backend/cmd/http/README.md diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go new file mode 100644 index 000000000..541baab62 --- /dev/null +++ b/backend/cmd/http/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "log" + "openreplay/backend/internal/config" + "openreplay/backend/internal/router" + "openreplay/backend/internal/server" + "openreplay/backend/internal/services" + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/pprof" + "openreplay/backend/pkg/queue" + "os" + "os/signal" + "syscall" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + pprof.StartProfilingServer() + + // Load configuration + cfg := config.New() + + // Connect to queue + producer := queue.NewProducer() + defer producer.Close(15000) + + // Connect to database + dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20) + defer dbConn.Close() + + // Build all services + services := services.New(cfg, producer, dbConn) + + // Init server's routes + router, err := router.NewRouter(cfg, services) + if err != nil { + log.Fatalf("failed while creating engine: %s", err) + } + + // Init server + server, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout) + if err != nil { + log.Fatalf("failed while creating server: %s", err) + } + + // Run server + go func() { + if err := server.Start(); err != nil { + log.Fatalf("Server error: %v\n", err) + } + }() + log.Printf("Server successfully started on port %v\n", cfg.HTTPPort) + + // Wait stop signal to shut down server gracefully + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + <-sigchan + log.Printf("Shutting down the server\n") + server.Stop() +} diff --git a/backend/internal/assetscache/assets.go b/backend/internal/assetscache/assets.go new file mode 100644 index 000000000..1ef70b56c --- 
/dev/null +++ b/backend/internal/assetscache/assets.go @@ -0,0 +1,83 @@ +package assetscache + +import ( + "openreplay/backend/internal/config" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/url/assets" +) + +type AssetsCache struct { + cfg *config.Config + rewriter *assets.Rewriter + producer types.Producer +} + +func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { + return &AssetsCache{ + cfg: cfg, + rewriter: rewriter, + producer: producer, + } +} + +func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.Message { + switch m := msg.(type) { + case *messages.SetNodeAttributeURLBased: + if m.Name == "src" || m.Name == "href" { + return &messages.SetNodeAttribute{ + ID: m.ID, + Name: m.Name, + Value: e.handleURL(sessID, m.BaseURL, m.Value), + } + } else if m.Name == "style" { + return &messages.SetNodeAttribute{ + ID: m.ID, + Name: m.Name, + Value: e.handleCSS(sessID, m.BaseURL, m.Value), + } + } + case *messages.SetCSSDataURLBased: + return &messages.SetCSSData{ + ID: m.ID, + Data: e.handleCSS(sessID, m.BaseURL, m.Data), + } + case *messages.CSSInsertRuleURLBased: + return &messages.CSSInsertRule{ + ID: m.ID, + Index: m.Index, + Rule: e.handleCSS(sessID, m.BaseURL, m.Rule), + } + } + return msg +} + +func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) { + if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable { + e.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{ + URL: fullURL, + })) + } +} + +func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) { + for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting + e.sendAssetForCache(sessionID, baseURL, u) + } +} + +func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string { + if e.cfg.CacheAssets { 
+ e.sendAssetForCache(sessionID, baseURL, url) + return e.rewriter.RewriteURL(sessionID, baseURL, url) + } + return assets.ResolveURL(baseURL, url) +} + +func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string { + if e.cfg.CacheAssets { + e.sendAssetsForCacheFromCSS(sessionID, baseURL, css) + return e.rewriter.RewriteCSS(sessionID, baseURL, css) + } + return assets.ResolveCSS(baseURL, css) +} diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go new file mode 100644 index 000000000..5b55ba346 --- /dev/null +++ b/backend/internal/config/config.go @@ -0,0 +1,50 @@ +package config + +import ( + "openreplay/backend/pkg/env" + "time" +) + +type Config struct { + HTTPHost string + HTTPPort string + HTTPTimeout time.Duration + TopicRawWeb string + TopicRawIOS string + TopicCache string + CacheAssets bool + BeaconSizeLimit int64 + JsonSizeLimit int64 + FileSizeLimit int64 + AssetsOrigin string + AWSRegion string + S3BucketIOSImages string + Postgres string + TokenSecret string + UAParserFile string + MaxMinDBFile string + WorkerID uint16 +} + +func New() *Config { + return &Config{ + HTTPHost: "", // empty by default + HTTPPort: env.String("HTTP_PORT"), + HTTPTimeout: time.Second * 60, + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicCache: env.String("TOPIC_CACHE"), + CacheAssets: env.Bool("CACHE_ASSETS"), + BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")), + JsonSizeLimit: 1e3, // 1Kb + FileSizeLimit: 1e7, // 10Mb + AssetsOrigin: env.String("ASSETS_ORIGIN"), + AWSRegion: env.String("AWS_REGION"), + S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"), + Postgres: env.String("POSTGRES_STRING"), + TokenSecret: env.String("TOKEN_SECRET"), + UAParserFile: env.String("UAPARSER_FILE"), + MaxMinDBFile: env.String("MAXMINDDB_FILE"), + WorkerID: env.WorkerID(), + } +} diff --git a/backend/services/http/geoip/geoip.go b/backend/internal/geoip/geoip.go similarity index 
100% rename from backend/services/http/geoip/geoip.go rename to backend/internal/geoip/geoip.go diff --git a/backend/services/http/geoip/http.go b/backend/internal/geoip/http.go similarity index 100% rename from backend/services/http/geoip/http.go rename to backend/internal/geoip/http.go diff --git a/backend/services/http/ios-device.go b/backend/internal/ios/ios-device.go similarity index 99% rename from backend/services/http/ios-device.go rename to backend/internal/ios/ios-device.go index 6a09e5e07..8df33035b 100644 --- a/backend/services/http/ios-device.go +++ b/backend/internal/ios/ios-device.go @@ -1,4 +1,4 @@ -package main +package ios import ( "strings" diff --git a/backend/internal/router/handlers-ios.go b/backend/internal/router/handlers-ios.go new file mode 100644 index 000000000..50f92a6ad --- /dev/null +++ b/backend/internal/router/handlers-ios.go @@ -0,0 +1,172 @@ +package router + +import ( + "encoding/json" + "errors" + "log" + "math/rand" + "net/http" + "openreplay/backend/internal/ios" + "openreplay/backend/internal/uuid" + "strconv" + "time" + + "openreplay/backend/pkg/db/postgres" + . 
"openreplay/backend/pkg/messages" + "openreplay/backend/pkg/token" +) + +func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + req := &StartIOSSessionRequest{} + + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + return + } + body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) + defer body.Close() + + if err := json.NewDecoder(body).Decode(req); err != nil { + ResponseWithError(w, http.StatusBadRequest, err) + return + } + + if req.ProjectKey == nil { + ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + return + } + + p, err := e.services.Database.GetProjectByKey(*req.ProjectKey) + if err != nil { + if postgres.IsNoRowsErr(err) { + ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active")) + } else { + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + } + return + } + userUUID := uuid.GetUUID(req.UserUUID) + tokenData, err := e.services.Tokenizer.Parse(req.Token) + + if err != nil { // Starting the new one + dice := byte(rand.Intn(100)) // [0, 100) + if dice >= p.SampleRate { + ResponseWithError(w, http.StatusForbidden, errors.New("cancel")) + return + } + + ua := e.services.UaParser.ParseFromHTTPRequest(r) + if ua == nil { + ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + return + } + sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli())) + if err != nil { + ResponseWithError(w, http.StatusInternalServerError, err) + return + } + // TODO: if EXPIRED => send message for two sessions association + expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) + tokenData = &token.TokenData{sessionID, expTime.UnixMilli()} + + country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r) + + // The difference with web is mostly here: + 
e.services.Producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{ + Timestamp: req.Timestamp, + ProjectID: uint64(p.ProjectID), + TrackerVersion: req.TrackerVersion, + RevID: req.RevID, + UserUUID: userUUID, + UserOS: "IOS", + UserOSVersion: req.UserOSVersion, + UserDevice: ios.MapIOSDevice(req.UserDevice), + UserDeviceType: ios.GetIOSDeviceType(req.UserDevice), + UserCountry: country, + })) + } + + ResponseWithJSON(w, &StartIOSSessionResponse{ + Token: e.services.Tokenizer.Compose(*tokenData), + UserUUID: userUUID, + SessionID: strconv.FormatUint(tokenData.ID, 10), + BeaconSizeLimit: e.cfg.BeaconSizeLimit, + }) +} + +func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { + sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) + if err != nil { + ResponseWithError(w, http.StatusUnauthorized, err) + return + } + e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS) +} + +func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { + sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) + if err != nil && err != token.EXPIRED { + ResponseWithError(w, http.StatusUnauthorized, err) + return + } + // Check timestamps here? + e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS) +} + +func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) { + log.Printf("received image request") + + sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) + if err != nil { // Should accept expired token?
+ ResponseWithError(w, http.StatusUnauthorized, err) + return + } + + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + return + } + r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit) + defer r.Body.Close() + + err = r.ParseMultipartForm(1e6) // ~1Mb + if err == http.ErrNotMultipart || err == http.ErrMissingBoundary { + ResponseWithError(w, http.StatusUnsupportedMediaType, err) + return + // } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB + } else if err != nil { + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + return + } + + if r.MultipartForm == nil { + ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed")) + return + } + + if len(r.MultipartForm.Value["projectKey"]) == 0 { + ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter? + return + } + + prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/" + + for _, fileHeaderList := range r.MultipartForm.File { + for _, fileHeader := range fileHeaderList { + file, err := fileHeader.Open() + if err != nil { + continue // TODO: send server error or accumulate successful files + } + key := prefix + fileHeader.Filename + log.Printf("Uploading image... %v", key) + go func() { //TODO: mime type from header + if err := e.services.Storage.Upload(file, key, "image/jpeg", false); err != nil { + log.Printf("Upload ios screen error. 
%v", err) + } + }() + } + } + + w.WriteHeader(http.StatusOK) +} diff --git a/backend/internal/router/handlers-web.go b/backend/internal/router/handlers-web.go new file mode 100644 index 000000000..fc7c6421d --- /dev/null +++ b/backend/internal/router/handlers-web.go @@ -0,0 +1,187 @@ +package router + +import ( + "bytes" + "encoding/json" + "errors" + "log" + "math/rand" + "net/http" + "openreplay/backend/internal/uuid" + "strconv" + "time" + + "openreplay/backend/pkg/db/postgres" + . "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/token" +) + +func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + + // Check request body + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + return + } + body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) + defer body.Close() + + // Parse request body + req := &StartSessionRequest{} + if err := json.NewDecoder(body).Decode(req); err != nil { + ResponseWithError(w, http.StatusBadRequest, err) + return + } + + // Handler's logic + if req.ProjectKey == nil { + ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + return + } + + p, err := e.services.Database.GetProjectByKey(*req.ProjectKey) + if err != nil { + if postgres.IsNoRowsErr(err) { + ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached")) + } else { + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + } + return + } + + userUUID := uuid.GetUUID(req.UserUUID) + tokenData, err := e.services.Tokenizer.Parse(req.Token) + if err != nil || req.Reset { // Starting the new one + dice := byte(rand.Intn(100)) // [0, 100) + if dice >= p.SampleRate { + ResponseWithError(w, http.StatusForbidden, errors.New("cancel")) + return + } + + ua := e.services.UaParser.ParseFromHTTPRequest(r) + if ua == nil { + ResponseWithError(w, 
http.StatusForbidden, errors.New("browser not recognized")) + return + } + sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixNano() / 1e6)) + if err != nil { + ResponseWithError(w, http.StatusInternalServerError, err) + return + } + // TODO: if EXPIRED => send message for two sessions association + expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) + tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6} + + e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{ + Timestamp: req.Timestamp, + ProjectID: uint64(p.ProjectID), + TrackerVersion: req.TrackerVersion, + RevID: req.RevID, + UserUUID: userUUID, + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, + UserBrowserVersion: ua.BrowserVersion, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r), + UserDeviceMemorySize: req.DeviceMemory, + UserDeviceHeapSize: req.JsHeapSizeLimit, + UserID: req.UserID, + })) + } + + ResponseWithJSON(w, &StartSessionResponse{ + Token: e.services.Tokenizer.Compose(*tokenData), + UserUUID: userUUID, + SessionID: strconv.FormatUint(tokenData.ID, 10), + BeaconSizeLimit: e.cfg.BeaconSizeLimit, + }) +} + +func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { + // Check authorization + sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r) + if err != nil { + ResponseWithError(w, http.StatusUnauthorized, err) + return + } + + // Check request body + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + return + } + body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) + defer body.Close() + + var handledMessages bytes.Buffer + + // Process each message in request data + err = ReadBatchReader(body, func(msg Message) { + msg = e.services.Assets.ParseAssets(sessionData.ID, 
msg) + handledMessages.Write(msg.Encode()) + }) + if err != nil { + ResponseWithError(w, http.StatusForbidden, err) + return + } + + // Send processed messages to queue as array of bytes + err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes()) + if err != nil { + log.Printf("can't send processed messages to queue: %s", err) + } + + w.WriteHeader(http.StatusOK) +} + +func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { + // Check request body + if r.Body == nil { + ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + return + } + body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit) + defer body.Close() + + // Parse request body + req := &NotStartedRequest{} + if err := json.NewDecoder(body).Decode(req); err != nil { + ResponseWithError(w, http.StatusBadRequest, err) + return + } + + // Handler's logic + if req.ProjectKey == nil { + ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) + return + } + ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway + if ua == nil { + ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) + return + } + country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r) + err := e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{ + ProjectKey: *req.ProjectKey, + TrackerVersion: req.TrackerVersion, + DoNotTrack: req.DoNotTrack, + Platform: "web", + UserAgent: r.Header.Get("User-Agent"), + UserOS: ua.OS, + UserOSVersion: ua.OSVersion, + UserBrowser: ua.Browser, + UserBrowserVersion: ua.BrowserVersion, + UserDevice: ua.Device, + UserDeviceType: ua.DeviceType, + UserCountry: country, + }) + if err != nil { + log.Printf("Unable to insert Unstarted Session: %v\n", err) + } + + w.WriteHeader(http.StatusOK) +} diff --git a/backend/services/http/handlers.go b/backend/internal/router/handlers.go similarity index 56% rename from 
backend/services/http/handlers.go rename to backend/internal/router/handlers.go index dd73925af..34a7a990d 100644 --- a/backend/services/http/handlers.go +++ b/backend/internal/router/handlers.go @@ -1,28 +1,27 @@ -package main +package router import ( + gzip "github.com/klauspost/pgzip" "io" "io/ioutil" "log" "net/http" - - gzip "github.com/klauspost/pgzip" ) -const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb - -func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) { - body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) +func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) { + body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) defer body.Close() + var reader io.ReadCloser var err error + switch r.Header.Get("Content-Encoding") { case "gzip": log.Println("Gzip", reader) reader, err = gzip.NewReader(body) if err != nil { - responseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response return } log.Println("Gzip reader init", reader) @@ -33,9 +32,9 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topi log.Println("Reader after switch:", reader) buf, err := ioutil.ReadAll(reader) if err != nil { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging return } - producer.Produce(topicName, sessionID, buf) // What if not able to send? + e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send? 
w.WriteHeader(http.StatusOK) } diff --git a/backend/internal/router/model.go b/backend/internal/router/model.go new file mode 100644 index 000000000..b39c49688 --- /dev/null +++ b/backend/internal/router/model.go @@ -0,0 +1,49 @@ +package router + +type StartSessionRequest struct { + Token string `json:"token"` + UserUUID *string `json:"userUUID"` + RevID string `json:"revID"` + Timestamp uint64 `json:"timestamp"` + TrackerVersion string `json:"trackerVersion"` + IsSnippet bool `json:"isSnippet"` + DeviceMemory uint64 `json:"deviceMemory"` + JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` + ProjectKey *string `json:"projectKey"` + Reset bool `json:"reset"` + UserID string `json:"userID"` +} + +type StartSessionResponse struct { + Timestamp int64 `json:"timestamp"` + Delay int64 `json:"delay"` + Token string `json:"token"` + UserUUID string `json:"userUUID"` + SessionID string `json:"sessionID"` + BeaconSizeLimit int64 `json:"beaconSizeLimit"` +} + +type NotStartedRequest struct { + ProjectKey *string `json:"projectKey"` + TrackerVersion string `json:"trackerVersion"` + DoNotTrack bool `json:"DoNotTrack"` +} + +type StartIOSSessionRequest struct { + Token string `json:"token"` + ProjectKey *string `json:"projectKey"` + TrackerVersion string `json:"trackerVersion"` + RevID string `json:"revID"` + UserUUID *string `json:"userUUID"` + UserOSVersion string `json:"userOSVersion"` + UserDevice string `json:"userDevice"` + Timestamp uint64 `json:"timestamp"` +} + +type StartIOSSessionResponse struct { + Token string `json:"token"` + ImagesHashList []string `json:"imagesHashList"` + UserUUID string `json:"userUUID"` + BeaconSizeLimit int64 `json:"beaconSizeLimit"` + SessionID string `json:"sessionID"` +} diff --git a/backend/services/http/response.go b/backend/internal/router/response.go similarity index 59% rename from backend/services/http/response.go rename to backend/internal/router/response.go index 11d9b328d..0b4725419 100644 --- a/backend/services/http/response.go 
+++ b/backend/internal/router/response.go @@ -1,4 +1,4 @@ -package main +package router import ( "encoding/json" @@ -6,7 +6,7 @@ import ( "net/http" ) -func responseWithJSON(w http.ResponseWriter, res interface{}) { +func ResponseWithJSON(w http.ResponseWriter, res interface{}) { body, err := json.Marshal(res) if err != nil { log.Println(err) @@ -15,10 +15,10 @@ func responseWithJSON(w http.ResponseWriter, res interface{}) { w.Write(body) } -func responseWithError(w http.ResponseWriter, code int, err error) { +func ResponseWithError(w http.ResponseWriter, code int, err error) { type response struct { Error string `json:"error"` } w.WriteHeader(code) - responseWithJSON(w, &response{err.Error()}) + ResponseWithJSON(w, &response{err.Error()}) } diff --git a/backend/internal/router/router.go b/backend/internal/router/router.go new file mode 100644 index 000000000..145630351 --- /dev/null +++ b/backend/internal/router/router.go @@ -0,0 +1,70 @@ +package router + +import ( + "github.com/gorilla/mux" + "log" + "net/http" + "openreplay/backend/internal/config" + http2 "openreplay/backend/internal/services" +) + +type Router struct { + router *mux.Router + cfg *config.Config + services *http2.ServicesBuilder +} + +func NewRouter(cfg *config.Config, services *http2.ServicesBuilder) (*Router, error) { + e := &Router{ + cfg: cfg, + services: services, + } + e.init() + return e, nil +} + +func (e *Router) init() { + e.router = mux.NewRouter() + // Root path + e.router.HandleFunc("/", e.root).Methods("POST") + + // Web handlers + e.router.HandleFunc("/v1/web/not-started", e.notStartedHandlerWeb).Methods("POST") + e.router.HandleFunc("/v1/web/start", e.startSessionHandlerWeb).Methods("POST") + e.router.HandleFunc("/v1/web/i", e.pushMessagesHandlerWeb).Methods("POST") + + // iOS handlers + e.router.HandleFunc("/v1/ios/start", e.startSessionHandlerIOS).Methods("POST") + e.router.HandleFunc("/v1/ios/i", e.pushMessagesHandlerIOS).Methods("POST") + e.router.HandleFunc("/v1/ios/late", 
e.pushLateMessagesHandlerIOS).Methods("POST") + e.router.HandleFunc("/v1/ios/images", e.imagesUploadHandlerIOS).Methods("POST") + + // CORS middleware + e.router.Use(e.corsMiddleware) +} + +func (e *Router) root(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +func (e *Router) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Prepare headers for preflight requests + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization") + if r.Method == http.MethodOptions { + w.Header().Set("Cache-Control", "max-age=86400") + w.WriteHeader(http.StatusOK) + return + } + log.Printf("Request: %v - %v ", r.Method, r.URL.Path) + + // Serve request + next.ServeHTTP(w, r) + }) +} + +func (e *Router) GetHandler() http.Handler { + return e.router +} diff --git a/backend/internal/server/server.go b/backend/internal/server/server.go new file mode 100644 index 000000000..2670ba537 --- /dev/null +++ b/backend/internal/server/server.go @@ -0,0 +1,46 @@ +package server + +import ( + "context" + "errors" + "fmt" + "golang.org/x/net/http2" + "log" + "net/http" + "time" +) + +type Server struct { + server *http.Server +} + +func New(handler http.Handler, host, port string, timeout time.Duration) (*Server, error) { + switch { + case port == "": + return nil, errors.New("empty server port") + case handler == nil: + return nil, errors.New("empty handler") + case timeout < 1: + return nil, fmt.Errorf("invalid timeout %d", timeout) + } + server := &http.Server{ + Addr: fmt.Sprintf("%s:%s", host, port), + Handler: handler, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + if err := http2.ConfigureServer(server, nil); err != nil { + log.Printf("can't configure http2 server: %s", err) + } + return &Server{ + server: server, + }, nil +} + +func (s *Server) Start() 
error { + return s.server.ListenAndServe() +} + +func (s *Server) Stop() { + s.server.Shutdown(context.Background()) +} diff --git a/backend/internal/services/services.go b/backend/internal/services/services.go new file mode 100644 index 000000000..5b84e1dfb --- /dev/null +++ b/backend/internal/services/services.go @@ -0,0 +1,39 @@ +package services + +import ( + "openreplay/backend/internal/assetscache" + "openreplay/backend/internal/config" + "openreplay/backend/internal/geoip" + "openreplay/backend/internal/uaparser" + "openreplay/backend/pkg/db/cache" + "openreplay/backend/pkg/flakeid" + "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/storage" + "openreplay/backend/pkg/token" + "openreplay/backend/pkg/url/assets" +) + +type ServicesBuilder struct { + Database *cache.PGCache + Producer types.Producer + Assets *assetscache.AssetsCache + Flaker *flakeid.Flaker + UaParser *uaparser.UAParser + GeoIP *geoip.GeoIP + Tokenizer *token.Tokenizer + Storage *storage.S3 +} + +func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder { + rewriter := assets.NewRewriter(cfg.AssetsOrigin) + return &ServicesBuilder{ + Database: pgconn, + Producer: producer, + Assets: assetscache.New(cfg, rewriter, producer), + Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages), + Tokenizer: token.NewTokenizer(cfg.TokenSecret), + UaParser: uaparser.NewUAParser(cfg.UAParserFile), + GeoIP: geoip.NewGeoIP(cfg.MaxMinDBFile), + Flaker: flakeid.NewFlaker(cfg.WorkerID), + } +} diff --git a/backend/services/http/uaparser/http.go b/backend/internal/uaparser/http.go similarity index 100% rename from backend/services/http/uaparser/http.go rename to backend/internal/uaparser/http.go diff --git a/backend/services/http/uaparser/uaparser.go b/backend/internal/uaparser/uaparser.go similarity index 100% rename from backend/services/http/uaparser/uaparser.go rename to backend/internal/uaparser/uaparser.go diff --git a/backend/services/http/uuid.go 
b/backend/internal/uuid/uuid.go similarity index 76% rename from backend/services/http/uuid.go rename to backend/internal/uuid/uuid.go index 87704d740..44dd76827 100644 --- a/backend/services/http/uuid.go +++ b/backend/internal/uuid/uuid.go @@ -1,10 +1,10 @@ -package main +package uuid import ( "github.com/google/uuid" ) -func getUUID(u *string) string { +func GetUUID(u *string) string { if u != nil { _, err := uuid.Parse(*u) if err == nil { diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 9241672a3..850f22de9 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -1,17 +1,12 @@ package messages import ( - "bytes" "io" "github.com/pkg/errors" ) -func ReadBatch(b []byte, callback func(Message)) error { - return ReadBatchReader(bytes.NewReader(b), callback) -} - -func ReadBatchReader(reader io.Reader, callback func(Message)) error { +func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error { var index uint64 var timestamp int64 for { @@ -21,7 +16,7 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error { } else if err != nil { return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index) } - msg = transformDepricated(msg) + msg = transformDeprecated(msg) isBatchMeta := false switch m := msg.(type) { @@ -48,37 +43,11 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error { } msg.Meta().Index = index msg.Meta().Timestamp = timestamp - callback(msg) + + messageHandler(msg) if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker index++ } } return errors.New("Error of the codeflow. 
(Should return on EOF)") } - -const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically -func WriteBatch(mList []Message) []byte { - batch := make([]byte, AVG_MESSAGE_SIZE*len(mList)) - p := 0 - for _, msg := range mList { - msgBytes := msg.Encode() - if len(batch) < p+len(msgBytes) { - newBatch := make([]byte, 2*len(batch)+len(msgBytes)) - copy(newBatch, batch) - batch = newBatch - } - copy(batch[p:], msgBytes) - p += len(msgBytes) - } - return batch[:p] -} - -func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) { - mList := make([]Message, 0, 10) // 10? - if err := ReadBatchReader(reader, func(m Message) { - mList = append(mList, rewrite(m)) - }); err != nil { - return nil, err - } - return WriteBatch(mList), nil -} diff --git a/backend/pkg/messages/legacy-message-transform.go b/backend/pkg/messages/legacy-message-transform.go index 031c4444a..6774d95b1 100644 --- a/backend/pkg/messages/legacy-message-transform.go +++ b/backend/pkg/messages/legacy-message-transform.go @@ -1,6 +1,6 @@ package messages -func transformDepricated(msg Message) Message { +func transformDeprecated(msg Message) Message { switch m := msg.(type) { case *MouseClickDepricated: meta := m.Meta() @@ -10,19 +10,7 @@ func transformDepricated(msg Message) Message { ID: m.ID, HesitationTime: m.HesitationTime, Label: m.Label, - // Selector: '', } - // case *FetchDepricated: - // return &Fetch { - // Method: m.Method, - // URL: m.URL, - // Request: m.Request, - // Response: m.Response, - // Status: m.Status, - // Timestamp: m.Timestamp, - // Duration: m.Duration, - // // Headers: '' - // } default: return msg } diff --git a/backend/pkg/queue/messages.go b/backend/pkg/queue/messages.go index 0ab184ee6..9b9ff43a5 100644 --- a/backend/pkg/queue/messages.go +++ b/backend/pkg/queue/messages.go @@ -1,6 +1,7 @@ package queue import ( + "bytes" "log" "openreplay/backend/pkg/messages" @@ -9,7 +10,7 @@ import ( func NewMessageConsumer(group string, topics []string, 
handler types.DecodedMessageHandler, autoCommit bool) types.Consumer { return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) { - if err := messages.ReadBatch(value, func(msg messages.Message) { + if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) { handler(sessionID, msg, meta) }); err != nil { log.Printf("Decode error: %v\n", err) diff --git a/backend/services/http/assets.go b/backend/services/http/assets.go deleted file mode 100644 index b6ac61186..000000000 --- a/backend/services/http/assets.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/url/assets" -) - -func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) { - if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable { - producer.Produce(TOPIC_CACHE, sessionID, messages.Encode(&messages.AssetCache{ - URL: fullURL, - })) - } -} - -func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) { - for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting - sendAssetForCache(sessionID, baseURL, u) - } -} - -func handleURL(sessionID uint64, baseURL string, url string) string { - if CACHE_ASSESTS { - sendAssetForCache(sessionID, baseURL, url) - return rewriter.RewriteURL(sessionID, baseURL, url) - } - return assets.ResolveURL(baseURL, url) -} - -func handleCSS(sessionID uint64, baseURL string, css string) string { - if CACHE_ASSESTS { - sendAssetsForCacheFromCSS(sessionID, baseURL, css) - return rewriter.RewriteCSS(sessionID, baseURL, css) - } - return assets.ResolveCSS(baseURL, css) -} diff --git a/backend/services/http/build_hack b/backend/services/http/build_hack new file mode 100644 index 000000000..e69de29bb diff --git a/backend/services/http/handlers-depricated.go b/backend/services/http/handlers-depricated.go deleted file mode 100644 index 06ab7d0f9..000000000 --- 
a/backend/services/http/handlers-depricated.go +++ /dev/null @@ -1 +0,0 @@ -package main diff --git a/backend/services/http/handlers-ios.go b/backend/services/http/handlers-ios.go deleted file mode 100644 index 8116980e1..000000000 --- a/backend/services/http/handlers-ios.go +++ /dev/null @@ -1,195 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "log" - "math/rand" - "net/http" - "strconv" - "time" - - "openreplay/backend/pkg/db/postgres" - . "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/token" -) - -const FILES_SIZE_LIMIT int64 = 1e7 // 10Mb - -func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) { - type request struct { - Token string `json:"token"` - ProjectKey *string `json:"projectKey"` - TrackerVersion string `json:"trackerVersion"` - RevID string `json:"revID"` - UserUUID *string `json:"userUUID"` - //UserOS string `json"userOS"` //hardcoded 'MacOS' - UserOSVersion string `json:"userOSVersion"` - UserDevice string `json:"userDevice"` - Timestamp uint64 `json:"timestamp"` - // UserDeviceType uint 0:phone 1:pad 2:tv 3:carPlay 5:mac - // “performances”:{ - // “activeProcessorCount”:8, - // “isLowPowerModeEnabled”:0, - // “orientation”:0, - // “systemUptime”:585430, - // “batteryState”:0, - // “thermalState”:0, - // “batteryLevel”:0, - // “processorCount”:8, - // “physicalMemory”:17179869184 - // }, - } - type response struct { - Token string `json:"token"` - ImagesHashList []string `json:"imagesHashList"` - UserUUID string `json:"userUUID"` - BeaconSizeLimit int64 `json:"beaconSizeLimit"` - SessionID string `json:"sessionID"` - } - startTime := time.Now() - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) - defer body.Close() - if err := json.NewDecoder(body).Decode(req); err != nil { - responseWithError(w, http.StatusBadRequest, err) - return - } - - if req.ProjectKey == nil { - responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) - return - } - - p, err := 
pgconn.GetProjectByKey(*req.ProjectKey) - if err != nil { - if postgres.IsNoRowsErr(err) { - responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active")) - } else { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - } - return - } - userUUID := getUUID(req.UserUUID) - tokenData, err := tokenizer.Parse(req.Token) - - if err != nil { // Starting the new one - dice := byte(rand.Intn(100)) // [0, 100) - if dice >= p.SampleRate { - responseWithError(w, http.StatusForbidden, errors.New("cancel")) - return - } - - ua := uaParser.ParseFromHTTPRequest(r) - if ua == nil { - responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) - return - } - sessionID, err := flaker.Compose(uint64(startTime.UnixMilli())) - if err != nil { - responseWithError(w, http.StatusInternalServerError, err) - return - } - // TODO: if EXPIRED => send message for two sessions association - expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixMilli()} - - country := geoIP.ExtractISOCodeFromHTTPRequest(r) - - // The difference with web is mostly here: - producer.Produce(TOPIC_RAW_IOS, tokenData.ID, Encode(&IOSSessionStart{ - Timestamp: req.Timestamp, - ProjectID: uint64(p.ProjectID), - TrackerVersion: req.TrackerVersion, - RevID: req.RevID, - UserUUID: userUUID, - UserOS: "IOS", - UserOSVersion: req.UserOSVersion, - UserDevice: MapIOSDevice(req.UserDevice), - UserDeviceType: GetIOSDeviceType(req.UserDevice), - UserCountry: country, - })) - } - - // imagesHashList, err := s3.GetFrequentlyUsedKeys(*(req.EncodedProjectID)) // TODO: reuse index: ~ frequency * size - // if err != nil { - // responseWithError(w, http.StatusInternalServerError, err) - // return - // } - - responseWithJSON(w, &response{ - // ImagesHashList: imagesHashList, - Token: tokenizer.Compose(*tokenData), - UserUUID: userUUID, - SessionID: 
strconv.FormatUint(tokenData.ID, 10), - BeaconSizeLimit: BEACON_SIZE_LIMIT, - }) -} - -func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil { - responseWithError(w, http.StatusUnauthorized, err) - return - } - pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS) -} - -func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) { - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil && err != token.EXPIRED { - responseWithError(w, http.StatusUnauthorized, err) - return - } - // Check timestamps here? - pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS) -} - -func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) { - log.Printf("recieved imagerequest") - - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil { // Should accept expired token? - responseWithError(w, http.StatusUnauthorized, err) - return - } - - r.Body = http.MaxBytesReader(w, r.Body, FILES_SIZE_LIMIT) - defer r.Body.Close() - err = r.ParseMultipartForm(1e6) // ~1Mb - if err == http.ErrNotMultipart || err == http.ErrMissingBoundary { - responseWithError(w, http.StatusUnsupportedMediaType, err) - // } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB - } else if err != nil { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - } - - if r.MultipartForm == nil { - responseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed")) - } - - if len(r.MultipartForm.Value["projectKey"]) == 0 { - responseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter? 
- return - } - - prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/" - - for _, fileHeaderList := range r.MultipartForm.File { - for _, fileHeader := range fileHeaderList { - file, err := fileHeader.Open() - if err != nil { - continue // TODO: send server error or accumulate successful files - } - key := prefix + fileHeader.Filename - log.Printf("Uploading image... %v", key) - go func() { //TODO: mime type from header - if err := s3.Upload(file, key, "image/jpeg", false); err != nil { - log.Printf("Upload ios screen error. %v", err) - } - }() - } - } - - w.WriteHeader(http.StatusOK) -} diff --git a/backend/services/http/handlers-web.go b/backend/services/http/handlers-web.go deleted file mode 100644 index 7aab5bfbc..000000000 --- a/backend/services/http/handlers-web.go +++ /dev/null @@ -1,241 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "log" - "math/rand" - "net/http" - "strconv" - "time" - - "openreplay/backend/pkg/db/postgres" - . "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/token" -) - -func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { - type request struct { - Token string `json:"token"` - UserUUID *string `json:"userUUID"` - RevID string `json:"revID"` - Timestamp uint64 `json:"timestamp"` - TrackerVersion string `json:"trackerVersion"` - IsSnippet bool `json:"isSnippet"` - DeviceMemory uint64 `json:"deviceMemory"` - JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` - ProjectKey *string `json:"projectKey"` - Reset bool `json:"reset"` - UserID string `json:"userID"` - } - type response struct { - Timestamp int64 `json:"timestamp"` - Delay int64 `json:"delay"` - Token string `json:"token"` - UserUUID string `json:"userUUID"` - SessionID string `json:"sessionID"` - BeaconSizeLimit int64 `json:"beaconSizeLimit"` - } - - startTime := time.Now() - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? 
// use r.ContentLength to return specific error? - defer body.Close() - if err := json.NewDecoder(body).Decode(req); err != nil { - responseWithError(w, http.StatusBadRequest, err) - return - } - - if req.ProjectKey == nil { - responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) - return - } - - p, err := pgconn.GetProjectByKey(*req.ProjectKey) - if err != nil { - if postgres.IsNoRowsErr(err) { - responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached")) - } else { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - } - return - } - - userUUID := getUUID(req.UserUUID) - tokenData, err := tokenizer.Parse(req.Token) - if err != nil || req.Reset { // Starting the new one - dice := byte(rand.Intn(100)) // [0, 100) - if dice >= p.SampleRate { - responseWithError(w, http.StatusForbidden, errors.New("cancel")) - return - } - - ua := uaParser.ParseFromHTTPRequest(r) - if ua == nil { - responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) - return - } - sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) - if err != nil { - responseWithError(w, http.StatusInternalServerError, err) - return - } - // TODO: if EXPIRED => send message for two sessions association - expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} - - country := geoIP.ExtractISOCodeFromHTTPRequest(r) - producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{ - Timestamp: req.Timestamp, - ProjectID: uint64(p.ProjectID), - TrackerVersion: req.TrackerVersion, - RevID: req.RevID, - UserUUID: userUUID, - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, - UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: 
country, - UserDeviceMemorySize: req.DeviceMemory, - UserDeviceHeapSize: req.JsHeapSizeLimit, - UserID: req.UserID, - })) - } - - //delayDuration := time.Now().Sub(startTime) - responseWithJSON(w, &response{ - //Timestamp: startTime.UnixNano() / 1e6, - //Delay: delayDuration.Nanoseconds() / 1e6, - Token: tokenizer.Compose(*tokenData), - UserUUID: userUUID, - SessionID: strconv.FormatUint(tokenData.ID, 10), - BeaconSizeLimit: BEACON_SIZE_LIMIT, - }) -} - -func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { - sessionData, err := tokenizer.ParseFromHTTPRequest(r) - if err != nil { - responseWithError(w, http.StatusUnauthorized, err) - return - } - body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) - defer body.Close() - - rewritenBuf, err := RewriteBatch(body, func(msg Message) Message { - switch m := msg.(type) { - case *SetNodeAttributeURLBased: - if m.Name == "src" || m.Name == "href" { - msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, - Value: handleURL(sessionData.ID, m.BaseURL, m.Value), - } - } else if m.Name == "style" { - msg = &SetNodeAttribute{ - ID: m.ID, - Name: m.Name, - Value: handleCSS(sessionData.ID, m.BaseURL, m.Value), - } - } - case *SetCSSDataURLBased: - msg = &SetCSSData{ - ID: m.ID, - Data: handleCSS(sessionData.ID, m.BaseURL, m.Data), - } - case *CSSInsertRuleURLBased: - msg = &CSSInsertRule{ - ID: m.ID, - Index: m.Index, - Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule), - } - } - - // switch msg.(type) { - // case *BatchMeta, // TODO: watchout! 
Meta().Index'es are changed here (though it is still unique for the topic-session pair) - // *SetPageLocation, - // *PageLoadTiming, - // *PageRenderTiming, - // *PerformanceTrack, - // *SetInputTarget, - // *SetInputValue, - // *MouseClick, - // *RawErrorEvent, - // *JSException, - // *ResourceTiming, - // *RawCustomEvent, - // *CustomIssue, - // *Fetch, - // *StateAction, - // *GraphQL, - // *CreateElementNode, - // *CreateTextNode, - // *RemoveNode, - // *CreateDocument, - // *RemoveNodeAttribute, - // *MoveNode, - // *SetCSSData, - // *CSSInsertRule, - // *CSSDeleteRule: - // analyticsMessages = append(analyticsMessages, msg) - //} - - return msg - }) - if err != nil { - responseWithError(w, http.StatusForbidden, err) - return - } - producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf) - //producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages)) - //duration := time.Now().Sub(startTime) - //log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf))) - w.WriteHeader(http.StatusOK) -} - -func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { - type request struct { - ProjectKey *string `json:"projectKey"` - TrackerVersion string `json:"trackerVersion"` - DoNotTrack bool `json:"DoNotTrack"` - // RevID string `json:"revID"` - } - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) - defer body.Close() - if err := json.NewDecoder(body).Decode(req); err != nil { - responseWithError(w, http.StatusBadRequest, err) - return - } - if req.ProjectKey == nil { - responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) - return - } - ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway - if ua == nil { - responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) - return - } - country := geoIP.ExtractISOCodeFromHTTPRequest(r) - err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{ 
- ProjectKey: *req.ProjectKey, - TrackerVersion: req.TrackerVersion, - DoNotTrack: req.DoNotTrack, - Platform: "web", - UserAgent: r.Header.Get("User-Agent"), - UserOS: ua.OS, - UserOSVersion: ua.OSVersion, - UserBrowser: ua.Browser, - UserBrowserVersion: ua.BrowserVersion, - UserDevice: ua.Device, - UserDeviceType: ua.DeviceType, - UserCountry: country, - }) - if err != nil { - log.Printf("Unable to insert Unstarted Session: %v\n", err) - } - w.WriteHeader(http.StatusOK) -} diff --git a/backend/services/http/main.go b/backend/services/http/main.go deleted file mode 100644 index 1f3bc93b3..000000000 --- a/backend/services/http/main.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import ( - "context" - "log" - "net/http" - "os" - "os/signal" - "syscall" - - "golang.org/x/net/http2" - - "openreplay/backend/pkg/db/cache" - "openreplay/backend/pkg/db/postgres" - "openreplay/backend/pkg/env" - "openreplay/backend/pkg/flakeid" - "openreplay/backend/pkg/queue" - "openreplay/backend/pkg/queue/types" - "openreplay/backend/pkg/storage" - "openreplay/backend/pkg/token" - "openreplay/backend/pkg/url/assets" - "openreplay/backend/services/http/geoip" - "openreplay/backend/services/http/uaparser" - - "openreplay/backend/pkg/pprof" -) - -var rewriter *assets.Rewriter -var producer types.Producer -var pgconn *cache.PGCache -var flaker *flakeid.Flaker -var uaParser *uaparser.UAParser -var geoIP *geoip.GeoIP -var tokenizer *token.Tokenizer -var s3 *storage.S3 - -var TOPIC_RAW_WEB string -var TOPIC_RAW_IOS string -var TOPIC_CACHE string -var TOPIC_TRIGGER string - -//var TOPIC_ANALYTICS string -var CACHE_ASSESTS bool -var BEACON_SIZE_LIMIT int64 - -func main() { - log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - pprof.StartProfilingServer() - - producer = queue.NewProducer() - defer producer.Close(15000) - TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB") - TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS") - TOPIC_CACHE = env.String("TOPIC_CACHE") - TOPIC_TRIGGER = 
env.String("TOPIC_TRIGGER") - //TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS") - rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN")) - pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) - defer pgconn.Close() - s3 = storage.NewS3(env.String("AWS_REGION"), env.String("S3_BUCKET_IOS_IMAGES")) - tokenizer = token.NewTokenizer(env.String("TOKEN_SECRET")) - uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE")) - geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE")) - flaker = flakeid.NewFlaker(env.WorkerID()) - CACHE_ASSESTS = env.Bool("CACHE_ASSETS") - BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT")) - - HTTP_PORT := env.String("HTTP_PORT") - - server := &http.Server{ - Addr: ":" + HTTP_PORT, - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - // TODO: agree with specification - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization") - if r.Method == http.MethodOptions { - w.Header().Set("Cache-Control", "max-age=86400") - w.WriteHeader(http.StatusOK) - return - } - - log.Printf("Request: %v - %v ", r.Method, r.URL.Path) - - switch r.URL.Path { - case "/": - w.WriteHeader(http.StatusOK) - case "/v1/web/not-started": - switch r.Method { - case http.MethodPost: - notStartedHandlerWeb(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/web/start": - switch r.Method { - case http.MethodPost: - startSessionHandlerWeb(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/web/i": - switch r.Method { - case http.MethodPost: - pushMessagesHandlerWeb(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/ios/start": - switch r.Method { - case http.MethodPost: - startSessionHandlerIOS(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/ios/i": - switch r.Method { - 
case http.MethodPost: - pushMessagesHandlerIOS(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/ios/late": - switch r.Method { - case http.MethodPost: - pushLateMessagesHandlerIOS(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - case "/v1/ios/images": - switch r.Method { - case http.MethodPost: - imagesUploadHandlerIOS(w, r) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - default: - w.WriteHeader(http.StatusNotFound) - } - }), - } - http2.ConfigureServer(server, nil) - go func() { - if err := server.ListenAndServe(); err != nil { - log.Printf("Server error: %v\n", err) - log.Fatal("Server error") - } - }() - log.Printf("Server successfully started on port %v\n", HTTP_PORT) - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - <-sigchan - log.Printf("Shutting down the server\n") - server.Shutdown(context.Background()) -} diff --git a/scripts/vagrant/README.md b/scripts/vagrant/README.md index ffe132c73..5262930ba 100644 --- a/scripts/vagrant/README.md +++ b/scripts/vagrant/README.md @@ -38,8 +38,8 @@ Use the following instructions if you’re running Windows 10 or Windows 8: openreplay.local Select File > Save to save your changes. -**Open browser** -http://openreplay.local +**Open the below URL and create an account** +http://openreplay.local/signup ``` ### To start developing