Merge branch 'dev' of github.com:openreplay/openreplay into user-list

This commit is contained in:
Shekar Siri 2022-05-04 16:42:45 +02:00
commit 5ef382c9b8
31 changed files with 868 additions and 708 deletions

View file

@ -49,7 +49,7 @@ jobs:
#
{
git diff --name-only HEAD HEAD~1 | grep backend/services | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3

34
backend/build.sh Normal file → Executable file
View file

@ -18,6 +18,28 @@ check_prereq() {
return
}
# build_service: build (and optionally push) the Docker image for one backend
# service. $1 is the service/image name; the image is tagged with ${git_sha1}.
# Pushes to ${DOCKER_REPO} (default "local") when PUSH_IMAGE=1.
function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http)
echo build http
# The http service is built from cmd/ with its dedicated Dockerfile.
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
*)
# All other services use the default Dockerfile at the context root.
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
esac
return
}
function build_api(){
# Copy enterprise code
[[ $1 == "ee" ]] && {
@ -25,20 +47,12 @@ function build_api(){
ee="true"
}
[[ $2 != "" ]] && {
image="$2"
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
echo "build completed for http"
build_service $2
return
}
for image in $(ls services);
do
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
build_service $image
echo "::set-output name=image::${DOCKER_REPO:-'local'}/$image:${git_sha1}"
done
echo "backend build completed"

60
backend/cmd/Dockerfile Normal file
View file

@ -0,0 +1,60 @@
# Stage 1 (prepare): toolchain + module download, cached unless go.mod/go.sum change.
FROM golang:1.18-alpine3.15 AS prepare
RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
WORKDIR /root
COPY go.mod .
COPY go.sum .
RUN go mod download
# Stage 2 (build): compile the single service selected by SERVICE_NAME.
FROM prepare AS build
COPY pkg pkg
COPY services services
COPY internal internal
COPY cmd cmd
ARG SERVICE_NAME
# CGO + "musl" tag: the binary links against musl libc of the alpine base image.
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME
# Stage 3 (runtime): minimal alpine image with default configuration via env vars.
FROM alpine
RUN apk add --no-cache ca-certificates
# Defaults below can be overridden at deploy time.
ENV TZ=UTC \
FS_ULIMIT=1000 \
FS_DIR=/mnt/efs \
MAXMINDDB_FILE=/root/geoip.mmdb \
UAPARSER_FILE=/root/regexes.yaml \
HTTP_PORT=80 \
BEACON_SIZE_LIMIT=7000000 \
KAFKA_USE_SSL=true \
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
REDIS_STREAMS_MAX_LEN=3000 \
TOPIC_RAW_WEB=raw \
TOPIC_RAW_IOS=raw-ios \
TOPIC_CACHE=cache \
TOPIC_ANALYTICS=analytics \
TOPIC_TRIGGER=trigger \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \
GROUP_ENDER=ender \
GROUP_CACHE=cache \
AWS_REGION_WEB=eu-central-1 \
AWS_REGION_IOS=eu-west-1 \
AWS_REGION_ASSETS=eu-central-1 \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=72 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
ARG SERVICE_NAME
# Only the http service needs the UA-parser regexes and the GeoIP database.
RUN if [ "$SERVICE_NAME" = "http" ]; then \
wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\
wget https://static.openreplay.com/geoip/GeoLite2-Country.mmdb -O "$MAXMINDDB_FILE"; fi
COPY --from=build /root/service /root/service
ENTRYPOINT /root/service

62
backend/cmd/http/main.go Normal file
View file

@ -0,0 +1,62 @@
package main
import (
"log"
"openreplay/backend/internal/config"
"openreplay/backend/internal/router"
"openreplay/backend/internal/server"
"openreplay/backend/internal/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"os"
"os/signal"
"syscall"
)
// main wires the http ingestion service together: configuration, queue
// producer, cached Postgres connection, routes and the HTTP server, then
// blocks until SIGINT/SIGTERM and shuts the server down gracefully.
func main() {
	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
	// Expose pprof endpoints for live profiling.
	pprof.StartProfilingServer()
	// Load configuration
	cfg := config.New()
	// Connect to queue
	producer := queue.NewProducer()
	// 15000 is presumably a flush timeout in milliseconds — TODO confirm against queue package.
	defer producer.Close(15000)
	// Connect to database
	// 1000*60*20 looks like a 20-minute cache TTL in ms — TODO confirm against cache package.
	dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20)
	defer dbConn.Close()
	// Build all services
	services := services.New(cfg, producer, dbConn)
	// Init server's routes
	router, err := router.NewRouter(cfg, services)
	if err != nil {
		log.Fatalf("failed while creating engine: %s", err)
	}
	// Init server
	server, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout)
	if err != nil {
		log.Fatalf("failed while creating server: %s", err)
	}
	// Run server in the background so we can wait for signals below.
	go func() {
		if err := server.Start(); err != nil {
			log.Fatalf("Server error: %v\n", err)
		}
	}()
	log.Printf("Server successfully started on port %v\n", cfg.HTTPPort)
	// Wait stop signal to shut down server gracefully
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan
	log.Printf("Shutting down the server\n")
	server.Stop()
}

View file

@ -0,0 +1,83 @@
package assetscache
import (
"openreplay/backend/internal/config"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
// AssetsCache rewrites asset URLs inside session messages and, when caching
// is enabled, publishes AssetCache messages so the assets service can fetch
// and store the referenced resources.
type AssetsCache struct {
	cfg      *config.Config   // feature flags (CacheAssets) and topic names
	rewriter *assets.Rewriter // rewrites URLs to point at the cached copies
	producer types.Producer   // queue used to publish AssetCache messages
}

// New builds an AssetsCache from its three dependencies.
func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
	return &AssetsCache{
		cfg:      cfg,
		rewriter: rewriter,
		producer: producer,
	}
}
// ParseAssets converts URL-based DOM/CSS messages into their plain
// counterparts with all contained URLs handled (rewritten for caching or
// simply resolved). Any other message type is returned untouched.
func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.Message {
	switch m := msg.(type) {
	case *messages.SetNodeAttributeURLBased:
		switch m.Name {
		case "src", "href":
			return &messages.SetNodeAttribute{
				ID:    m.ID,
				Name:  m.Name,
				Value: e.handleURL(sessID, m.BaseURL, m.Value),
			}
		case "style":
			return &messages.SetNodeAttribute{
				ID:    m.ID,
				Name:  m.Name,
				Value: e.handleCSS(sessID, m.BaseURL, m.Value),
			}
		}
		return msg
	case *messages.SetCSSDataURLBased:
		return &messages.SetCSSData{
			ID:   m.ID,
			Data: e.handleCSS(sessID, m.BaseURL, m.Data),
		}
	case *messages.CSSInsertRuleURLBased:
		return &messages.CSSInsertRule{
			ID:    m.ID,
			Index: m.Index,
			Rule:  e.handleCSS(sessID, m.BaseURL, m.Rule),
		}
	default:
		return msg
	}
}
// sendAssetForCache publishes an AssetCache message for the resolved asset
// URL, but only if the URL is considered cacheable.
func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
	if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
		e.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
			URL: fullURL,
		}))
	}
}

// sendAssetsForCacheFromCSS extracts every URL referenced by the CSS text and
// queues each one for caching.
func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
	for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
		e.sendAssetForCache(sessionID, baseURL, u)
	}
}
// handleURL processes a single asset URL: when caching is enabled it queues
// the asset for caching and returns the rewritten (cache-pointing) URL;
// otherwise it just resolves the URL against its base.
func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string {
	if !e.cfg.CacheAssets {
		return assets.ResolveURL(baseURL, url)
	}
	e.sendAssetForCache(sessionID, baseURL, url)
	return e.rewriter.RewriteURL(sessionID, baseURL, url)
}
// handleCSS processes a CSS text: when caching is enabled it queues every
// referenced asset for caching and returns the CSS with rewritten URLs;
// otherwise it only resolves the URLs against the base.
func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string {
	if !e.cfg.CacheAssets {
		return assets.ResolveCSS(baseURL, css)
	}
	e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
	return e.rewriter.RewriteCSS(sessionID, baseURL, css)
}

View file

@ -0,0 +1,50 @@
package config
import (
"openreplay/backend/pkg/env"
"time"
)
// Config holds every runtime setting of the http ingestion service. All
// values except the hard-coded limits come from environment variables
// (see New).
type Config struct {
	HTTPHost          string        // bind host; empty means all interfaces
	HTTPPort          string        // bind port (HTTP_PORT)
	HTTPTimeout       time.Duration // read/write timeout for the HTTP server
	TopicRawWeb       string        // queue topic for web session messages
	TopicRawIOS       string        // queue topic for iOS session messages
	TopicCache        string        // queue topic for asset-cache requests
	CacheAssets       bool          // enable asset caching/URL rewriting
	BeaconSizeLimit   int64         // max size of a message batch request body
	JsonSizeLimit     int64         // max size of JSON request bodies
	FileSizeLimit     int64         // max size of uploaded files (iOS images)
	AssetsOrigin      string        // origin used when rewriting asset URLs
	AWSRegion         string
	S3BucketIOSImages string        // destination bucket for iOS screenshots
	Postgres          string        // Postgres connection string
	TokenSecret       string        // secret for session-token signing
	UAParserFile      string        // path to ua-parser regexes.yaml
	MaxMinDBFile      string        // path to MaxMind GeoIP database
	WorkerID          uint16        // used by the flake-ID generator
}
// New reads the service configuration from the environment and returns it
// together with the few hard-coded limits and defaults.
func New() *Config {
	cfg := &Config{
		HTTPHost:      "", // empty by default
		HTTPTimeout:   time.Second * 60,
		JsonSizeLimit: 1e3, // 1Kb
		FileSizeLimit: 1e7, // 10Mb
	}
	cfg.HTTPPort = env.String("HTTP_PORT")
	cfg.TopicRawWeb = env.String("TOPIC_RAW_WEB")
	cfg.TopicRawIOS = env.String("TOPIC_RAW_IOS")
	cfg.TopicCache = env.String("TOPIC_CACHE")
	cfg.CacheAssets = env.Bool("CACHE_ASSETS")
	cfg.BeaconSizeLimit = int64(env.Uint64("BEACON_SIZE_LIMIT"))
	cfg.AssetsOrigin = env.String("ASSETS_ORIGIN")
	cfg.AWSRegion = env.String("AWS_REGION")
	cfg.S3BucketIOSImages = env.String("S3_BUCKET_IOS_IMAGES")
	cfg.Postgres = env.String("POSTGRES_STRING")
	cfg.TokenSecret = env.String("TOKEN_SECRET")
	cfg.UAParserFile = env.String("UAPARSER_FILE")
	cfg.MaxMinDBFile = env.String("MAXMINDDB_FILE")
	cfg.WorkerID = env.WorkerID()
	return cfg
}

View file

@ -1,4 +1,4 @@
package main
package ios
import (
"strings"

View file

@ -0,0 +1,172 @@
package router
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/ios"
"openreplay/backend/internal/uuid"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
// startSessionHandlerIOS starts (or resumes) an iOS recording session.
// It validates the project key, applies the project's sampling rate, and
// either reuses the session from a still-valid token or composes a new
// session ID and emits an IOSSessionStart message to the raw-iOS topic.
func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	req := &StartIOSSessionRequest{}
	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
		return
	}
	body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
	defer body.Close()
	if err := json.NewDecoder(body).Decode(req); err != nil {
		ResponseWithError(w, http.StatusBadRequest, err)
		return
	}
	if req.ProjectKey == nil {
		ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
	if err != nil {
		if postgres.IsNoRowsErr(err) {
			ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
		} else {
			ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		}
		return
	}
	userUUID := uuid.GetUUID(req.UserUUID)
	tokenData, err := e.services.Tokenizer.Parse(req.Token)
	if err != nil { // Starting the new one
		dice := byte(rand.Intn(100)) // [0, 100)
		if dice >= p.SampleRate {
			ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
			return
		}
		// NOTE(review): ua is only used as a gate here (its fields are unused,
		// unlike the web handler) — confirm this is intentional.
		ua := e.services.UaParser.ParseFromHTTPRequest(r)
		if ua == nil {
			ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
			return
		}
		sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
		if err != nil {
			ResponseWithError(w, http.StatusInternalServerError, err)
			return
		}
		// TODO: if EXPIRED => send message for two sessions association
		expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
		// Keyed fields: consistent with the web handler and safe against
		// field reordering in the external token package (go vet composites).
		tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixMilli()}
		country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
		// The difference with web is mostly here:
		e.services.Producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{
			Timestamp:      req.Timestamp,
			ProjectID:      uint64(p.ProjectID),
			TrackerVersion: req.TrackerVersion,
			RevID:          req.RevID,
			UserUUID:       userUUID,
			UserOS:         "IOS",
			UserOSVersion:  req.UserOSVersion,
			UserDevice:     ios.MapIOSDevice(req.UserDevice),
			UserDeviceType: ios.GetIOSDeviceType(req.UserDevice),
			UserCountry:    country,
		}))
	}
	ResponseWithJSON(w, &StartIOSSessionResponse{
		Token:           e.services.Tokenizer.Compose(*tokenData),
		UserUUID:        userUUID,
		SessionID:       strconv.FormatUint(tokenData.ID, 10),
		BeaconSizeLimit: e.cfg.BeaconSizeLimit,
	})
}
// pushMessagesHandlerIOS forwards an authorized batch of iOS session
// messages to the raw-iOS topic.
func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	if err != nil {
		ResponseWithError(w, http.StatusUnauthorized, err)
		return
	}
	e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
}

// pushLateMessagesHandlerIOS is like pushMessagesHandlerIOS but also accepts
// batches carrying an expired token (late-arriving data).
func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	// NOTE(review): on token.EXPIRED we proceed with sessionData — assumes the
	// tokenizer still returns usable data for expired tokens; verify.
	if err != nil && err != token.EXPIRED {
		ResponseWithError(w, http.StatusUnauthorized, err)
		return
	}
	// Check timestamps here?
	e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
}
// imagesUploadHandlerIOS accepts a multipart form of screenshot files for an
// authorized iOS session and uploads each file to S3 under the key
// "<projectKey>/<sessionID>/<filename>". Uploads run asynchronously; the
// handler replies 200 as soon as all uploads have been started.
func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
	log.Printf("received image request") // fixed typo in log message
	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	if err != nil { // Should accept expired token?
		ResponseWithError(w, http.StatusUnauthorized, err)
		return
	}
	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
		return
	}
	r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
	defer r.Body.Close()
	err = r.ParseMultipartForm(1e6) // ~1Mb of non-file parts kept in memory
	if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
		ResponseWithError(w, http.StatusUnsupportedMediaType, err)
		return
		// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
	} else if err != nil {
		ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		return
	}
	if r.MultipartForm == nil {
		ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
		return
	}
	if len(r.MultipartForm.Value["projectKey"]) == 0 {
		ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
		return
	}
	prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/"
	for _, fileHeaderList := range r.MultipartForm.File {
		for _, fileHeader := range fileHeaderList {
			file, err := fileHeader.Open()
			if err != nil {
				continue // TODO: send server error or accumulate successful files
			}
			key := prefix + fileHeader.Filename
			log.Printf("Uploading image... %v", key)
			go func() { //TODO: mime type from header
				// Close the opened part when the upload attempt finishes;
				// previously each uploaded file leaked its handle.
				defer file.Close()
				if err := e.services.Storage.Upload(file, key, "image/jpeg", false); err != nil {
					log.Printf("Upload ios screen error. %v", err)
				}
			}()
		}
	}
	w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,187 @@
package router
import (
"bytes"
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/uuid"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
// startSessionHandlerWeb starts (or resumes) a web recording session.
// It validates the project key, applies the project's sampling rate, and
// either reuses the session from a still-valid token (unless the client
// requests a reset) or composes a new session ID and emits a SessionStart
// message to the raw-web topic.
func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	// Check request body
	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
		return
	}
	body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
	defer body.Close()
	// Parse request body
	req := &StartSessionRequest{}
	if err := json.NewDecoder(body).Decode(req); err != nil {
		ResponseWithError(w, http.StatusBadRequest, err)
		return
	}
	// Handler's logic
	if req.ProjectKey == nil {
		ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
	if err != nil {
		if postgres.IsNoRowsErr(err) {
			ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
		} else {
			ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		}
		return
	}
	userUUID := uuid.GetUUID(req.UserUUID)
	tokenData, err := e.services.Tokenizer.Parse(req.Token)
	if err != nil || req.Reset { // Starting the new one
		dice := byte(rand.Intn(100)) // [0, 100)
		if dice >= p.SampleRate {
			ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
			return
		}
		ua := e.services.UaParser.ParseFromHTTPRequest(r)
		if ua == nil {
			ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
			return
		}
		// UnixMilli() == UnixNano()/1e6; consistent with the iOS handler.
		sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
		if err != nil {
			ResponseWithError(w, http.StatusInternalServerError, err)
			return
		}
		// TODO: if EXPIRED => send message for two sessions association
		expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
		tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixMilli()}
		e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
			Timestamp:            req.Timestamp,
			ProjectID:            uint64(p.ProjectID),
			TrackerVersion:       req.TrackerVersion,
			RevID:                req.RevID,
			UserUUID:             userUUID,
			UserAgent:            r.Header.Get("User-Agent"),
			UserOS:               ua.OS,
			UserOSVersion:        ua.OSVersion,
			UserBrowser:          ua.Browser,
			UserBrowserVersion:   ua.BrowserVersion,
			UserDevice:           ua.Device,
			UserDeviceType:       ua.DeviceType,
			UserCountry:          e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r),
			UserDeviceMemorySize: req.DeviceMemory,
			UserDeviceHeapSize:   req.JsHeapSizeLimit,
			UserID:               req.UserID,
		}))
	}
	ResponseWithJSON(w, &StartSessionResponse{
		Token:           e.services.Tokenizer.Compose(*tokenData),
		UserUUID:        userUUID,
		SessionID:       strconv.FormatUint(tokenData.ID, 10),
		BeaconSizeLimit: e.cfg.BeaconSizeLimit,
	})
}
// pushMessagesHandlerWeb receives a batch of web session messages, runs each
// message through the assets cache (URL rewriting/caching), re-encodes the
// batch and publishes it to the raw-web topic.
func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
	// Check authorization
	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	if err != nil {
		ResponseWithError(w, http.StatusUnauthorized, err)
		return
	}
	// Check request body
	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
		return
	}
	body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
	defer body.Close()
	var handledMessages bytes.Buffer
	// Process each message in request data
	err = ReadBatchReader(body, func(msg Message) {
		msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
		handledMessages.Write(msg.Encode())
	})
	if err != nil {
		ResponseWithError(w, http.StatusForbidden, err)
		return
	}
	// Send processed messages to queue as array of bytes
	err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
	if err != nil {
		// Best-effort: a produce failure is logged but the client still gets 200.
		log.Printf("can't send processed messages to queue: %s", err)
	}
	w.WriteHeader(http.StatusOK)
}
// notStartedHandlerWeb records a session that the tracker decided not to
// start (e.g. do-not-track) by inserting an "unstarted session" row into
// Postgres. Insert failures are only logged; the client always gets 200.
func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
		return
	}
	body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
	defer body.Close()

	payload := &NotStartedRequest{}
	if err := json.NewDecoder(body).Decode(payload); err != nil {
		ResponseWithError(w, http.StatusBadRequest, err)
		return
	}
	if payload.ProjectKey == nil {
		ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
	if ua == nil {
		ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
		return
	}

	session := postgres.UnstartedSession{
		ProjectKey:         *payload.ProjectKey,
		TrackerVersion:     payload.TrackerVersion,
		DoNotTrack:         payload.DoNotTrack,
		Platform:           "web",
		UserAgent:          r.Header.Get("User-Agent"),
		UserOS:             ua.OS,
		UserOSVersion:      ua.OSVersion,
		UserBrowser:        ua.Browser,
		UserBrowserVersion: ua.BrowserVersion,
		UserDevice:         ua.Device,
		UserDeviceType:     ua.DeviceType,
		UserCountry:        e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r),
	}
	if err := e.services.Database.InsertUnstartedSession(session); err != nil {
		log.Printf("Unable to insert Unstarted Session: %v\n", err)
	}
	w.WriteHeader(http.StatusOK)
}

View file

@ -1,28 +1,27 @@
package main
package router
import (
gzip "github.com/klauspost/pgzip"
"io"
"io/ioutil"
"log"
"net/http"
gzip "github.com/klauspost/pgzip"
)
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
var reader io.ReadCloser
var err error
switch r.Header.Get("Content-Encoding") {
case "gzip":
log.Println("Gzip", reader)
reader, err = gzip.NewReader(body)
if err != nil {
responseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response
return
}
log.Println("Gzip reader init", reader)
@ -33,9 +32,9 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topi
log.Println("Reader after switch:", reader)
buf, err := ioutil.ReadAll(reader)
if err != nil {
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
return
}
producer.Produce(topicName, sessionID, buf) // What if not able to send?
e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send?
w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,49 @@
package router
// StartSessionRequest is the JSON body of POST /v1/web/start.
type StartSessionRequest struct {
	Token           string  `json:"token"`    // previously issued session token, if any
	UserUUID        *string `json:"userUUID"` // optional; regenerated server-side when missing/invalid
	RevID           string  `json:"revID"`
	Timestamp       uint64  `json:"timestamp"`
	TrackerVersion  string  `json:"trackerVersion"`
	IsSnippet       bool    `json:"isSnippet"`
	DeviceMemory    uint64  `json:"deviceMemory"`
	JsHeapSizeLimit uint64  `json:"jsHeapSizeLimit"`
	ProjectKey      *string `json:"projectKey"` // required; pointer to detect absence
	Reset           bool    `json:"reset"`      // force a new session even with a valid token
	UserID          string  `json:"userID"`
}

// StartSessionResponse is the JSON reply of POST /v1/web/start.
type StartSessionResponse struct {
	Timestamp       int64  `json:"timestamp"`
	Delay           int64  `json:"delay"`
	Token           string `json:"token"`
	UserUUID        string `json:"userUUID"`
	SessionID       string `json:"sessionID"`
	BeaconSizeLimit int64  `json:"beaconSizeLimit"`
}

// NotStartedRequest is the JSON body of POST /v1/web/not-started.
type NotStartedRequest struct {
	ProjectKey     *string `json:"projectKey"` // required; pointer to detect absence
	TrackerVersion string  `json:"trackerVersion"`
	DoNotTrack     bool    `json:"DoNotTrack"`
}

// StartIOSSessionRequest is the JSON body of POST /v1/ios/start.
type StartIOSSessionRequest struct {
	Token          string  `json:"token"`
	ProjectKey     *string `json:"projectKey"`
	TrackerVersion string  `json:"trackerVersion"`
	RevID          string  `json:"revID"`
	UserUUID       *string `json:"userUUID"`
	UserOSVersion  string  `json:"userOSVersion"`
	UserDevice     string  `json:"userDevice"`
	Timestamp      uint64  `json:"timestamp"`
}

// StartIOSSessionResponse is the JSON reply of POST /v1/ios/start.
type StartIOSSessionResponse struct {
	Token           string   `json:"token"`
	ImagesHashList  []string `json:"imagesHashList"`
	UserUUID        string   `json:"userUUID"`
	BeaconSizeLimit int64    `json:"beaconSizeLimit"`
	SessionID       string   `json:"sessionID"`
}

View file

@ -1,4 +1,4 @@
package main
package router
import (
"encoding/json"
@ -6,7 +6,7 @@ import (
"net/http"
)
func responseWithJSON(w http.ResponseWriter, res interface{}) {
func ResponseWithJSON(w http.ResponseWriter, res interface{}) {
body, err := json.Marshal(res)
if err != nil {
log.Println(err)
@ -15,10 +15,10 @@ func responseWithJSON(w http.ResponseWriter, res interface{}) {
w.Write(body)
}
func responseWithError(w http.ResponseWriter, code int, err error) {
func ResponseWithError(w http.ResponseWriter, code int, err error) {
type response struct {
Error string `json:"error"`
}
w.WriteHeader(code)
responseWithJSON(w, &response{err.Error()})
ResponseWithJSON(w, &response{err.Error()})
}

View file

@ -0,0 +1,70 @@
package router
import (
"github.com/gorilla/mux"
"log"
"net/http"
"openreplay/backend/internal/config"
http2 "openreplay/backend/internal/services"
)
// Router owns the HTTP route table of the ingestion service and the
// dependencies (config + services) the handlers need.
type Router struct {
	router   *mux.Router
	cfg      *config.Config
	services *http2.ServicesBuilder
}

// NewRouter builds a Router with all web and iOS routes registered.
// The error return is reserved for future validation; it is currently always nil.
func NewRouter(cfg *config.Config, services *http2.ServicesBuilder) (*Router, error) {
	e := &Router{
		cfg:      cfg,
		services: services,
	}
	e.init()
	return e, nil
}
// init registers every endpoint on a fresh mux and installs the CORS
// middleware. All endpoints are POST-only.
func (e *Router) init() {
	e.router = mux.NewRouter()
	// Root path
	e.router.HandleFunc("/", e.root).Methods("POST")
	// Web handlers
	e.router.HandleFunc("/v1/web/not-started", e.notStartedHandlerWeb).Methods("POST")
	e.router.HandleFunc("/v1/web/start", e.startSessionHandlerWeb).Methods("POST")
	e.router.HandleFunc("/v1/web/i", e.pushMessagesHandlerWeb).Methods("POST")
	// iOS handlers
	e.router.HandleFunc("/v1/ios/start", e.startSessionHandlerIOS).Methods("POST")
	e.router.HandleFunc("/v1/ios/i", e.pushMessagesHandlerIOS).Methods("POST")
	e.router.HandleFunc("/v1/ios/late", e.pushLateMessagesHandlerIOS).Methods("POST")
	e.router.HandleFunc("/v1/ios/images", e.imagesUploadHandlerIOS).Methods("POST")
	// CORS middleware
	e.router.Use(e.corsMiddleware)
}
// root answers POST / with a bare 200 OK (liveness-style endpoint).
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}
// corsMiddleware adds permissive CORS headers to every response, answers
// OPTIONS preflight requests directly (with a one-day cache hint), and logs
// each non-preflight request before delegating to the next handler.
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		headers := w.Header()
		headers.Set("Access-Control-Allow-Origin", "*")
		headers.Set("Access-Control-Allow-Methods", "POST")
		headers.Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
		// Preflight requests stop here.
		if r.Method == http.MethodOptions {
			headers.Set("Cache-Control", "max-age=86400")
			w.WriteHeader(http.StatusOK)
			return
		}
		log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
		next.ServeHTTP(w, r)
	})
}
// GetHandler exposes the fully configured route table for the HTTP server.
func (e *Router) GetHandler() http.Handler {
	return e.router
}

View file

@ -0,0 +1,46 @@
package server
import (
"context"
"errors"
"fmt"
"golang.org/x/net/http2"
"log"
"net/http"
"time"
)
// Server is a thin wrapper around http.Server that adds validated
// construction, HTTP/2 support and graceful shutdown.
type Server struct {
	server *http.Server
}
// New builds an HTTP server bound to host:port with the given handler and
// identical read/write timeouts, and attempts to enable HTTP/2 on it.
// Arguments are validated up front so misconfiguration fails fast.
func New(handler http.Handler, host, port string, timeout time.Duration) (*Server, error) {
	switch {
	case port == "":
		return nil, errors.New("empty server port")
	case handler == nil:
		return nil, errors.New("empty handler")
	case timeout < 1:
		// %v renders the Duration human-readably ("0s"); %d would print
		// the raw nanosecond count and confuse the reader.
		return nil, fmt.Errorf("invalid timeout %v", timeout)
	}
	server := &http.Server{
		Addr:         fmt.Sprintf("%s:%s", host, port),
		Handler:      handler,
		ReadTimeout:  timeout,
		WriteTimeout: timeout,
	}
	// Best effort: if HTTP/2 setup fails we log and keep serving HTTP/1.x.
	if err := http2.ConfigureServer(server, nil); err != nil {
		log.Printf("can't configure http2 server: %s", err)
	}
	return &Server{
		server: server,
	}, nil
}
// Start serves requests and blocks until the server stops; after a graceful
// Stop it returns http.ErrServerClosed.
func (s *Server) Start() error {
	return s.server.ListenAndServe()
}
// Stop gracefully shuts the server down, waiting for in-flight requests to
// finish. A shutdown error is logged instead of being silently discarded.
func (s *Server) Stop() {
	if err := s.server.Shutdown(context.Background()); err != nil {
		log.Printf("server shutdown error: %s", err)
	}
}

View file

@ -0,0 +1,39 @@
package services
import (
"openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/url/assets"
)
// ServicesBuilder aggregates every service the HTTP handlers depend on, so
// handlers take a single dependency instead of nine.
type ServicesBuilder struct {
	Database  *cache.PGCache        // cached Postgres access
	Producer  types.Producer        // queue producer for session messages
	Assets    *assetscache.AssetsCache // asset URL rewriting/caching
	Flaker    *flakeid.Flaker       // session ID generator
	UaParser  *uaparser.UAParser    // User-Agent parsing
	GeoIP     *geoip.GeoIP          // country lookup by IP
	Tokenizer *token.Tokenizer      // session token signing/parsing
	Storage   *storage.S3           // S3 uploads (iOS screenshots)
}

// New assembles the full service set from the config, the queue producer and
// the cached DB connection.
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
	rewriter := assets.NewRewriter(cfg.AssetsOrigin)
	return &ServicesBuilder{
		Database:  pgconn,
		Producer:  producer,
		Assets:    assetscache.New(cfg, rewriter, producer),
		Storage:   storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
		Tokenizer: token.NewTokenizer(cfg.TokenSecret),
		UaParser:  uaparser.NewUAParser(cfg.UAParserFile),
		GeoIP:     geoip.NewGeoIP(cfg.MaxMinDBFile),
		Flaker:    flakeid.NewFlaker(cfg.WorkerID),
	}
}

View file

@ -1,10 +1,10 @@
package main
package uuid
import (
"github.com/google/uuid"
)
func getUUID(u *string) string {
func GetUUID(u *string) string {
if u != nil {
_, err := uuid.Parse(*u)
if err == nil {

View file

@ -1,17 +1,12 @@
package messages
import (
"bytes"
"io"
"github.com/pkg/errors"
)
func ReadBatch(b []byte, callback func(Message)) error {
return ReadBatchReader(bytes.NewReader(b), callback)
}
func ReadBatchReader(reader io.Reader, callback func(Message)) error {
func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
var index uint64
var timestamp int64
for {
@ -21,7 +16,7 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
} else if err != nil {
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
}
msg = transformDepricated(msg)
msg = transformDeprecated(msg)
isBatchMeta := false
switch m := msg.(type) {
@ -48,37 +43,11 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
}
msg.Meta().Index = index
msg.Meta().Timestamp = timestamp
callback(msg)
messageHandler(msg)
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
index++
}
}
return errors.New("Error of the codeflow. (Should return on EOF)")
}
const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically

// WriteBatch encodes the given messages back-to-back into a single byte
// slice, pre-sizing the buffer from the average message size heuristic.
func WriteBatch(mList []Message) []byte {
	out := make([]byte, 0, AVG_MESSAGE_SIZE*len(mList))
	for _, msg := range mList {
		out = append(out, msg.Encode()...)
	}
	return out
}
// RewriteBatch decodes every message from reader, applies rewrite to each,
// and re-encodes the results into one batch byte slice.
func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) {
	mList := make([]Message, 0, 10) // 10?
	if err := ReadBatchReader(reader, func(m Message) {
		mList = append(mList, rewrite(m))
	}); err != nil {
		return nil, err
	}
	return WriteBatch(mList), nil
}

View file

@ -1,6 +1,6 @@
package messages
func transformDepricated(msg Message) Message {
func transformDeprecated(msg Message) Message {
switch m := msg.(type) {
case *MouseClickDepricated:
meta := m.Meta()
@ -10,19 +10,7 @@ func transformDepricated(msg Message) Message {
ID: m.ID,
HesitationTime: m.HesitationTime,
Label: m.Label,
// Selector: '',
}
// case *FetchDepricated:
// return &Fetch {
// Method: m.Method,
// URL: m.URL,
// Request: m.Request,
// Response: m.Response,
// Status: m.Status,
// Timestamp: m.Timestamp,
// Duration: m.Duration,
// // Headers: ''
// }
default:
return msg
}

View file

@ -1,6 +1,7 @@
package queue
import (
"bytes"
"log"
"openreplay/backend/pkg/messages"
@ -9,7 +10,7 @@ import (
func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool) types.Consumer {
return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
if err := messages.ReadBatch(value, func(msg messages.Message) {
if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) {
handler(sessionID, msg, meta)
}); err != nil {
log.Printf("Decode error: %v\n", err)

View file

@ -1,36 +0,0 @@
package main
import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url/assets"
)
// sendAssetForCache publishes an AssetCache message for the resolved asset
// URL (if cacheable). Relies on the package-level producer and TOPIC_CACHE.
func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
	if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
		producer.Produce(TOPIC_CACHE, sessionID, messages.Encode(&messages.AssetCache{
			URL: fullURL,
		}))
	}
}

// sendAssetsForCacheFromCSS queues every URL referenced by the CSS text.
func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
	for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
		sendAssetForCache(sessionID, baseURL, u)
	}
}

// handleURL caches-and-rewrites a single URL when CACHE_ASSESTS (sic, global
// flag) is set, otherwise just resolves it against the base.
func handleURL(sessionID uint64, baseURL string, url string) string {
	if CACHE_ASSESTS {
		sendAssetForCache(sessionID, baseURL, url)
		return rewriter.RewriteURL(sessionID, baseURL, url)
	}
	return assets.ResolveURL(baseURL, url)
}

// handleCSS caches-and-rewrites all URLs in a CSS text when CACHE_ASSESTS is
// set, otherwise only resolves them.
func handleCSS(sessionID uint64, baseURL string, css string) string {
	if CACHE_ASSESTS {
		sendAssetsForCacheFromCSS(sessionID, baseURL, css)
		return rewriter.RewriteCSS(sessionID, baseURL, css)
	}
	return assets.ResolveCSS(baseURL, css)
}

View file

View file

@ -1 +0,0 @@
package main

View file

@ -1,195 +0,0 @@
package main
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
const FILES_SIZE_LIMIT int64 = 1e7 // 10Mb
// startSessionHandlerIOS handles POST /v1/ios/start.
// Flow: decode the (size-capped) start request, look up the project by key,
// then either resume the session encoded in the request token or — when the
// token is missing/invalid — start a new session. New sessions are sampled
// against the project's SampleRate and announced to the raw-iOS queue topic
// as an IOSSessionStart message.
func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
	// Payload sent by the iOS tracker on session start.
	type request struct {
		Token          string  `json:"token"`
		ProjectKey     *string `json:"projectKey"`
		TrackerVersion string  `json:"trackerVersion"`
		RevID          string  `json:"revID"`
		UserUUID       *string `json:"userUUID"`
		//UserOS string `json"userOS"` //hardcoded 'MacOS'
		UserOSVersion string `json:"userOSVersion"`
		UserDevice    string `json:"userDevice"`
		Timestamp     uint64 `json:"timestamp"`
		// UserDeviceType uint 0:phone 1:pad 2:tv 3:carPlay 5:mac
		// “performances”:{
		// “activeProcessorCount”:8,
		// “isLowPowerModeEnabled”:0,
		// “orientation”:0,
		// “systemUptime”:585430,
		// “batteryState”:0,
		// “thermalState”:0,
		// “batteryLevel”:0,
		// “processorCount”:8,
		// “physicalMemory”:17179869184
		// },
	}
	// Payload returned to the tracker: the (possibly new) session token plus
	// client-side limits.
	type response struct {
		Token           string   `json:"token"`
		ImagesHashList  []string `json:"imagesHashList"`
		UserUUID        string   `json:"userUUID"`
		BeaconSizeLimit int64    `json:"beaconSizeLimit"`
		SessionID       string   `json:"sessionID"`
	}
	startTime := time.Now()
	req := &request{}
	// Cap the request body to guard against oversized payloads.
	body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
	defer body.Close()
	if err := json.NewDecoder(body).Decode(req); err != nil {
		responseWithError(w, http.StatusBadRequest, err)
		return
	}
	if req.ProjectKey == nil {
		responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	p, err := pgconn.GetProjectByKey(*req.ProjectKey)
	if err != nil {
		if postgres.IsNoRowsErr(err) {
			responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
		} else {
			responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		}
		return
	}
	userUUID := getUUID(req.UserUUID)
	tokenData, err := tokenizer.Parse(req.Token)
	if err != nil { // Starting the new one
		// Sampling: reject the session when the dice roll falls outside the
		// project's sample rate.
		dice := byte(rand.Intn(100)) // [0, 100)
		if dice >= p.SampleRate {
			responseWithError(w, http.StatusForbidden, errors.New("cancel"))
			return
		}
		// The UA is parsed only to filter out unrecognized clients here; the
		// emitted message uses hardcoded/iOS-specific fields instead.
		ua := uaParser.ParseFromHTTPRequest(r)
		if ua == nil {
			responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
			return
		}
		// A flake ID derived from the start timestamp becomes the session ID.
		sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
		if err != nil {
			responseWithError(w, http.StatusInternalServerError, err)
			return
		}
		// TODO: if EXPIRED => send message for two sessions association
		expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
		tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
		country := geoIP.ExtractISOCodeFromHTTPRequest(r)
		// The difference with web is mostly here:
		producer.Produce(TOPIC_RAW_IOS, tokenData.ID, Encode(&IOSSessionStart{
			Timestamp:      req.Timestamp,
			ProjectID:      uint64(p.ProjectID),
			TrackerVersion: req.TrackerVersion,
			RevID:          req.RevID,
			UserUUID:       userUUID,
			UserOS:         "IOS",
			UserOSVersion:  req.UserOSVersion,
			UserDevice:     MapIOSDevice(req.UserDevice),
			UserDeviceType: GetIOSDeviceType(req.UserDevice),
			UserCountry:    country,
		}))
	}
	// imagesHashList, err := s3.GetFrequentlyUsedKeys(*(req.EncodedProjectID)) // TODO: reuse index: ~ frequency * size
	// if err != nil {
	// responseWithError(w, http.StatusInternalServerError, err)
	// return
	// }
	responseWithJSON(w, &response{
		// ImagesHashList: imagesHashList,
		Token:           tokenizer.Compose(*tokenData),
		UserUUID:        userUUID,
		SessionID:       strconv.FormatUint(tokenData.ID, 10),
		BeaconSizeLimit: BEACON_SIZE_LIMIT,
	})
}
// pushMessagesHandlerIOS handles POST /v1/ios/i: it authenticates the session
// token and forwards the raw message batch to the raw-iOS topic.
func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
	session, parseErr := tokenizer.ParseFromHTTPRequest(r)
	if parseErr != nil {
		responseWithError(w, http.StatusUnauthorized, parseErr)
		return
	}
	pushMessages(w, r, session.ID, TOPIC_RAW_IOS)
}
// pushLateMessagesHandlerIOS handles POST /v1/ios/late. Unlike the regular
// push endpoint, an expired token is still accepted so late batches from an
// already-finished session are not lost.
func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
	session, parseErr := tokenizer.ParseFromHTTPRequest(r)
	if parseErr != nil && parseErr != token.EXPIRED {
		responseWithError(w, http.StatusUnauthorized, parseErr)
		return
	}
	// Check timestamps here?
	pushMessages(w, r, session.ID, TOPIC_RAW_IOS)
}
// imagesUploadHandlerIOS handles POST /v1/ios/images: it authenticates the
// session, parses the multipart form (body capped at FILES_SIZE_LIMIT) and
// uploads every attached screenshot to S3 under
// "<projectKey>/<sessionID>/<filename>".
func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
	log.Printf("received image request")
	sessionData, err := tokenizer.ParseFromHTTPRequest(r)
	if err != nil { // Should accept expired token?
		responseWithError(w, http.StatusUnauthorized, err)
		return
	}

	r.Body = http.MaxBytesReader(w, r.Body, FILES_SIZE_LIMIT)
	defer r.Body.Close()

	err = r.ParseMultipartForm(1e6) // ~1Mb of non-file parts kept in memory
	if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
		responseWithError(w, http.StatusUnsupportedMediaType, err)
		// BUG FIX: previously fell through after responding, which could
		// dereference a nil r.MultipartForm below.
		return
		// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
	} else if err != nil {
		responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		return                                                    // BUG FIX: missing return after error response
	}
	if r.MultipartForm == nil {
		responseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
		return // BUG FIX: missing return after error response
	}

	if len(r.MultipartForm.Value["projectKey"]) == 0 {
		responseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
		return
	}

	prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/"
	for _, fileHeaderList := range r.MultipartForm.File {
		for _, fileHeader := range fileHeaderList {
			file, err := fileHeader.Open()
			if err != nil {
				continue // TODO: send server error or accumulate successful files
			}
			key := prefix + fileHeader.Filename
			log.Printf("Uploading image... %v", key)
			// file and key are declared per-iteration, so capturing them in
			// the goroutine is safe.
			go func() { //TODO: mime type from header
				defer file.Close() // BUG FIX: the opened part was never closed
				if err := s3.Upload(file, key, "image/jpeg", false); err != nil {
					log.Printf("Upload ios screen error. %v", err)
				}
			}()
		}
	}
	w.WriteHeader(http.StatusOK)
}

View file

@ -1,241 +0,0 @@
package main
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
// startSessionHandlerWeb handles POST /v1/web/start.
// Flow: decode the (size-capped) start request, look up the project by key,
// then either resume the session encoded in the request token or — when the
// token is missing/invalid, or the tracker explicitly asked for a reset —
// start a new session. New sessions are sampled against the project's
// SampleRate and announced to the raw-web queue topic as a SessionStart
// message.
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
	// Payload sent by the web tracker on session start.
	type request struct {
		Token           string  `json:"token"`
		UserUUID        *string `json:"userUUID"`
		RevID           string  `json:"revID"`
		Timestamp       uint64  `json:"timestamp"`
		TrackerVersion  string  `json:"trackerVersion"`
		IsSnippet       bool    `json:"isSnippet"`
		DeviceMemory    uint64  `json:"deviceMemory"`
		JsHeapSizeLimit uint64  `json:"jsHeapSizeLimit"`
		ProjectKey      *string `json:"projectKey"`
		Reset           bool    `json:"reset"`
		UserID          string  `json:"userID"`
	}
	// Payload returned to the tracker: the (possibly new) session token plus
	// client-side limits.
	type response struct {
		Timestamp       int64  `json:"timestamp"`
		Delay           int64  `json:"delay"`
		Token           string `json:"token"`
		UserUUID        string `json:"userUUID"`
		SessionID       string `json:"sessionID"`
		BeaconSizeLimit int64  `json:"beaconSizeLimit"`
	}
	startTime := time.Now()
	req := &request{}
	body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error?
	defer body.Close()
	if err := json.NewDecoder(body).Decode(req); err != nil {
		responseWithError(w, http.StatusBadRequest, err)
		return
	}
	if req.ProjectKey == nil {
		responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	p, err := pgconn.GetProjectByKey(*req.ProjectKey)
	if err != nil {
		if postgres.IsNoRowsErr(err) {
			responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
		} else {
			responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
		}
		return
	}
	userUUID := getUUID(req.UserUUID)
	tokenData, err := tokenizer.Parse(req.Token)
	if err != nil || req.Reset { // Starting the new one
		// Sampling: reject the session when the dice roll falls outside the
		// project's sample rate.
		dice := byte(rand.Intn(100)) // [0, 100)
		if dice >= p.SampleRate {
			responseWithError(w, http.StatusForbidden, errors.New("cancel"))
			return
		}
		ua := uaParser.ParseFromHTTPRequest(r)
		if ua == nil {
			responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
			return
		}
		// A flake ID derived from the start timestamp (ms) becomes the session ID.
		sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
		if err != nil {
			responseWithError(w, http.StatusInternalServerError, err)
			return
		}
		// TODO: if EXPIRED => send message for two sessions association
		expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
		tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
		country := geoIP.ExtractISOCodeFromHTTPRequest(r)
		producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
			Timestamp:            req.Timestamp,
			ProjectID:            uint64(p.ProjectID),
			TrackerVersion:       req.TrackerVersion,
			RevID:                req.RevID,
			UserUUID:             userUUID,
			UserAgent:            r.Header.Get("User-Agent"),
			UserOS:               ua.OS,
			UserOSVersion:        ua.OSVersion,
			UserBrowser:          ua.Browser,
			UserBrowserVersion:   ua.BrowserVersion,
			UserDevice:           ua.Device,
			UserDeviceType:       ua.DeviceType,
			UserCountry:          country,
			UserDeviceMemorySize: req.DeviceMemory,
			UserDeviceHeapSize:   req.JsHeapSizeLimit,
			UserID:               req.UserID,
		}))
	}
	//delayDuration := time.Now().Sub(startTime)
	responseWithJSON(w, &response{
		//Timestamp: startTime.UnixNano() / 1e6,
		//Delay: delayDuration.Nanoseconds() / 1e6,
		Token:           tokenizer.Compose(*tokenData),
		UserUUID:        userUUID,
		SessionID:       strconv.FormatUint(tokenData.ID, 10),
		BeaconSizeLimit: BEACON_SIZE_LIMIT,
	})
}
// pushMessagesHandlerWeb handles POST /v1/web/i. It authenticates the session
// token, rewrites the URL-carrying DOM messages inside the (size-capped)
// batch — resolving asset/CSS URLs relative to the page base URL, and caching
// them when enabled — then republishes the rewritten batch to the raw-web
// topic.
func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
	sessionData, err := tokenizer.ParseFromHTTPRequest(r)
	if err != nil {
		responseWithError(w, http.StatusUnauthorized, err)
		return
	}
	body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
	defer body.Close()
	// Replace each *URLBased message variant with its plain counterpart,
	// passing URL/CSS payloads through handleURL/handleCSS.
	rewritenBuf, err := RewriteBatch(body, func(msg Message) Message {
		switch m := msg.(type) {
		case *SetNodeAttributeURLBased:
			if m.Name == "src" || m.Name == "href" {
				msg = &SetNodeAttribute{
					ID:    m.ID,
					Name:  m.Name,
					Value: handleURL(sessionData.ID, m.BaseURL, m.Value),
				}
			} else if m.Name == "style" {
				msg = &SetNodeAttribute{
					ID:    m.ID,
					Name:  m.Name,
					Value: handleCSS(sessionData.ID, m.BaseURL, m.Value),
				}
			}
		case *SetCSSDataURLBased:
			msg = &SetCSSData{
				ID:   m.ID,
				Data: handleCSS(sessionData.ID, m.BaseURL, m.Data),
			}
		case *CSSInsertRuleURLBased:
			msg = &CSSInsertRule{
				ID:    m.ID,
				Index: m.Index,
				Rule:  handleCSS(sessionData.ID, m.BaseURL, m.Rule),
			}
		}
		// switch msg.(type) {
		// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
		// *SetPageLocation,
		// *PageLoadTiming,
		// *PageRenderTiming,
		// *PerformanceTrack,
		// *SetInputTarget,
		// *SetInputValue,
		// *MouseClick,
		// *RawErrorEvent,
		// *JSException,
		// *ResourceTiming,
		// *RawCustomEvent,
		// *CustomIssue,
		// *Fetch,
		// *StateAction,
		// *GraphQL,
		// *CreateElementNode,
		// *CreateTextNode,
		// *RemoveNode,
		// *CreateDocument,
		// *RemoveNodeAttribute,
		// *MoveNode,
		// *SetCSSData,
		// *CSSInsertRule,
		// *CSSDeleteRule:
		// analyticsMessages = append(analyticsMessages, msg)
		//}
		return msg
	})
	if err != nil {
		responseWithError(w, http.StatusForbidden, err)
		return
	}
	producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf)
	//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
	//duration := time.Now().Sub(startTime)
	//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
	w.WriteHeader(http.StatusOK)
}
// notStartedHandlerWeb handles POST /v1/web/not-started. It records a session
// the tracker chose not to start (e.g. Do-Not-Track enabled) so the project
// still has visibility into untracked visits. Insert failures are logged but
// do not fail the request.
func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
	type request struct {
		ProjectKey     *string `json:"projectKey"`
		TrackerVersion string  `json:"trackerVersion"`
		DoNotTrack     bool    `json:"DoNotTrack"`
		// RevID string `json:"revID"`
	}
	payload := &request{}
	limitedBody := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
	defer limitedBody.Close()
	if decodeErr := json.NewDecoder(limitedBody).Decode(payload); decodeErr != nil {
		responseWithError(w, http.StatusBadRequest, decodeErr)
		return
	}
	if payload.ProjectKey == nil {
		responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
		return
	}
	agent := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
	if agent == nil {
		responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
		return
	}
	countryCode := geoIP.ExtractISOCodeFromHTTPRequest(r)
	insertErr := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
		ProjectKey:         *payload.ProjectKey,
		TrackerVersion:     payload.TrackerVersion,
		DoNotTrack:         payload.DoNotTrack,
		Platform:           "web",
		UserAgent:          r.Header.Get("User-Agent"),
		UserOS:             agent.OS,
		UserOSVersion:      agent.OSVersion,
		UserBrowser:        agent.Browser,
		UserBrowserVersion: agent.BrowserVersion,
		UserDevice:         agent.Device,
		UserDeviceType:     agent.DeviceType,
		UserCountry:        countryCode,
	})
	if insertErr != nil {
		log.Printf("Unable to insert Unstarted Session: %v\n", insertErr)
	}
	w.WriteHeader(http.StatusOK)
}

View file

@ -1,156 +0,0 @@
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"golang.org/x/net/http2"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/url/assets"
"openreplay/backend/services/http/geoip"
"openreplay/backend/services/http/uaparser"
"openreplay/backend/pkg/pprof"
)
// Package-wide service singletons, wired up once in main() before the server
// starts accepting requests.
var rewriter *assets.Rewriter   // rewrites asset URLs to the caching origin
var producer types.Producer     // queue producer shared by all handlers
var pgconn *cache.PGCache       // cached Postgres access (projects, sessions)
var flaker *flakeid.Flaker      // flake-ID generator for new session IDs
var uaParser *uaparser.UAParser // User-Agent parser
var geoIP *geoip.GeoIP          // GeoIP country lookup
var tokenizer *token.Tokenizer  // session-token composing/parsing
var s3 *storage.S3              // bucket used for iOS screenshot uploads

// Queue topic names, read from the environment in main().
var TOPIC_RAW_WEB string
var TOPIC_RAW_IOS string
var TOPIC_CACHE string
var TOPIC_TRIGGER string

//var TOPIC_ANALYTICS string

// Runtime configuration, read from the environment in main().
// NOTE(review): "ASSESTS" spelling is kept as-is since other blocks reference it.
var CACHE_ASSESTS bool      // when true, assets are cached and URLs rewritten
var BEACON_SIZE_LIMIT int64 // max size of one message batch from trackers
// main wires together all shared dependencies (queue producer, cached
// Postgres connection, S3 client, tokenizer, UA parser, GeoIP and flake-ID
// generator), then serves the tracker HTTP API with HTTP/2 support until
// SIGINT/SIGTERM triggers a graceful shutdown.
func main() {
	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)

	pprof.StartProfilingServer()

	producer = queue.NewProducer()
	defer producer.Close(15000)
	TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB")
	TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS")
	TOPIC_CACHE = env.String("TOPIC_CACHE")
	TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
	//TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
	rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
	pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
	defer pgconn.Close()
	s3 = storage.NewS3(env.String("AWS_REGION"), env.String("S3_BUCKET_IOS_IMAGES"))
	tokenizer = token.NewTokenizer(env.String("TOKEN_SECRET"))
	uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE"))
	geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE"))
	flaker = flakeid.NewFlaker(env.WorkerID())
	CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
	BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))

	// Every tracker endpoint is POST-only; a route table keeps the method
	// check in one place instead of one switch arm per path.
	postRoutes := map[string]http.HandlerFunc{
		"/v1/web/not-started": notStartedHandlerWeb,
		"/v1/web/start":       startSessionHandlerWeb,
		"/v1/web/i":           pushMessagesHandlerWeb,
		"/v1/ios/start":       startSessionHandlerIOS,
		"/v1/ios/i":           pushMessagesHandlerIOS,
		"/v1/ios/late":        pushLateMessagesHandlerIOS,
		"/v1/ios/images":      imagesUploadHandlerIOS,
	}

	HTTP_PORT := env.String("HTTP_PORT")
	server := &http.Server{
		Addr: ":" + HTTP_PORT,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// TODO: agree with specification
			w.Header().Set("Access-Control-Allow-Origin", "*")
			w.Header().Set("Access-Control-Allow-Methods", "POST")
			w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
			if r.Method == http.MethodOptions {
				w.Header().Set("Cache-Control", "max-age=86400")
				w.WriteHeader(http.StatusOK)
				return
			}

			log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
			if r.URL.Path == "/" { // health check, any method
				w.WriteHeader(http.StatusOK)
				return
			}
			handler, known := postRoutes[r.URL.Path]
			if !known {
				w.WriteHeader(http.StatusNotFound)
				return
			}
			if r.Method != http.MethodPost {
				w.WriteHeader(http.StatusMethodNotAllowed)
				return
			}
			handler(w, r)
		}),
	}
	// BUG FIX: the error returned by http2.ConfigureServer was ignored; a
	// misconfigured server would previously start without HTTP/2 silently.
	if err := http2.ConfigureServer(server, nil); err != nil {
		log.Fatalf("Unable to configure HTTP/2 server: %v", err)
	}
	go func() {
		if err := server.ListenAndServe(); err != nil {
			log.Printf("Server error: %v\n", err)
			log.Fatal("Server error")
		}
	}()
	log.Printf("Server successfully started on port %v\n", HTTP_PORT)

	// Block until an interrupt/termination signal, then drain gracefully.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan
	log.Printf("Shutting down the server\n")
	// BUG FIX: the error returned by Shutdown was ignored.
	if err := server.Shutdown(context.Background()); err != nil {
		log.Printf("Server shutdown error: %v\n", err)
	}
}

View file

@ -38,8 +38,8 @@ Use the following instructions if you're running Windows 10 or Windows 8:
<ip address from vagrant output> openreplay.local
Select File > Save to save your changes.
**Open browser**
http://openreplay.local
**Open the URL below and create an account**
http://openreplay.local/signup
```
### To start developing