feat(ui) - funnels - merged dev

This commit is contained in:
Shekar Siri 2022-05-10 12:10:18 +02:00
commit 89db14bdbf
222 changed files with 9038 additions and 5420 deletions

View file

@ -47,7 +47,13 @@ jobs:
#
# Getting the images to build
#
git diff --name-only HEAD HEAD~1 | grep backend/services | cut -d '/' -f3 | uniq > backend/images_to_build.txt
{
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
done
} | uniq > backend/images_to_build.txt
[[ $(cat backend/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0)
#
# Pushing image to registry

View file

@ -49,10 +49,10 @@ jobs:
#
{
git diff --name-only HEAD HEAD~1 | grep backend/services | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep backend/pkg | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
done
} | uniq > backend/images_to_build.txt

6
api/.dockerignore Normal file
View file

@ -0,0 +1,6 @@
# ignore .git and .cache folders
.git
.cache
**/build.sh
**/build_*.sh
**/*deploy.sh

View file

@ -11,8 +11,8 @@ RUN apt update && apt install -y curl && \
curl -fsSL https://deb.nodesource.com/setup_12.x | bash - && \
apt install -y nodejs && \
apt remove --purge -y curl && \
rm -rf /var/lib/apt/lists/* && \
cd sourcemap-reader && \
rm -rf /var/lib/apt/lists/*
RUN cd sourcemap-reader && \
npm install
# Add Tini
@ -23,4 +23,4 @@ ENV ENTERPRISE_BUILD ${envarg}
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
ENTRYPOINT ["/tini", "--"]
CMD ./entrypoint.sh
CMD ./entrypoint.sh

View file

@ -12,9 +12,9 @@ envarg="default-foss"
check_prereq() {
which docker || {
echo "Docker not installed, please install docker."
exit=1
exit 1
}
[[ exit -eq 1 ]] && exit 1
return
}
function build_api(){
@ -32,9 +32,11 @@ function build_api(){
docker push ${DOCKER_REPO:-'local'}/chalice:${git_sha1}
docker tag ${DOCKER_REPO:-'local'}/chalice:${git_sha1} ${DOCKER_REPO:-'local'}/chalice:${tag}latest
docker push ${DOCKER_REPO:-'local'}/chalice:${tag}latest
}
}
echo "api docker build completed"
}
check_prereq
build_api $1
IMAGE_TAG=$IMAGE_TAG PUSH_IMAGE=$PUSH_IMAGE DOCKER_REPO=$DOCKER_REPO bash build_alerts.sh $1
echo buil_complete
IMAGE_TAG=$IMAGE_TAG PUSH_IMAGE=$PUSH_IMAGE DOCKER_REPO=$DOCKER_REPO bash build_alerts.sh $1

View file

@ -27,7 +27,7 @@ function make_submodule() {
mkdir -p ./alerts/chalicelib/
cp -R ./chalicelib/__init__.py ./alerts/chalicelib/
mkdir -p ./alerts/chalicelib/core/
cp -R ./chalicelib/core/{__init__,alerts_processor,alerts_listener,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,roles,assist,events_ios,sessions_mobs,errors,dashboard,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/
cp -R ./chalicelib/core/{__init__,alerts_processor,alerts_listener,sessions,events,issues,sessions_metas,metadata,projects,users,authorizers,tenants,roles,assist,events_ios,sessions_mobs,errors,metrics,sourcemaps,sourcemaps_parser,resources,performance_event,alerts,notifications,slack,collaboration_slack,webhook}.py ./alerts/chalicelib/core/
mkdir -p ./alerts/chalicelib/utils/
cp -R ./chalicelib/utils/{__init__,TimeUTC,pg_client,helper,event_filter_definition,dev,SAML2_helper,email_helper,email_handler,smtp,s3,args_transformer,ch_client,metrics_helper}.py ./alerts/chalicelib/utils/
# -- end of generated part
@ -64,7 +64,8 @@ function build_api(){
docker tag ${DOCKER_REPO:-'local'}/alerts:${git_sha1} ${DOCKER_REPO:-'local'}/alerts:${tag}latest
docker push ${DOCKER_REPO:-'local'}/alerts:${tag}latest
}
echo "completed alerts build"
}
check_prereq
build_api $1
build_api $1

View file

@ -41,26 +41,14 @@ def __create(tenant_id, name):
return get_project(tenant_id=tenant_id, project_id=project_id, include_gdpr=True)
def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, stack_integrations=False, version=False,
last_tracker_version=None):
def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, stack_integrations=False):
with pg_client.PostgresClient() as cur:
tracker_query = ""
if last_tracker_version is not None and len(last_tracker_version) > 0:
tracker_query = cur.mogrify(
""",(SELECT tracker_version FROM public.sessions
WHERE sessions.project_id = s.project_id
AND tracker_version=%(version)s AND tracker_version IS NOT NULL LIMIT 1) AS tracker_version""",
{"version": last_tracker_version}).decode('UTF-8')
elif version:
tracker_query = ",(SELECT tracker_version FROM public.sessions WHERE sessions.project_id = s.project_id ORDER BY start_ts DESC LIMIT 1) AS tracker_version"
cur.execute(f"""\
SELECT
s.project_id, s.name, s.project_key, s.save_request_payloads
{',s.gdpr' if gdpr else ''}
{',COALESCE((SELECT TRUE FROM public.sessions WHERE sessions.project_id = s.project_id LIMIT 1), FALSE) AS recorded' if recorded else ''}
{',stack_integrations.count>0 AS stack_integrations' if stack_integrations else ''}
{tracker_query}
FROM public.projects AS s
{'LEFT JOIN LATERAL (SELECT COUNT(*) AS count FROM public.integrations WHERE s.project_id = integrations.project_id LIMIT 1) AS stack_integrations ON TRUE' if stack_integrations else ''}
WHERE s.deleted_at IS NULL
@ -90,19 +78,8 @@ def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, st
return helper.list_to_camel_case(rows)
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None, version=False,
last_tracker_version=None):
def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=None):
with pg_client.PostgresClient() as cur:
tracker_query = ""
if last_tracker_version is not None and len(last_tracker_version) > 0:
tracker_query = cur.mogrify(
""",(SELECT tracker_version FROM public.sessions
WHERE sessions.project_id = s.project_id
AND tracker_version=%(version)s AND tracker_version IS NOT NULL LIMIT 1) AS tracker_version""",
{"version": last_tracker_version}).decode('UTF-8')
elif version:
tracker_query = ",(SELECT tracker_version FROM public.sessions WHERE sessions.project_id = s.project_id ORDER BY start_ts DESC LIMIT 1) AS tracker_version"
query = cur.mogrify(f"""\
SELECT
s.project_id,
@ -111,7 +88,6 @@ def get_project(tenant_id, project_id, include_last_session=False, include_gdpr=
s.save_request_payloads
{",(SELECT max(ss.start_ts) FROM public.sessions AS ss WHERE ss.project_id = %(project_id)s) AS last_recorded_session_at" if include_last_session else ""}
{',s.gdpr' if include_gdpr else ''}
{tracker_query}
FROM public.projects AS s
where s.project_id =%(project_id)s
AND s.deleted_at IS NULL

30
api/development.md Normal file
View file

@ -0,0 +1,30 @@
### Prerequisites
- [Vagrant](../scripts/vagrant/README.md)
- Python 3.9
- Pipenv
### Development environment
```bash
cd openreplay/api
# Make your own copy of .env file and edit it as you want
cp .env.dev .env
# Create a .venv folder to contain all you dependencies
mkdir .venv
# Installing dependencies (pipenv will detect the .venv folder and use it as a target)
pipenv install -r requirements.txt [--skip-lock]
```
### Building and deploying locally
```bash
cd openreplay-contributions
vagrant ssh
cd openreplay-dev/openreplay/scripts/helmcharts
# For complete list of options
# bash local_deploy.sh help
bash local_deploy.sh api
```

View file

@ -91,10 +91,9 @@ def get_projects_limit(context: schemas.CurrentContext = Depends(OR_context)):
@app.get('/projects/{projectId}', tags=['projects'])
def get_project(projectId: int, last_tracker_version: Optional[str] = None,
context: schemas.CurrentContext = Depends(OR_context)):
def get_project(projectId: int, context: schemas.CurrentContext = Depends(OR_context)):
data = projects.get_project(tenant_id=context.tenant_id, project_id=projectId, include_last_session=True,
include_gdpr=True, last_tracker_version=last_tracker_version)
include_gdpr=True)
if data is None:
return {"errors": ["project not found"]}
return {"data": data}
@ -223,7 +222,6 @@ def get_client(context: schemas.CurrentContext = Depends(OR_context)):
@app.get('/projects', tags=['projects'])
def get_projects(last_tracker_version: Optional[str] = None, context: schemas.CurrentContext = Depends(OR_context)):
def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
return {"data": projects.get_projects(tenant_id=context.tenant_id, recording_state=True, gdpr=True, recorded=True,
stack_integrations=True, version=True,
last_tracker_version=last_tracker_version)}
stack_integrations=True)}

6
backend/.dockerignore Normal file
View file

@ -0,0 +1,6 @@
# ignore .git and .cache folders
.git
.cache
**/build.sh
**/build_*.sh
**/*deploy.sh

View file

@ -1,4 +1,4 @@
FROM golang:1.13-alpine3.10 AS prepare
FROM golang:1.18-alpine3.15 AS prepare
RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
@ -13,7 +13,7 @@ FROM prepare AS build
COPY pkg pkg
COPY services services
RUN for name in alerts assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags musl openreplay/backend/services/$name; done
RUN for name in assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags musl openreplay/backend/services/$name; done
FROM alpine
@ -26,8 +26,9 @@ ENV TZ=UTC \
MAXMINDDB_FILE=/root/geoip.mmdb \
UAPARSER_FILE=/root/regexes.yaml \
HTTP_PORT=80 \
BEACON_SIZE_LIMIT=1000000 \
BEACON_SIZE_LIMIT=7000000 \
KAFKA_USE_SSL=true \
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
REDIS_STREAMS_MAX_LEN=3000 \
TOPIC_RAW_WEB=raw \
TOPIC_RAW_IOS=raw-ios \
@ -42,10 +43,10 @@ ENV TZ=UTC \
AWS_REGION_WEB=eu-central-1 \
AWS_REGION_IOS=eu-west-1 \
AWS_REGION_ASSETS=eu-central-1 \
CACHE_ASSETS=false \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=12
FS_CLEAN_HRS=12 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
RUN mkdir $FS_DIR
#VOLUME [ $FS_DIR ] # Uncomment in case of using Bind mount.

38
backend/build.sh Normal file → Executable file
View file

@ -13,9 +13,31 @@ ee="false"
check_prereq() {
which docker || {
echo "Docker not installed, please install docker."
exit=1
exit 1
}
[[ exit -eq 1 ]] && exit 1
return
}
function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http | db)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
*)
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
esac
return
}
function build_api(){
@ -25,21 +47,15 @@ function build_api(){
ee="true"
}
[[ $2 != "" ]] && {
image="$2"
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
build_service $2
return
}
for image in $(ls services);
do
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
build_service $image
echo "::set-output name=image::${DOCKER_REPO:-'local'}/$image:${git_sha1}"
done
echo "backend build completed"
}
check_prereq

60
backend/cmd/Dockerfile Normal file
View file

@ -0,0 +1,60 @@
FROM golang:1.18-alpine3.15 AS prepare
RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
WORKDIR /root
COPY go.mod .
COPY go.sum .
RUN go mod download
FROM prepare AS build
COPY pkg pkg
COPY services services
COPY internal internal
COPY cmd cmd
ARG SERVICE_NAME
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME
FROM alpine AS entrypoint
RUN apk add --no-cache ca-certificates
ENV TZ=UTC \
FS_ULIMIT=1000 \
FS_DIR=/mnt/efs \
MAXMINDDB_FILE=/root/geoip.mmdb \
UAPARSER_FILE=/root/regexes.yaml \
HTTP_PORT=80 \
BEACON_SIZE_LIMIT=7000000 \
KAFKA_USE_SSL=true \
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
REDIS_STREAMS_MAX_LEN=3000 \
TOPIC_RAW_WEB=raw \
TOPIC_RAW_IOS=raw-ios \
TOPIC_CACHE=cache \
TOPIC_ANALYTICS=analytics \
TOPIC_TRIGGER=trigger \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \
GROUP_ENDER=ender \
GROUP_CACHE=cache \
AWS_REGION_WEB=eu-central-1 \
AWS_REGION_IOS=eu-west-1 \
AWS_REGION_ASSETS=eu-central-1 \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=72 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
ARG SERVICE_NAME
RUN if [ "$SERVICE_NAME" = "http" ]; then \
wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\
wget https://static.openreplay.com/geoip/GeoLite2-Country.mmdb -O "$MAXMINDDB_FILE"; fi
COPY --from=build /root/service /root/service
ENTRYPOINT /root/service

117
backend/cmd/db/main.go Normal file
View file

@ -0,0 +1,117 @@
package main
import (
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/heuristics"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := db.New()
// Init database
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// Init modules
heurFinder := heuristics.NewHandler()
saver := datasaver.New(pg)
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
// Handler logic
handler := func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.Collect(sessionID, meta)
// Just save session data into db without additional checks
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
// Try to get session from db for the following handlers
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
heurFinder.HandleMessage(session, msg)
// Process saved heuristics messages as usual messages above in the code
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := saver.InsertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
// Init consumer
consumer := queue.NewMessageConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawIOS,
cfg.TopicTrigger,
},
handler,
false,
)
log.Printf("Db service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(cfg.CommitBatchTimeout)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-tick:
pg.CommitBatches()
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
default:
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal?
}
}
}
}

62
backend/cmd/http/main.go Normal file
View file

@ -0,0 +1,62 @@
package main
import (
"log"
"openreplay/backend/internal/config"
"openreplay/backend/internal/router"
"openreplay/backend/internal/server"
"openreplay/backend/internal/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"os"
"os/signal"
"syscall"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
pprof.StartProfilingServer()
// Load configuration
cfg := config.New()
// Connect to queue
producer := queue.NewProducer()
defer producer.Close(15000)
// Connect to database
dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20)
defer dbConn.Close()
// Build all services
services := services.New(cfg, producer, dbConn)
// Init server's routes
router, err := router.NewRouter(cfg, services)
if err != nil {
log.Fatalf("failed while creating engine: %s", err)
}
// Init server
server, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout)
if err != nil {
log.Fatalf("failed while creating server: %s", err)
}
// Run server
go func() {
if err := server.Start(); err != nil {
log.Fatalf("Server error: %v\n", err)
}
}()
log.Printf("Server successfully started on port %v\n", cfg.HTTPPort)
// Wait stop signal to shut down server gracefully
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
<-sigchan
log.Printf("Shutting down the server\n")
server.Stop()
}

14
backend/development.md Normal file
View file

@ -0,0 +1,14 @@
### Prerequisites
- [Vagrant](../scripts/vagrant/README.md)
### Building and deploying locally
```bash
cd openreplay-contributions
vagrant ssh
cd openreplay-dev/openreplay/scripts/helmcharts
# For complete list of options
# bash local_deploy.sh help
bash local_deploy.sh <worker name>
```

View file

@ -0,0 +1,83 @@
package assetscache
import (
"openreplay/backend/internal/config"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
type AssetsCache struct {
cfg *config.Config
rewriter *assets.Rewriter
producer types.Producer
}
func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
return &AssetsCache{
cfg: cfg,
rewriter: rewriter,
producer: producer,
}
}
func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.Message {
switch m := msg.(type) {
case *messages.SetNodeAttributeURLBased:
if m.Name == "src" || m.Name == "href" {
return &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleURL(sessID, m.BaseURL, m.Value),
}
} else if m.Name == "style" {
return &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleCSS(sessID, m.BaseURL, m.Value),
}
}
case *messages.SetCSSDataURLBased:
return &messages.SetCSSData{
ID: m.ID,
Data: e.handleCSS(sessID, m.BaseURL, m.Data),
}
case *messages.CSSInsertRuleURLBased:
return &messages.CSSInsertRule{
ID: m.ID,
Index: m.Index,
Rule: e.handleCSS(sessID, m.BaseURL, m.Rule),
}
}
return msg
}
func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
e.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
URL: fullURL,
}))
}
}
func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
e.sendAssetForCache(sessionID, baseURL, u)
}
}
func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string {
if e.cfg.CacheAssets {
e.sendAssetForCache(sessionID, baseURL, url)
return e.rewriter.RewriteURL(sessionID, baseURL, url)
}
return assets.ResolveURL(baseURL, url)
}
func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string {
if e.cfg.CacheAssets {
e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
return e.rewriter.RewriteCSS(sessionID, baseURL, css)
}
return assets.ResolveCSS(baseURL, css)
}

View file

@ -0,0 +1,50 @@
package config
import (
"openreplay/backend/pkg/env"
"time"
)
type Config struct {
HTTPHost string
HTTPPort string
HTTPTimeout time.Duration
TopicRawWeb string
TopicRawIOS string
TopicCache string
CacheAssets bool
BeaconSizeLimit int64
JsonSizeLimit int64
FileSizeLimit int64
AssetsOrigin string
AWSRegion string
S3BucketIOSImages string
Postgres string
TokenSecret string
UAParserFile string
MaxMinDBFile string
WorkerID uint16
}
func New() *Config {
return &Config{
HTTPHost: "", // empty by default
HTTPPort: env.String("HTTP_PORT"),
HTTPTimeout: time.Second * 60,
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicCache: env.String("TOPIC_CACHE"),
CacheAssets: env.Bool("CACHE_ASSETS"),
BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")),
JsonSizeLimit: 1e3, // 1Kb
FileSizeLimit: 1e7, // 10Mb
AssetsOrigin: env.String("ASSETS_ORIGIN"),
AWSRegion: env.String("AWS_REGION"),
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
Postgres: env.String("POSTGRES_STRING"),
TokenSecret: env.String("TOKEN_SECRET"),
UAParserFile: env.String("UAPARSER_FILE"),
MaxMinDBFile: env.String("MAXMINDDB_FILE"),
WorkerID: env.WorkerID(),
}
}

View file

@ -0,0 +1,28 @@
package db
import (
"openreplay/backend/pkg/env"
"time"
)
type Config struct {
Postgres string
ProjectExpirationTimeoutMs int64
LoggerTimeout int
GroupDB string
TopicRawIOS string
TopicTrigger string
CommitBatchTimeout time.Duration
}
func New() *Config {
return &Config{
Postgres: env.String("POSTGRES_STRING"),
ProjectExpirationTimeoutMs: 1000 * 60 * 20,
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
GroupDB: env.String("GROUP_DB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
CommitBatchTimeout: 15 * time.Second,
}
}

View file

@ -0,0 +1,66 @@
package datasaver
import (
. "openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
switch m := msg.(type) {
// Common
case *Metadata:
return mi.pg.InsertMetadata(sessionID, m)
case *IssueEvent:
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return mi.pg.InsertWebSessionStart(sessionID, m)
case *SessionEnd:
return mi.pg.InsertWebSessionEnd(sessionID, m)
case *UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return mi.pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
case *PageEvent:
return mi.pg.InsertWebPageEvent(sessionID, m)
case *ErrorEvent:
return mi.pg.InsertWebErrorEvent(sessionID, m)
case *FetchEvent:
return mi.pg.InsertWebFetchEvent(sessionID, m)
case *GraphQLEvent:
return mi.pg.InsertWebGraphQLEvent(sessionID, m)
// IOS
case *IOSSessionStart:
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return mi.pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -0,0 +1,11 @@
package datasaver
import "openreplay/backend/pkg/db/cache"
type Saver struct {
pg *cache.PGCache
}
func New(pg *cache.PGCache) *Saver {
return &Saver{pg: pg}
}

View file

@ -0,0 +1,19 @@
package datasaver
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr:
return si.pg.InsertWebStatsPerformance(session.SessionID, m)
case *ResourceEvent:
return si.pg.InsertWebStatsResourceEvent(session.SessionID, m)
case *LongTask:
return si.pg.InsertWebStatsLongtask(session.SessionID, m)
}
return nil
}

View file

@ -1,26 +1,23 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/messages"
)
const MIN_TIME_AFTER_LAST_HEARTBEAT = 60 * 1000
type anr struct {
readyMessageStore
lastLabel string
lastLabel string
lastHeartbeatTimestamp uint64
lastHeartbeatIndex uint64
lastHeartbeatIndex uint64
}
func (h *anr) buildIf(timestamp uint64) {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp + MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
if h.lastHeartbeatTimestamp != 0 && h.lastHeartbeatTimestamp+MIN_TIME_AFTER_LAST_HEARTBEAT <= timestamp {
m := &IOSIssueEvent{
Type: "anr",
Type: "anr",
ContextString: h.lastLabel,
//Context: "{}",
//Payload: fmt.SPrint
}
m.Timestamp = h.lastHeartbeatTimestamp
m.Index = h.lastHeartbeatIndex // Associated Index/ MessageID ?
@ -49,4 +46,4 @@ func (h *anr) HandleMessage(msg Message) {
case *IOSSessionEnd:
h.buildIf(m.Timestamp)
}
}
}

View file

@ -1,29 +1,26 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/messages"
)
const CLICK_TIME_DIFF = 200
const MIN_CLICKS_IN_A_ROW = 3
type clickrage struct {
readyMessageStore
lastTimestamp uint64
lastLabel string
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
firstInARawSeqIndex uint64
countsInARow int
firstInARawSeqIndex uint64
countsInARow int
}
func (h *clickrage) build() {
if h.countsInARow >= MIN_CLICKS_IN_A_ROW {
m := &IOSIssueEvent{
Type: "click_rage",
Type: "click_rage",
ContextString: h.lastLabel,
//Context: "{}",
//Payload: fmt.SPrint
}
m.Timestamp = h.firstInARawTimestamp
m.Index = h.firstInARawSeqIndex // Associated Index/ MessageID ?
@ -39,7 +36,7 @@ func (h *clickrage) build() {
func (h *clickrage) HandleMessage(msg Message) {
switch m := msg.(type) {
case *IOSClickEvent:
if h.lastTimestamp + CLICK_TIME_DIFF < m.Timestamp && h.lastLabel == m.Label {
if h.lastTimestamp+CLICK_TIME_DIFF < m.Timestamp && h.lastLabel == m.Label {
h.lastTimestamp = m.Timestamp
h.countsInARow += 1
return
@ -55,4 +52,4 @@ func (h *clickrage) HandleMessage(msg Message) {
case *IOSSessionEnd:
h.build()
}
}
}

View file

@ -1,8 +1,8 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
type MessageHandler interface {
@ -19,7 +19,6 @@ type Handler interface {
type mainHandler map[uint64]*sessHandler
func NewHandler() mainHandler {
return make(mainHandler)
}
@ -43,8 +42,10 @@ func (m mainHandler) HandleMessage(session *Session, msg Message) {
}
func (m mainHandler) IterateSessionReadyMessages(sessionID uint64, iter func(msg Message)) {
s, ok := m[ sessionID ]
if !ok { return }
s, ok := m[sessionID]
if !ok {
return
}
s.IterateReadyMessages(iter)
if s.IsEnded() {
delete(m, sessionID)
@ -61,5 +62,3 @@ func (m mainHandler) IterateReadyMessages(iter func(sessionID uint64, msg Messag
}
}
}

View file

@ -1,31 +1,30 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/messages"
)
const AGGR_TIME = 15 * 60 * 1000
type valueAggregator struct {
sum float64
sum float64
count float64
}
func (va *valueAggregator) aggregate() uint64 {
if va.count == 0 {
return 0
}
return uint64(va.sum/va.count)
return uint64(va.sum / va.count)
}
type performanceAggregator struct {
readyMessageStore
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
pa *IOSPerformanceAggregated
fps valueAggregator
cpu valueAggregator
memory valueAggregator
battery valueAggregator
}
func (h *performanceAggregator) build(timestamp uint64) {
@ -56,7 +55,7 @@ func (h *performanceAggregator) HandleMessage(msg Message) {
if h.pa.TimestampStart == 0 {
h.pa.TimestampStart = m.Timestamp
}
if h.pa.TimestampStart + AGGR_TIME <= m.Timestamp {
if h.pa.TimestampStart+AGGR_TIME <= m.Timestamp {
h.build(m.Timestamp)
}
switch m.Name {
@ -96,8 +95,8 @@ func (h *performanceAggregator) HandleMessage(msg Message) {
if m.Value > h.pa.MaxBattery {
h.pa.MaxBattery = m.Value
}
}
}
case *IOSSessionEnd:
h.build(m.Timestamp)
}
}
}

View file

@ -1,10 +1,9 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/messages"
)
type readyMessageStore struct {
store []Message
}
@ -18,4 +17,4 @@ func (s *readyMessageStore) IterateReadyMessages(cb func(msg Message)) {
cb(msg)
}
s.store = nil
}
}

View file

@ -1,18 +1,16 @@
package heuristics
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
type sessHandler struct {
session *Session
session *Session
handlers []Handler
ended bool
ended bool
}
func newSessHandler(session *Session) *sessHandler {
return &sessHandler{
session: session,
@ -44,4 +42,4 @@ func (s *sessHandler) IterateReadyMessages(cb func(msg Message)) {
func (s *sessHandler) IsEnded() bool {
return s.ended
}
}

View file

@ -0,0 +1,138 @@
package ios
import (
"strings"
)
func MapIOSDevice(identifier string) string {
switch identifier {
case "iPod5,1":
return "iPod touch (5th generation)"
case "iPod7,1":
return "iPod touch (6th generation)"
case "iPod9,1":
return "iPod touch (7th generation)"
case "iPhone3,1", "iPhone3,2", "iPhone3,3":
return "iPhone 4"
case "iPhone4,1":
return "iPhone 4s"
case "iPhone5,1", "iPhone5,2":
return "iPhone 5"
case "iPhone5,3", "iPhone5,4":
return "iPhone 5c"
case "iPhone6,1", "iPhone6,2":
return "iPhone 5s"
case "iPhone7,2":
return "iPhone 6"
case "iPhone7,1":
return "iPhone 6 Plus"
case "iPhone8,1":
return "iPhone 6s"
case "iPhone8,2":
return "iPhone 6s Plus"
case "iPhone8,4":
return "iPhone SE"
case "iPhone9,1", "iPhone9,3":
return "iPhone 7"
case "iPhone9,2", "iPhone9,4":
return "iPhone 7 Plus"
case "iPhone10,1", "iPhone10,4":
return "iPhone 8"
case "iPhone10,2", "iPhone10,5":
return "iPhone 8 Plus"
case "iPhone10,3", "iPhone10,6":
return "iPhone X"
case "iPhone11,2":
return "iPhone XS"
case "iPhone11,4", "iPhone11,6":
return "iPhone XS Max"
case "iPhone11,8":
return "iPhone XR"
case "iPhone12,1":
return "iPhone 11"
case "iPhone12,3":
return "iPhone 11 Pro"
case "iPhone12,5":
return "iPhone 11 Pro Max"
case "iPhone12,8":
return "iPhone SE (2nd generation)"
case "iPhone13,1":
return "iPhone 12 mini"
case "iPhone13,2":
return "iPhone 12"
case "iPhone13,3":
return "iPhone 12 Pro"
case "iPhone13,4":
return "iPhone 12 Pro Max"
case "iPad2,1", "iPad2,2", "iPad2,3", "iPad2,4":
return "iPad 2"
case "iPad3,1", "iPad3,2", "iPad3,3":
return "iPad (3rd generation)"
case "iPad3,4", "iPad3,5", "iPad3,6":
return "iPad (4th generation)"
case "iPad6,11", "iPad6,12":
return "iPad (5th generation)"
case "iPad7,5", "iPad7,6":
return "iPad (6th generation)"
case "iPad7,11", "iPad7,12":
return "iPad (7th generation)"
case "iPad11,6", "iPad11,7":
return "iPad (8th generation)"
case "iPad4,1", "iPad4,2", "iPad4,3":
return "iPad Air"
case "iPad5,3", "iPad5,4":
return "iPad Air 2"
case "iPad11,3", "iPad11,4":
return "iPad Air (3rd generation)"
case "iPad13,1", "iPad13,2":
return "iPad Air (4th generation)"
case "iPad2,5", "iPad2,6", "iPad2,7":
return "iPad mini"
case "iPad4,4", "iPad4,5", "iPad4,6":
return "iPad mini 2"
case "iPad4,7", "iPad4,8", "iPad4,9":
return "iPad mini 3"
case "iPad5,1", "iPad5,2":
return "iPad mini 4"
case "iPad11,1", "iPad11,2":
return "iPad mini (5th generation)"
case "iPad6,3", "iPad6,4":
return "iPad Pro (9.7-inch)"
case "iPad7,3", "iPad7,4":
return "iPad Pro (10.5-inch)"
case "iPad8,1", "iPad8,2", "iPad8,3", "iPad8,4":
return "iPad Pro (11-inch) (1st generation)"
case "iPad8,9", "iPad8,10":
return "iPad Pro (11-inch) (2nd generation)"
case "iPad6,7", "iPad6,8":
return "iPad Pro (12.9-inch) (1st generation)"
case "iPad7,1", "iPad7,2":
return "iPad Pro (12.9-inch) (2nd generation)"
case "iPad8,5", "iPad8,6", "iPad8,7", "iPad8,8":
return "iPad Pro (12.9-inch) (3rd generation)"
case "iPad8,11", "iPad8,12":
return "iPad Pro (12.9-inch) (4th generation)"
case "AppleTV5,3":
return "Apple TV"
case "AppleTV6,2":
return "Apple TV 4K"
case "AudioAccessory1,1":
return "HomePod"
case "AudioAccessory5,1":
return "HomePod mini"
case "i386", "x86_64":
return "Simulator"
default:
return identifier
}
}
func GetIOSDeviceType(identifier string) string {
if strings.Contains(identifier, "iPhone") {
return "mobile" //"phone"
}
if strings.Contains(identifier, "iPad") {
return "tablet"
}
return "other"
}

View file

@ -0,0 +1,172 @@
package router
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/ios"
"openreplay/backend/internal/uuid"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
req := &StartIOSSessionRequest{}
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
return
}
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
} else {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
userUUID := uuid.GetUUID(req.UserUUID)
tokenData, err := e.services.Tokenizer.Parse(req.Token)
if err != nil { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
// The difference with web is mostly here:
e.services.Producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserOS: "IOS",
UserOSVersion: req.UserOSVersion,
UserDevice: ios.MapIOSDevice(req.UserDevice),
UserDeviceType: ios.GetIOSDeviceType(req.UserDevice),
UserCountry: country,
}))
}
ResponseWithJSON(w, &StartIOSSessionResponse{
Token: e.services.Tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
BeaconSizeLimit: e.cfg.BeaconSizeLimit,
})
}
func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
}
func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil && err != token.EXPIRED {
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
// Check timestamps here?
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
}
func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
log.Printf("recieved imagerequest")
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil { // Should accept expired token?
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
defer r.Body.Close()
err = r.ParseMultipartForm(1e6) // ~1Mb
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
ResponseWithError(w, http.StatusUnsupportedMediaType, err)
return
// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
} else if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
return
}
if r.MultipartForm == nil {
ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
return
}
if len(r.MultipartForm.Value["projectKey"]) == 0 {
ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
return
}
prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/"
for _, fileHeaderList := range r.MultipartForm.File {
for _, fileHeader := range fileHeaderList {
file, err := fileHeader.Open()
if err != nil {
continue // TODO: send server error or accumulate successful files
}
key := prefix + fileHeader.Filename
log.Printf("Uploading image... %v", key)
go func() { //TODO: mime type from header
if err := e.services.Storage.Upload(file, key, "image/jpeg", false); err != nil {
log.Printf("Upload ios screen error. %v", err)
}
}()
}
}
w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,187 @@
package router
import (
"bytes"
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/uuid"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
// Parse request body
req := &StartSessionRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
} else {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
userUUID := uuid.GetUUID(req.UserUUID)
tokenData, err := e.services.Tokenizer.Parse(req.Token)
if err != nil || req.Reset { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixNano() / 1e6))
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6}
e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r),
UserDeviceMemorySize: req.DeviceMemory,
UserDeviceHeapSize: req.JsHeapSizeLimit,
UserID: req.UserID,
}))
}
ResponseWithJSON(w, &StartSessionResponse{
Token: e.services.Tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
BeaconSizeLimit: e.cfg.BeaconSizeLimit,
})
}
func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
// Check authorization
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
var handledMessages bytes.Buffer
// Process each message in request data
err = ReadBatchReader(body, func(msg Message) {
msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
handledMessages.Write(msg.Encode())
})
if err != nil {
ResponseWithError(w, http.StatusForbidden, err)
return
}
// Send processed messages to queue as array of bytes
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
if err != nil {
log.Printf("can't send processed messages to queue: %s", err)
}
w.WriteHeader(http.StatusOK)
}
func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
// Check request body
if r.Body == nil {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
// Parse request body
req := &NotStartedRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
if ua == nil {
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
err := e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,
Platform: "web",
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
})
if err != nil {
log.Printf("Unable to insert Unstarted Session: %v\n", err)
}
w.WriteHeader(http.StatusOK)
}

View file

@ -1,28 +1,27 @@
package main
package router
import (
gzip "github.com/klauspost/pgzip"
"io"
"io/ioutil"
"log"
"net/http"
gzip "github.com/klauspost/pgzip"
)
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
var reader io.ReadCloser
var err error
switch r.Header.Get("Content-Encoding") {
case "gzip":
log.Println("Gzip", reader)
reader, err = gzip.NewReader(body)
if err != nil {
responseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response
return
}
log.Println("Gzip reader init", reader)
@ -33,9 +32,9 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topi
log.Println("Reader after switch:", reader)
buf, err := ioutil.ReadAll(reader)
if err != nil {
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
return
}
producer.Produce(topicName, sessionID, buf) // What if not able to send?
e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send?
w.WriteHeader(http.StatusOK)
}

View file

@ -0,0 +1,49 @@
package router
type StartSessionRequest struct {
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset bool `json:"reset"`
UserID string `json:"userID"`
}
type StartSessionResponse struct {
Timestamp int64 `json:"timestamp"`
Delay int64 `json:"delay"`
Token string `json:"token"`
UserUUID string `json:"userUUID"`
SessionID string `json:"sessionID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
}
type NotStartedRequest struct {
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
}
type StartIOSSessionRequest struct {
Token string `json:"token"`
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
RevID string `json:"revID"`
UserUUID *string `json:"userUUID"`
UserOSVersion string `json:"userOSVersion"`
UserDevice string `json:"userDevice"`
Timestamp uint64 `json:"timestamp"`
}
type StartIOSSessionResponse struct {
Token string `json:"token"`
ImagesHashList []string `json:"imagesHashList"`
UserUUID string `json:"userUUID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
SessionID string `json:"sessionID"`
}

View file

@ -1,4 +1,4 @@
package main
package router
import (
"encoding/json"
@ -6,7 +6,7 @@ import (
"net/http"
)
func responseWithJSON(w http.ResponseWriter, res interface{}) {
func ResponseWithJSON(w http.ResponseWriter, res interface{}) {
body, err := json.Marshal(res)
if err != nil {
log.Println(err)
@ -15,10 +15,10 @@ func responseWithJSON(w http.ResponseWriter, res interface{}) {
w.Write(body)
}
func responseWithError(w http.ResponseWriter, code int, err error) {
func ResponseWithError(w http.ResponseWriter, code int, err error) {
type response struct {
Error string `json:"error"`
}
w.WriteHeader(code)
responseWithJSON(w, &response{err.Error()})
ResponseWithJSON(w, &response{err.Error()})
}

View file

@ -0,0 +1,70 @@
package router
import (
"github.com/gorilla/mux"
"log"
"net/http"
"openreplay/backend/internal/config"
http2 "openreplay/backend/internal/services"
)
type Router struct {
router *mux.Router
cfg *config.Config
services *http2.ServicesBuilder
}
func NewRouter(cfg *config.Config, services *http2.ServicesBuilder) (*Router, error) {
e := &Router{
cfg: cfg,
services: services,
}
e.init()
return e, nil
}
func (e *Router) init() {
e.router = mux.NewRouter()
// Root path
e.router.HandleFunc("/", e.root).Methods("POST")
// Web handlers
e.router.HandleFunc("/v1/web/not-started", e.notStartedHandlerWeb).Methods("POST")
e.router.HandleFunc("/v1/web/start", e.startSessionHandlerWeb).Methods("POST")
e.router.HandleFunc("/v1/web/i", e.pushMessagesHandlerWeb).Methods("POST")
// iOS handlers
e.router.HandleFunc("/v1/ios/start", e.startSessionHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/i", e.pushMessagesHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/late", e.pushLateMessagesHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/images", e.imagesUploadHandlerIOS).Methods("POST")
// CORS middleware
e.router.Use(e.corsMiddleware)
}
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Prepare headers for preflight requests
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
if r.Method == http.MethodOptions {
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
// Serve request
next.ServeHTTP(w, r)
})
}
func (e *Router) GetHandler() http.Handler {
return e.router
}

View file

@ -0,0 +1,46 @@
package server
import (
"context"
"errors"
"fmt"
"golang.org/x/net/http2"
"log"
"net/http"
"time"
)
type Server struct {
server *http.Server
}
func New(handler http.Handler, host, port string, timeout time.Duration) (*Server, error) {
switch {
case port == "":
return nil, errors.New("empty server port")
case handler == nil:
return nil, errors.New("empty handler")
case timeout < 1:
return nil, fmt.Errorf("invalid timeout %d", timeout)
}
server := &http.Server{
Addr: fmt.Sprintf("%s:%s", host, port),
Handler: handler,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
if err := http2.ConfigureServer(server, nil); err != nil {
log.Printf("can't configure http2 server: %s", err)
}
return &Server{
server: server,
}, nil
}
func (s *Server) Start() error {
return s.server.ListenAndServe()
}
func (s *Server) Stop() {
s.server.Shutdown(context.Background())
}

View file

@ -0,0 +1,39 @@
package services
import (
"openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/url/assets"
)
type ServicesBuilder struct {
Database *cache.PGCache
Producer types.Producer
Assets *assetscache.AssetsCache
Flaker *flakeid.Flaker
UaParser *uaparser.UAParser
GeoIP *geoip.GeoIP
Tokenizer *token.Tokenizer
Storage *storage.S3
}
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
return &ServicesBuilder{
Database: pgconn,
Producer: producer,
Assets: assetscache.New(cfg, rewriter, producer),
Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
GeoIP: geoip.NewGeoIP(cfg.MaxMinDBFile),
Flaker: flakeid.NewFlaker(cfg.WorkerID),
}
}

View file

@ -1,10 +1,10 @@
package main
package uuid
import (
"github.com/google/uuid"
)
func getUUID(u *string) string {
func GetUUID(u *string) string {
if u != nil {
_, err := uuid.Parse(*u)
if err == nil {
@ -12,4 +12,4 @@ func getUUID(u *string) string {
}
}
return uuid.New().String()
}
}

View file

@ -1,24 +1,23 @@
package profiling
import (
"log"
"net/http"
"github.com/gorilla/mux"
_ "net/http/pprof"
)
"github.com/gorilla/mux"
"log"
"net/http"
_ "net/http/pprof"
)
func Profile() {
go func() {
router := mux.NewRouter()
router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
log.Println("Starting profiler...")
if err := http.ListenAndServe(":6060", router); err != nil {
panic(err)
}
router := mux.NewRouter()
router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
log.Println("Starting profiler...")
if err := http.ListenAndServe(":6060", router); err != nil {
panic(err)
}
}()
}
/*
docker run -p 6060:6060 -e REQUIRED_ENV=http://value -e ANOTHER_ENV=anothervalue workername
@ -34,4 +33,4 @@ go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
THEN
https://www.speedscope.app/
*/
*/

View file

@ -19,7 +19,7 @@ func AWSSessionOnRegion(region string) *_session.Session {
if AWS_ENDPOINT != "" {
config.Endpoint = aws.String(AWS_ENDPOINT)
config.DisableSSL = aws.Bool(true)
config.S3ForcePathStyle = aws.Bool(true)
config.S3ForcePathStyle = aws.Bool(true)
}
aws_session, err := _session.NewSession(config)
if err != nil {

View file

@ -22,7 +22,7 @@ func Uint64(key string) uint64 {
v := String(key)
n, err := strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatalln(key + " has a wrong value. ", err)
log.Fatalln(key+" has a wrong value. ", err)
}
return n
}
@ -31,12 +31,13 @@ func Uint16(key string) uint16 {
v := String(key)
n, err := strconv.ParseUint(v, 10, 16)
if err != nil {
log.Fatalln(key + " has a wrong value. ", err)
log.Fatalln(key+" has a wrong value. ", err)
}
return uint16(n)
}
const MAX_INT = uint64(^uint(0) >> 1)
func Int(key string) int {
val := Uint64(key)
if val > MAX_INT {
@ -54,4 +55,4 @@ func Bool(key string) bool {
return true
}
return false
}
}

View file

@ -5,9 +5,9 @@ import (
)
func hashHostname(hostname string) uint16 {
var h uint16 ;
var h uint16
for i, b := range hostname {
h += uint16(i+1)*uint16(b)
h += uint16(i+1) * uint16(b)
}
return h
}

View file

@ -8,7 +8,7 @@ const (
TIMESTAMP_MAX = 1<<TIMESTAMP_SIZE - 1
TIMESTAMP_SHIFT = SEQ_ID_SIZE + SHARD_ID_SHIFT
SHARD_ID_SHIFT = SEQ_ID_SIZE
EPOCH = 1550000000000
EPOCH = 1550000000000
)
func compose(timestamp uint64, shardID uint16, seqID byte) uint64 {

View file

@ -16,7 +16,6 @@ func IssueID(projectID uint32, e *messages.IssueEvent) string {
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
}
func IOSCrashID(projectID uint32, crash *messages.IOSCrash) string {
hash := fnv.New128a()
hash.Write([]byte(crash.Name))

View file

@ -5,7 +5,7 @@ const HEARTBEAT_INTERVAL = 2 * 60 * 1000
const INTEGRATIONS_REQUEST_INTERVAL = 1 * 60 * 1000
const EVENTS_PAGE_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_INPUT_EVENT_TIMEOUT = 2 * 60 * 1000
const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000
const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30 * 1000
const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3 * 60 * 1000
const EVENTS_PERFORMANCE_AGGREGATION_TIMEOUT = 2 * 60 * 1000
const EVENTS_SESSION_END_TIMEOUT = HEARTBEAT_INTERVAL + 30*1000
const EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS = HEARTBEAT_INTERVAL + 3*60*1000
const EVENTS_BACK_COMMIT_GAP = EVENTS_SESSION_END_TIMEOUT_WITH_INTEGRATIONS + 1*60*1000

View file

@ -1,77 +1,76 @@
package log
import (
"time"
"fmt"
"log"
"fmt"
"log"
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
//"openreplay/backend/pkg/env"
"openreplay/backend/pkg/queue/types"
//"openreplay/backend/pkg/env"
)
type partitionStats struct {
maxts int64
mints int64
lastts int64
lastID uint64
count int
maxts int64
mints int64
lastts int64
lastID uint64
count int
}
// Update partition statistic
func (prt *partitionStats) update(m *types.Meta) {
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.count += 1
}
type queueStats struct {
prts map[int32]*partitionStats
tick <-chan time.Time
prts map[int32]*partitionStats
tick <-chan time.Time
}
func NewQueueStats(sec int)*queueStats {
return &queueStats{
prts: make(map[int32]*partitionStats),
tick: time.Tick(time.Duration(sec) * time.Second),
}
func NewQueueStats(sec int) *queueStats {
return &queueStats{
prts: make(map[int32]*partitionStats),
tick: time.Tick(time.Duration(sec) * time.Second),
}
}
func (qs *queueStats) HandleAndLog(sessionID uint64, m *types.Meta) {
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
prt, ok := qs.prts[prti]
if !ok {
qs.prts[prti] = &partitionStats{}
prt = qs.prts[prti]
}
// Collect writes new data to partition statistic
func (qs *queueStats) Collect(sessionID uint64, m *types.Meta) {
prti := int32(sessionID % 16) // TODO use GetKeyPartition from kafka/key.go
prt, ok := qs.prts[prti]
if !ok {
qs.prts[prti] = &partitionStats{}
prt = qs.prts[prti]
}
prt.update(m)
if prt.maxts < m.Timestamp {
prt.maxts = m.Timestamp
}
if prt.mints > m.Timestamp || prt.mints == 0 {
prt.mints = m.Timestamp
}
prt.lastts = m.Timestamp
prt.lastID = m.ID
prt.count += 1
select {
case <-qs.tick:
qs.LogThenReset()
default:
}
select {
case <-qs.tick:
qs.log()
qs.reset()
default:
}
}
func (qs *queueStats) LogThenReset() {
s := "Queue Statistics: "
for i, p := range qs.prts {
s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v",
s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints)
}
log.Println(s)
// reset
qs.prts = make(map[int32]*partitionStats)
// Print to console collected statistics
func (qs *queueStats) log() {
s := "Queue Statistics: "
for i, p := range qs.prts {
s = fmt.Sprintf("%v | %v:: lastTS %v, lastID %v, count %v, maxTS %v, minTS %v",
s, i, p.lastts, p.lastID, p.count, p.maxts, p.mints)
}
log.Println(s)
}
// TODO: list of message id to log (mb filter function with callback in messages/utils.go or something)
func LogMessage(s string, sessionID uint64, msg messages.Message, m *types.Meta) {
log.Printf("%v | SessionID: %v, Queue info: %v, Message: %v", s, sessionID, m, msg)
// Clear all queue partitions
func (qs *queueStats) reset() {
qs.prts = make(map[int32]*partitionStats)
}

View file

@ -1,17 +1,12 @@
package messages
import (
"bytes"
"io"
"github.com/pkg/errors"
)
func ReadBatch(b []byte, callback func(Message)) error {
return ReadBatchReader(bytes.NewReader(b), callback)
}
func ReadBatchReader(reader io.Reader, callback func(Message)) error {
func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
var index uint64
var timestamp int64
for {
@ -21,7 +16,7 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
} else if err != nil {
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
}
msg = transformDepricated(msg)
msg = transformDeprecated(msg)
isBatchMeta := false
switch m := msg.(type) {
@ -48,37 +43,11 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
}
msg.Meta().Index = index
msg.Meta().Timestamp = timestamp
callback(msg)
messageHandler(msg)
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
index++
}
}
return errors.New("Error of the codeflow. (Should return on EOF)")
}
const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically
func WriteBatch(mList []Message) []byte {
batch := make([]byte, AVG_MESSAGE_SIZE*len(mList))
p := 0
for _, msg := range mList {
msgBytes := msg.Encode()
if len(batch) < p+len(msgBytes) {
newBatch := make([]byte, 2*len(batch)+len(msgBytes))
copy(newBatch, batch)
batch = newBatch
}
copy(batch[p:], msgBytes)
p += len(msgBytes)
}
return batch[:p]
}
func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) {
mList := make([]Message, 0, 10) // 10?
if err := ReadBatchReader(reader, func(m Message) {
mList = append(mList, rewrite(m))
}); err != nil {
return nil, err
}
return WriteBatch(mList), nil
}

View file

@ -36,6 +36,6 @@ func Encode(msg Message) []byte {
// }
func GetMessageTypeID(b []byte) (uint64, error) {
reader := bytes.NewReader(b)
reader := bytes.NewReader(b)
return ReadUint(reader)
}

View file

@ -1,11 +1,10 @@
// Auto-generated, do not edit
package messages
func IsReplayerType(id uint64) bool {
return 0 == id || 2 == id || 4 == id || 5 == id || 6 == id || 7 == id || 8 == id || 9 == id || 10 == id || 11 == id || 12 == id || 13 == id || 14 == id || 15 == id || 16 == id || 18 == id || 19 == id || 20 == id || 22 == id || 37 == id || 38 == id || 39 == id || 40 == id || 41 == id || 44 == id || 45 == id || 46 == id || 47 == id || 48 == id || 49 == id || 54 == id || 55 == id || 59 == id || 69 == id || 70 == id || 90 == id || 93 == id || 96 == id || 100 == id || 102 == id || 103 == id || 105 == id
func IsReplayerType(id int) bool {
return 0 == id || 2 == id || 4 == id || 5 == id || 6 == id || 7 == id || 8 == id || 9 == id || 10 == id || 11 == id || 12 == id || 13 == id || 14 == id || 15 == id || 16 == id || 18 == id || 19 == id || 20 == id || 22 == id || 37 == id || 38 == id || 39 == id || 40 == id || 41 == id || 44 == id || 45 == id || 46 == id || 47 == id || 48 == id || 49 == id || 54 == id || 55 == id || 59 == id || 69 == id || 70 == id || 90 == id || 93 == id || 96 == id || 100 == id || 102 == id || 103 == id || 105 == id
}
func IsIOSType(id uint64) bool {
func IsIOSType(id int) bool {
return 107 == id || 90 == id || 91 == id || 92 == id || 93 == id || 94 == id || 95 == id || 96 == id || 97 == id || 98 == id || 99 == id || 100 == id || 101 == id || 102 == id || 103 == id || 104 == id || 105 == id || 110 == id || 111 == id
}

View file

@ -1,65 +1,63 @@
// Auto-generated, do not edit
package messages
func GetTimestamp(message Message) uint64 {
switch msg := message.(type) {
case *IOSBatchMeta:
return msg.Timestamp
case *IOSSessionStart:
return msg.Timestamp
case *IOSSessionEnd:
return msg.Timestamp
case *IOSMetadata:
return msg.Timestamp
case *IOSCustomEvent:
return msg.Timestamp
case *IOSUserID:
return msg.Timestamp
case *IOSUserAnonymousID:
return msg.Timestamp
case *IOSScreenChanges:
return msg.Timestamp
case *IOSCrash:
return msg.Timestamp
case *IOSScreenEnter:
return msg.Timestamp
case *IOSScreenLeave:
return msg.Timestamp
case *IOSClickEvent:
return msg.Timestamp
case *IOSInputEvent:
return msg.Timestamp
case *IOSPerformanceEvent:
return msg.Timestamp
case *IOSLog:
return msg.Timestamp
case *IOSInternalError:
return msg.Timestamp
case *IOSNetworkCall:
return msg.Timestamp
case *IOSIssueEvent:
return msg.Timestamp
}
return uint64(message.Meta().Timestamp)
}
switch msg := message.(type) {
case *IOSBatchMeta:
return msg.Timestamp
case *IOSSessionStart:
return msg.Timestamp
case *IOSSessionEnd:
return msg.Timestamp
case *IOSMetadata:
return msg.Timestamp
case *IOSCustomEvent:
return msg.Timestamp
case *IOSUserID:
return msg.Timestamp
case *IOSUserAnonymousID:
return msg.Timestamp
case *IOSScreenChanges:
return msg.Timestamp
case *IOSCrash:
return msg.Timestamp
case *IOSScreenEnter:
return msg.Timestamp
case *IOSScreenLeave:
return msg.Timestamp
case *IOSClickEvent:
return msg.Timestamp
case *IOSInputEvent:
return msg.Timestamp
case *IOSPerformanceEvent:
return msg.Timestamp
case *IOSLog:
return msg.Timestamp
case *IOSInternalError:
return msg.Timestamp
case *IOSNetworkCall:
return msg.Timestamp
case *IOSIssueEvent:
return msg.Timestamp
}
return uint64(message.Meta().Timestamp)
}

View file

@ -1,32 +1,14 @@
package messages
func transformDepricated(msg Message) Message {
func transformDeprecated(msg Message) Message {
switch m := msg.(type) {
case *MouseClickDepricated:
meta := m.Meta()
meta.TypeID = 33
return &MouseClick{
meta: meta,
ID: m.ID,
ID: m.ID,
HesitationTime: m.HesitationTime,
Label: m.Label,
// Selector: '',
Label: m.Label,
}
// case *FetchDepricated:
// return &Fetch {
// Method: m.Method,
// URL: m.URL,
// Request: m.Request,
// Response: m.Response,
// Status: m.Status,
// Timestamp: m.Timestamp,
// Duration: m.Duration,
// // Headers: ''
// }
default:
return msg
return msg
}
}

View file

@ -0,0 +1,16 @@
package messages
type message struct {
Timestamp int64
Index uint64
}
func (m *message) Meta() *message {
return m
}
type Message interface {
Encode() []byte
TypeID() int
Meta() *message
}

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,6 @@ import (
"math"
)
func TimeDiff(t1 uint64, t2 uint64) uint64 {
if t1 < t2 {
return 0
@ -30,4 +29,4 @@ func CPURateFromTickRate(tickRate float64) uint64 {
func CPURate(ticks int64, dt uint64) uint64 {
return CPURateFromTickRate(TickRate(ticks, dt))
}
}

View file

@ -1,9 +1,9 @@
package messages
import (
"encoding/json"
"errors"
"io"
"encoding/json"
"log"
)
@ -37,7 +37,7 @@ func ReadData(reader io.Reader) ([]byte, error) {
}
return p, nil
}
func ReadUint(reader io.Reader) (uint64, error) {
var x uint64
var s uint
@ -152,4 +152,4 @@ func WriteJson(v interface{}, buf []byte, p int) int {
return WriteString("null", buf, p)
}
return WriteData(data, buf, p)
}
}

File diff suppressed because it is too large Load diff

View file

@ -8,6 +8,6 @@ import (
func StartProfilingServer() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
log.Println(http.ListenAndServe(":6060", nil))
}()
}

View file

@ -1,6 +1,7 @@
package queue
import (
"bytes"
"log"
"openreplay/backend/pkg/messages"
@ -9,7 +10,7 @@ import (
func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool) types.Consumer {
return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
if err := messages.ReadBatch(value, func(msg messages.Message) {
if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) {
handler(sessionID, msg, meta)
}); err != nil {
log.Printf("Decode error: %v\n", err)

View file

@ -6,25 +6,24 @@ import (
"openreplay/backend/pkg/env"
)
type Producer struct {
redis *redis.Client
maxLenApprox int64
redis *redis.Client
maxLenApprox int64
}
func NewProducer() *Producer {
return &Producer{
redis: getRedisClient(),
redis: getRedisClient(),
maxLenApprox: int64(env.Uint64("REDIS_STREAMS_MAX_LEN")),
}
}
func (p *Producer) Produce(topic string, key uint64, value []byte) error {
args := &redis.XAddArgs{
args := &redis.XAddArgs{
Stream: topic,
Values: map[string]interface{}{
"sessionID": key,
"value": value,
"value": value,
},
}
args.MaxLenApprox = p.maxLenApprox
@ -35,7 +34,7 @@ func (p *Producer) Produce(topic string, key uint64, value []byte) error {
}
return nil
}
func (p *Producer) Close(_ int) {
// noop
}

View file

@ -2,15 +2,13 @@ package redisstream
import (
"log"
"github.com/go-redis/redis"
"openreplay/backend/pkg/env"
)
var redisClient *redis.Client
var redisClient *redis.Client
func getRedisClient() *redis.Client {
if redisClient != nil {
@ -23,4 +21,4 @@ func getRedisClient() *redis.Client {
log.Fatalln(err)
}
return redisClient
}
}

View file

@ -2,8 +2,8 @@ package storage
import (
"io"
"strconv"
"sort"
"strconv"
_s3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -12,18 +12,17 @@ import (
)
type S3 struct {
uploader *s3manager.Uploader
svc *_s3.S3
bucket *string
uploader *s3manager.Uploader
svc *_s3.S3
bucket *string
}
func NewS3(region string, bucket string) *S3 {
sess := env.AWSSessionOnRegion(region)
return &S3{
uploader: s3manager.NewUploader(sess),
svc: _s3.New(sess), // AWS Docs: "These clients are safe to use concurrently."
bucket: &bucket,
svc: _s3.New(sess), // AWS Docs: "These clients are safe to use concurrently."
bucket: &bucket,
}
}
@ -35,14 +34,14 @@ func (s3 *S3) Upload(reader io.Reader, key string, contentType string, gzipped b
contentEncoding = &gzipStr
}
_, err := s3.uploader.Upload(&s3manager.UploadInput{
Body: reader,
Bucket: s3.bucket,
Key: &key,
ContentType: &contentType,
CacheControl: &cacheControl,
Body: reader,
Bucket: s3.bucket,
Key: &key,
ContentType: &contentType,
CacheControl: &cacheControl,
ContentEncoding: contentEncoding,
})
return err
})
return err
}
func (s3 *S3) Get(key string) (io.ReadCloser, error) {
@ -67,8 +66,8 @@ func (s3 *S3) Exists(key string) bool {
return false
}
const MAX_RETURNING_COUNT = 40
func (s3 *S3) GetFrequentlyUsedKeys(projectID uint64) ([]string, error) {
prefix := strconv.FormatUint(projectID, 10) + "/"
output, err := s3.svc.ListObjectsV2(&_s3.ListObjectsV2Input{
@ -82,7 +81,7 @@ func (s3 *S3) GetFrequentlyUsedKeys(projectID uint64) ([]string, error) {
list := output.Contents
max := len(list)
if (max > MAX_RETURNING_COUNT) {
if max > MAX_RETURNING_COUNT {
max = MAX_RETURNING_COUNT
sort.Slice(list, func(i, j int) bool {
return list[i].LastModified.After(*(list[j].LastModified))
@ -91,8 +90,8 @@ func (s3 *S3) GetFrequentlyUsedKeys(projectID uint64) ([]string, error) {
var keyList []string
s := len(prefix)
for _, obj := range list[:max] {
keyList = append(keyList, (*obj.Key)[s:])
}
return keyList, nil
}
for _, obj := range list[:max] {
keyList = append(keyList, (*obj.Key)[s:])
}
return keyList, nil
}

View file

@ -39,7 +39,7 @@ func unquote(str string) (string, string) {
}
func ExtractURLsFromCSS(css string) []string {
indexes := cssUrlsIndex(css)
indexes := cssUrlsIndex(css)
urls := make([]string, len(indexes))
for _, idx := range indexes {

View file

@ -1,12 +1,12 @@
package url
var METHODS = []string{ "GET", "HEAD", "POST" , "PUT" , "DELETE" , "CONNECT" , "OPTIONS" , "TRACE" , "PATCH" }
var METHODS = []string{"GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH"}
func EnsureMethod(method string) string {
for _, m := range METHODS {
if m == method {
return method
}
if m == method {
return method
}
}
return ""
}
}

View file

@ -1,16 +1,14 @@
package main
package main
import (
"encoding/json"
"strings"
)
type frame struct {
FileName string `json:"fileName"`
}
func extractJSExceptionSources(payload *string) ([]string, error) {
var frameList []frame
err := json.Unmarshal([]byte(*payload), &frameList)
@ -25,8 +23,8 @@ func extractJSExceptionSources(payload *string) ([]string, error) {
fn := strings.Split(f.FileName, "?")[0]
if strings.HasPrefix(fn, "http") && !presentedFileName[fn] {
fileNamesList = append(fileNamesList, f.FileName)
presentedFileName[fn] = true
presentedFileName[fn] = true
}
}
return fileNamesList, nil
}
}

View file

@ -66,6 +66,7 @@ func main() {
os.Exit(0)
case err := <-cacher.Errors:
log.Printf("Error while caching: %v", err)
// TODO: notify user
case <-tick:
cacher.UpdateTimeouts()
default:

View file

View file

@ -1,109 +0,0 @@
package main
import (
"log"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/services/db/heuristics"
)
var pg *cache.PGCache
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
initStats()
pg = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pg.Close()
heurFinder := heuristics.NewHandler()
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
consumer := queue.NewMessageConsumer(
env.String("GROUP_DB"),
[]string{
env.String("TOPIC_RAW_IOS"),
env.String("TOPIC_TRIGGER"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
if err := insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
return
}
err = insertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
heurFinder.HandleMessage(session, msg)
heurFinder.IterateSessionReadyMessages(sessionID, func(msg messages.Message) {
// TODO: DRY code (carefully with the return statement logic)
if err := insertMessage(sessionID, msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := insertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
},
false,
)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(15 * time.Second)
log.Printf("Db service started\n")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-tick:
pg.CommitBatches()
if err := commitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
// TODO?: separate stats & regular messages
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
default:
err := consumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on consumption: %v", err) // TODO: is always fatal?
}
}
}
}

View file

@ -1,68 +0,0 @@
package main
import (
. "openreplay/backend/pkg/messages"
)
func insertMessage(sessionID uint64, msg Message) error {
switch m := msg.(type) {
// Common
case *Metadata:
return pg.InsertMetadata(sessionID, m)
case *IssueEvent:
return pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types:
// case *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return pg.InsertWebSessionStart(sessionID, m)
case *SessionEnd:
return pg.InsertWebSessionEnd(sessionID, m)
case *UserID:
return pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
return pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
// case *ResourceEvent:
// return pg.InsertWebResourceEvent(sessionID, m)
case *PageEvent:
return pg.InsertWebPageEvent(sessionID, m)
case *ErrorEvent:
return pg.InsertWebErrorEvent(sessionID, m)
case *FetchEvent:
return pg.InsertWebFetchEvent(sessionID, m)
case *GraphQLEvent:
return pg.InsertWebGraphQLEvent(sessionID, m)
// IOS
case *IOSSessionStart:
return pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}

View file

@ -1,35 +0,0 @@
package main
import (
. "openreplay/backend/pkg/messages"
. "openreplay/backend/pkg/db/types"
)
func initStats() {
// noop
}
func insertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr:
return pg.InsertWebStatsPerformance(session.SessionID, m)
case *ResourceEvent:
return pg.InsertWebStatsResourceEvent(session.SessionID, m)
case *LongTask:
return pg.InsertWebStatsLongtask(session.SessionID, m)
// IOS
// case *IOSPerformanceAggregated:
// return pg.InsertIOSPerformanceAggregated(session, m)
// case *IOSNetworkCall:
// return pg.InsertIOSNetworkCall(session, m)
}
return nil
}
func commitStats() error {
return nil
}

View file

@ -218,14 +218,16 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
Type: tp,
Success: success,
})
if !success && tp == "fetch" {
if !success {
issueType := "missing_resource"
if tp == "fetch" {
issueType = "bad_request"
}
b.appendReadyMessage(&IssueEvent{
Type: "bad_request",
Type: issueType,
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
Context: "",
Payload: "",
})
}
case *RawCustomEvent:
@ -254,6 +256,14 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
Status: msg.Status,
Duration: msg.Duration,
})
if msg.Status >= 400 {
b.appendReadyMessage(&IssueEvent{
Type: "bad_request",
MessageID: messageID,
Timestamp: msg.Timestamp,
ContextString: msg.URL,
})
}
case *GraphQL:
b.appendReadyMessage(&GraphQLEvent{
MessageID: messageID,

View file

@ -6,7 +6,6 @@ import (
type builderMap map[uint64]*builder
func NewBuilderMap() builderMap {
return make(builderMap)
}
@ -28,8 +27,10 @@ func (m builderMap) HandleMessage(sessionID uint64, msg Message, messageID uint6
}
func (m builderMap) IterateSessionReadyMessages(sessionID uint64, operatingTs int64, iter func(msg Message)) {
b, ok := m[ sessionID ]
if !ok { return }
b, ok := m[sessionID]
if !ok {
return
}
sessionEnded := b.checkTimeouts(operatingTs)
b.iterateReadyMessage(iter)
if sessionEnded {
@ -48,5 +49,3 @@ func (m builderMap) IterateReadyMessages(operatingTs int64, iter func(sessionID
}
}
}

View file

@ -1,34 +1,32 @@
package builder
import (
"encoding/json"
"encoding/json"
. "openreplay/backend/pkg/messages"
)
const CLICK_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
type clickRageDetector struct {
lastTimestamp uint64
lastLabel string
lastTimestamp uint64
lastLabel string
firstInARawTimestamp uint64
firstInARawMessageId uint64
countsInARow int
countsInARow int
}
func (crd *clickRageDetector) Build() *IssueEvent {
var i *IssueEvent
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, _ := json.Marshal(struct{Count int }{crd.countsInARow,})
payload, _ := json.Marshal(struct{ Count int }{crd.countsInARow})
i = &IssueEvent{
Type: "click_rage",
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: string(payload), // TODO: json encoder
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
Payload: string(payload), // TODO: json encoder
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
}
}
crd.lastTimestamp = 0
@ -39,8 +37,8 @@ func (crd *clickRageDetector) Build() *IssueEvent {
return i
}
func (crd *clickRageDetector) HandleMouseClick(msg *MouseClick, messageID uint64, timestamp uint64) *IssueEvent {
if crd.lastTimestamp + CLICK_TIME_DIFF > timestamp && crd.lastLabel == msg.Label {
func (crd *clickRageDetector) HandleMouseClick(msg *MouseClick, messageID uint64, timestamp uint64) *IssueEvent {
if crd.lastTimestamp+CLICK_TIME_DIFF > timestamp && crd.lastLabel == msg.Label {
crd.lastTimestamp = timestamp
crd.countsInARow += 1
return nil
@ -54,4 +52,4 @@ func (crd *clickRageDetector) HandleMouseClick(msg *MouseClick, messageID uint6
crd.countsInARow = 1
}
return i
}
}

View file

@ -3,20 +3,19 @@ package builder
import (
"encoding/json"
"openreplay/backend/pkg/messages/performance"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
const CPU_THRESHOLD = 70 // % out of 100
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
type cpuIssueFinder struct {
startTimestamp uint64
startMessageID uint64
lastTimestamp uint64
maxRate uint64
contextString string
lastTimestamp uint64
maxRate uint64
contextString string
}
func (f *cpuIssueFinder) Build() *IssueEvent {
@ -35,16 +34,16 @@ func (f *cpuIssueFinder) Build() *IssueEvent {
return nil
}
payload, _ := json.Marshal(struct{
payload, _ := json.Marshal(struct {
Duration uint64
Rate uint64
}{duration,maxRate})
Rate uint64
}{duration, maxRate})
return &IssueEvent{
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
ContextString: f.contextString,
Payload: string(payload),
Payload: string(payload),
}
}
@ -52,8 +51,6 @@ func (f *cpuIssueFinder) HandleSetPageLocation(msg *SetPageLocation) {
f.contextString = msg.URL
}
func (f *cpuIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID uint64, timestamp uint64) *IssueEvent {
dt := performance.TimeDiff(timestamp, f.lastTimestamp)
if dt == 0 {
@ -82,5 +79,3 @@ func (f *cpuIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messageID
return nil
}

View file

@ -4,25 +4,23 @@ import (
. "openreplay/backend/pkg/messages"
)
const CLICK_RELATION_TIME = 1400
type deadClickDetector struct {
lastMouseClick *MouseClick
lastTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
lastMouseClick *MouseClick
lastTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
}
func (d *deadClickDetector) HandleReaction(timestamp uint64) *IssueEvent {
var i *IssueEvent
if d.lastMouseClick != nil && d.lastTimestamp + CLICK_RELATION_TIME < timestamp {
if d.lastMouseClick != nil && d.lastTimestamp+CLICK_RELATION_TIME < timestamp {
i = &IssueEvent{
Type: "dead_click",
Type: "dead_click",
ContextString: d.lastMouseClick.Label,
Timestamp: d.lastTimestamp,
MessageID: d.lastMessageID,
Timestamp: d.lastTimestamp,
MessageID: d.lastMessageID,
}
}
d.inputIDSet = nil
@ -53,8 +51,8 @@ func (d *deadClickDetector) HandleMessage(msg Message, messageID uint64, timesta
d.lastMouseClick = m
d.lastTimestamp = timestamp
d.lastMessageID = messageID
case *SetNodeAttribute,
*RemoveNodeAttribute,
case *SetNodeAttribute,
*RemoveNodeAttribute,
*CreateElementNode,
*CreateTextNode,
*MoveNode,
@ -66,5 +64,3 @@ func (d *deadClickDetector) HandleMessage(msg Message, messageID uint64, timesta
}
return i
}

View file

@ -4,14 +4,13 @@ import (
. "openreplay/backend/pkg/messages"
)
type domDropDetector struct {
removedCount int
removedCount int
lastDropTimestamp uint64
}
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes).
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crush it removes only roots (1-3 nodes).
func (dd *domDropDetector) HandleNodeCreation() {
dd.removedCount = 0
@ -19,7 +18,7 @@ func (dd *domDropDetector) HandleNodeCreation() {
}
func (dd *domDropDetector) HandleNodeRemoval(ts uint64) {
if dd.lastDropTimestamp + DROP_WINDOW > ts {
if dd.lastDropTimestamp+DROP_WINDOW > ts {
dd.removedCount += 1
} else {
dd.removedCount = 1
@ -27,7 +26,6 @@ func (dd *domDropDetector) HandleNodeRemoval(ts uint64) {
dd.lastDropTimestamp = ts
}
func (dd *domDropDetector) Build() *DOMDrop {
var domDrop *DOMDrop
if dd.removedCount >= CRITICAL_COUNT {
@ -39,4 +37,3 @@ func (dd *domDropDetector) Build() *DOMDrop {
dd.lastDropTimestamp = 0
return domDrop
}

View file

@ -7,9 +7,9 @@ import (
type inputLabels map[uint64]string
type inputEventBuilder struct {
inputEvent *InputEvent
inputLabels inputLabels
inputID uint64
inputEvent *InputEvent
inputLabels inputLabels
inputID uint64
}
func NewInputEventBuilder() *inputEventBuilder {
@ -18,7 +18,6 @@ func NewInputEventBuilder() *inputEventBuilder {
return ieBuilder
}
func (b *inputEventBuilder) ClearLabels() {
b.inputLabels = make(inputLabels)
}
@ -57,11 +56,11 @@ func (b *inputEventBuilder) HasInstance() bool {
return b.inputEvent != nil
}
func (b * inputEventBuilder) GetTimestamp() uint64 {
func (b *inputEventBuilder) GetTimestamp() uint64 {
if b.inputEvent == nil {
return 0
}
return b.inputEvent.Timestamp;
return b.inputEvent.Timestamp
}
func (b *inputEventBuilder) Build() *InputEvent {

View file

@ -1,21 +1,21 @@
package builder
import (
"math"
"encoding/json"
"math"
. "openreplay/backend/pkg/messages"
)
const MIN_COUNT = 3
const MEM_RATE_THRESHOLD = 300 // % to average
const MEM_RATE_THRESHOLD = 300 // % to average
type memoryIssueFinder struct {
startMessageID uint64
startTimestamp uint64
rate int
count float64
sum float64
sum float64
contextString string
}
@ -23,13 +23,13 @@ func (f *memoryIssueFinder) Build() *IssueEvent {
if f.startTimestamp == 0 {
return nil
}
payload, _ := json.Marshal(struct{Rate int }{f.rate - 100,})
payload, _ := json.Marshal(struct{ Rate int }{f.rate - 100})
i := &IssueEvent{
Type: "memory",
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
Type: "memory",
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
ContextString: f.contextString,
Payload: string(payload),
Payload: string(payload),
}
f.startTimestamp = 0
f.startMessageID = 0
@ -48,8 +48,8 @@ func (f *memoryIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messag
return nil
}
average := f.sum/f.count
rate := int(math.Round(float64(msg.UsedJSHeapSize)/average * 100))
average := f.sum / f.count
rate := int(math.Round(float64(msg.UsedJSHeapSize) / average * 100))
f.sum += float64(msg.UsedJSHeapSize)
f.count++
@ -68,5 +68,3 @@ func (f *memoryIssueFinder) HandlePerformanceTrack(msg *PerformanceTrack, messag
return nil
}

View file

@ -5,8 +5,8 @@ import (
)
type pageEventBuilder struct {
pageEvent *PageEvent
firstTimingHandled bool
pageEvent *PageEvent
firstTimingHandled bool
}
func (b *pageEventBuilder) buildIfTimingsComplete() *PageEvent {
@ -28,7 +28,7 @@ func (b *pageEventBuilder) HandleSetPageLocation(msg *SetPageLocation, messageID
}
}
func (b * pageEventBuilder) HandlePageLoadTiming(msg *PageLoadTiming) *PageEvent {
func (b *pageEventBuilder) HandlePageLoadTiming(msg *PageLoadTiming) *PageEvent {
if !b.HasInstance() {
return nil
}
@ -62,7 +62,7 @@ func (b * pageEventBuilder) HandlePageLoadTiming(msg *PageLoadTiming) *PageEvent
return b.buildIfTimingsComplete()
}
func (b * pageEventBuilder) HandlePageRenderTiming(msg *PageRenderTiming) *PageEvent {
func (b *pageEventBuilder) HandlePageRenderTiming(msg *PageRenderTiming) *PageEvent {
if !b.HasInstance() {
return nil
}
@ -76,16 +76,16 @@ func (b *pageEventBuilder) HasInstance() bool {
return b.pageEvent != nil
}
func (b * pageEventBuilder) GetTimestamp() uint64 {
func (b *pageEventBuilder) GetTimestamp() uint64 {
if b.pageEvent == nil {
return 0
}
return b.pageEvent.Timestamp;
return b.pageEvent.Timestamp
}
func (b * pageEventBuilder) Build() *PageEvent {
func (b *pageEventBuilder) Build() *PageEvent {
pageEvent := b.pageEvent
b.pageEvent = nil
b.firstTimingHandled = false
return pageEvent
}
}

View file

@ -3,22 +3,20 @@ package builder
import (
"math"
"openreplay/backend/pkg/messages/performance"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/messages/performance"
)
type performanceTrackAggrBuilder struct {
performanceTrackAggr *PerformanceTrackAggr
lastTimestamp uint64
count float64
sumFrameRate float64
sumTickRate float64
sumTotalJSHeapSize float64
sumUsedJSHeapSize float64
performanceTrackAggr *PerformanceTrackAggr
lastTimestamp uint64
count float64
sumFrameRate float64
sumTickRate float64
sumTotalJSHeapSize float64
sumUsedJSHeapSize float64
}
func (b *performanceTrackAggrBuilder) start(timestamp uint64) {
b.performanceTrackAggr = &PerformanceTrackAggr{
TimestampStart: timestamp,
@ -39,7 +37,7 @@ func (b *performanceTrackAggrBuilder) HandlePerformanceTrack(msg *PerformanceTra
}
frameRate := performance.FrameRate(msg.Frames, dt)
tickRate := performance.TickRate(msg.Ticks, dt)
tickRate := performance.TickRate(msg.Ticks, dt)
fps := uint64(math.Round(frameRate))
cpu := performance.CPURateFromTickRate(tickRate)
@ -84,7 +82,7 @@ func (b *performanceTrackAggrBuilder) GetStartTimestamp() uint64 {
if b.performanceTrackAggr == nil {
return 0
}
return b.performanceTrackAggr.TimestampStart;
return b.performanceTrackAggr.TimestampStart
}
func (b *performanceTrackAggrBuilder) Build() *PerformanceTrackAggr {
@ -106,4 +104,3 @@ func (b *performanceTrackAggrBuilder) Build() *PerformanceTrackAggr {
b.lastTimestamp = 0
return performanceTrackAggr
}

View file

@ -35,7 +35,7 @@ func main() {
env.String("TOPIC_RAW_IOS"),
},
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
statsLogger.HandleAndLog(sessionID, meta)
statsLogger.Collect(sessionID, meta)
builderMap.HandleMessage(sessionID, msg, msg.Meta().Index)
},
false,

View file

@ -1,36 +0,0 @@
package main
import (
"openreplay/backend/pkg/url/assets"
"openreplay/backend/pkg/messages"
)
func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
producer.Produce(TOPIC_CACHE, sessionID, messages.Encode(&messages.AssetCache{
URL: fullURL,
}))
}
}
func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
sendAssetForCache(sessionID, baseURL, u)
}
}
func handleURL(sessionID uint64, baseURL string, url string) string {
if CACHE_ASSESTS {
sendAssetForCache(sessionID, baseURL, url)
return rewriter.RewriteURL(sessionID, baseURL, url)
}
return assets.ResolveURL(baseURL, url)
}
func handleCSS(sessionID uint64, baseURL string, css string) string {
if CACHE_ASSESTS {
sendAssetsForCacheFromCSS(sessionID, baseURL, css)
return rewriter.RewriteCSS(sessionID, baseURL, css)
}
return assets.ResolveCSS(baseURL, css)
}

View file

View file

@ -1 +0,0 @@
package main

View file

@ -1,195 +0,0 @@
package main
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
const FILES_SIZE_LIMIT int64 = 1e7 // 10Mb
func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
type request struct {
Token string `json:"token"`
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
RevID string `json:"revID"`
UserUUID *string `json:"userUUID"`
//UserOS string `json"userOS"` //hardcoded 'MacOS'
UserOSVersion string `json:"userOSVersion"`
UserDevice string `json:"userDevice"`
Timestamp uint64 `json:"timestamp"`
// UserDeviceType uint 0:phone 1:pad 2:tv 3:carPlay 5:mac
// “performances”:{
// “activeProcessorCount”:8,
// “isLowPowerModeEnabled”:0,
// “orientation”:0,
// “systemUptime”:585430,
// “batteryState”:0,
// “thermalState”:0,
// “batteryLevel”:0,
// “processorCount”:8,
// “physicalMemory”:17179869184
// },
}
type response struct {
Token string `json:"token"`
ImagesHashList []string `json:"imagesHashList"`
UserUUID string `json:"userUUID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
SessionID string `json:"sessionID"`
}
startTime := time.Now()
req := &request{}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
responseWithError(w, http.StatusBadRequest, err)
return
}
if req.ProjectKey == nil {
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
} else {
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
userUUID := getUUID(req.UserUUID)
tokenData, err := tokenizer.Parse(req.Token)
if err != nil { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
responseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := uaParser.ParseFromHTTPRequest(r)
if ua == nil {
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
responseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
// The difference with web is mostly here:
producer.Produce(TOPIC_RAW_IOS, tokenData.ID, Encode(&IOSSessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserOS: "IOS",
UserOSVersion: req.UserOSVersion,
UserDevice: MapIOSDevice(req.UserDevice),
UserDeviceType: GetIOSDeviceType(req.UserDevice),
UserCountry: country,
}))
}
// imagesHashList, err := s3.GetFrequentlyUsedKeys(*(req.EncodedProjectID)) // TODO: reuse index: ~ frequency * size
// if err != nil {
// responseWithError(w, http.StatusInternalServerError, err)
// return
// }
responseWithJSON(w, &response{
// ImagesHashList: imagesHashList,
Token: tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
BeaconSizeLimit: BEACON_SIZE_LIMIT,
})
}
func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
if err != nil {
responseWithError(w, http.StatusUnauthorized, err)
return
}
pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS)
}
func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
if err != nil && err != token.EXPIRED {
responseWithError(w, http.StatusUnauthorized, err)
return
}
// Check timestamps here?
pushMessages(w, r, sessionData.ID, TOPIC_RAW_IOS)
}
func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
log.Printf("recieved imagerequest")
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
if err != nil { // Should accept expired token?
responseWithError(w, http.StatusUnauthorized, err)
return
}
r.Body = http.MaxBytesReader(w, r.Body, FILES_SIZE_LIMIT)
defer r.Body.Close()
err = r.ParseMultipartForm(1e6) // ~1Mb
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
responseWithError(w, http.StatusUnsupportedMediaType, err)
// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
} else if err != nil {
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
if r.MultipartForm == nil {
responseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
}
if len(r.MultipartForm.Value["projectKey"]) == 0 {
responseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
return
}
prefix := r.MultipartForm.Value["projectKey"][0] + "/" + strconv.FormatUint(sessionData.ID, 10) + "/"
for _, fileHeaderList := range r.MultipartForm.File {
for _, fileHeader := range fileHeaderList {
file, err := fileHeader.Open()
if err != nil {
continue // TODO: send server error or accumulate successful files
}
key := prefix + fileHeader.Filename
log.Printf("Uploading image... %v", key)
go func() { //TODO: mime type from header
if err := s3.Upload(file, key, "image/jpeg", false); err != nil {
log.Printf("Upload ios screen error. %v", err)
}
}()
}
}
w.WriteHeader(http.StatusOK)
}

View file

@ -1,241 +0,0 @@
package main
import (
"encoding/json"
"errors"
"log"
"math/rand"
"net/http"
"strconv"
"time"
"openreplay/backend/pkg/db/postgres"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/token"
)
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
type request struct {
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset bool `json:"reset"`
UserID string `json:"userID"`
}
type response struct {
Timestamp int64 `json:"timestamp"`
Delay int64 `json:"delay"`
Token string `json:"token"`
UserUUID string `json:"userUUID"`
SessionID string `json:"sessionID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
}
startTime := time.Now()
req := &request{}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error?
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
responseWithError(w, http.StatusBadRequest, err)
return
}
if req.ProjectKey == nil {
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
} else {
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
userUUID := getUUID(req.UserUUID)
tokenData, err := tokenizer.Parse(req.Token)
if err != nil || req.Reset { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
responseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := uaParser.ParseFromHTTPRequest(r)
if ua == nil {
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
if err != nil {
responseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
TrackerVersion: req.TrackerVersion,
RevID: req.RevID,
UserUUID: userUUID,
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
UserDeviceMemorySize: req.DeviceMemory,
UserDeviceHeapSize: req.JsHeapSizeLimit,
UserID: req.UserID,
}))
}
//delayDuration := time.Now().Sub(startTime)
responseWithJSON(w, &response{
//Timestamp: startTime.UnixNano() / 1e6,
//Delay: delayDuration.Nanoseconds() / 1e6,
Token: tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
BeaconSizeLimit: BEACON_SIZE_LIMIT,
})
}
func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
if err != nil {
responseWithError(w, http.StatusUnauthorized, err)
return
}
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
defer body.Close()
rewritenBuf, err := RewriteBatch(body, func(msg Message) Message {
switch m := msg.(type) {
case *SetNodeAttributeURLBased:
if m.Name == "src" || m.Name == "href" {
msg = &SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: handleURL(sessionData.ID, m.BaseURL, m.Value),
}
} else if m.Name == "style" {
msg = &SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: handleCSS(sessionData.ID, m.BaseURL, m.Value),
}
}
case *SetCSSDataURLBased:
msg = &SetCSSData{
ID: m.ID,
Data: handleCSS(sessionData.ID, m.BaseURL, m.Data),
}
case *CSSInsertRuleURLBased:
msg = &CSSInsertRule{
ID: m.ID,
Index: m.Index,
Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule),
}
}
// switch msg.(type) {
// case *BatchMeta, // TODO: watchout! Meta().Index'es are changed here (though it is still unique for the topic-session pair)
// *SetPageLocation,
// *PageLoadTiming,
// *PageRenderTiming,
// *PerformanceTrack,
// *SetInputTarget,
// *SetInputValue,
// *MouseClick,
// *RawErrorEvent,
// *JSException,
// *ResourceTiming,
// *RawCustomEvent,
// *CustomIssue,
// *Fetch,
// *StateAction,
// *GraphQL,
// *CreateElementNode,
// *CreateTextNode,
// *RemoveNode,
// *CreateDocument,
// *RemoveNodeAttribute,
// *MoveNode,
// *SetCSSData,
// *CSSInsertRule,
// *CSSDeleteRule:
// analyticsMessages = append(analyticsMessages, msg)
//}
return msg
})
if err != nil {
responseWithError(w, http.StatusForbidden, err)
return
}
producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf)
//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
//duration := time.Now().Sub(startTime)
//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
w.WriteHeader(http.StatusOK)
}
func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
type request struct {
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
// RevID string `json:"revID"`
}
req := &request{}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
responseWithError(w, http.StatusBadRequest, err)
return
}
if req.ProjectKey == nil {
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
if ua == nil {
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,
Platform: "web",
UserAgent: r.Header.Get("User-Agent"),
UserOS: ua.OS,
UserOSVersion: ua.OSVersion,
UserBrowser: ua.Browser,
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
})
if err != nil {
log.Printf("Unable to insert Unstarted Session: %v\n", err)
}
w.WriteHeader(http.StatusOK)
}

View file

@ -1,138 +0,0 @@
package main
import (
"strings"
)
func MapIOSDevice(identifier string) string {
switch identifier {
case "iPod5,1":
return "iPod touch (5th generation)"
case "iPod7,1":
return "iPod touch (6th generation)"
case "iPod9,1":
return "iPod touch (7th generation)"
case "iPhone3,1", "iPhone3,2", "iPhone3,3":
return "iPhone 4"
case "iPhone4,1":
return "iPhone 4s"
case "iPhone5,1", "iPhone5,2":
return "iPhone 5"
case "iPhone5,3", "iPhone5,4":
return "iPhone 5c"
case "iPhone6,1", "iPhone6,2":
return "iPhone 5s"
case "iPhone7,2":
return "iPhone 6"
case "iPhone7,1":
return "iPhone 6 Plus"
case "iPhone8,1":
return "iPhone 6s"
case "iPhone8,2":
return "iPhone 6s Plus"
case "iPhone8,4":
return "iPhone SE"
case "iPhone9,1", "iPhone9,3":
return "iPhone 7"
case "iPhone9,2", "iPhone9,4":
return "iPhone 7 Plus"
case "iPhone10,1", "iPhone10,4":
return "iPhone 8"
case "iPhone10,2", "iPhone10,5":
return "iPhone 8 Plus"
case "iPhone10,3", "iPhone10,6":
return "iPhone X"
case "iPhone11,2":
return "iPhone XS"
case "iPhone11,4", "iPhone11,6":
return "iPhone XS Max"
case "iPhone11,8":
return "iPhone XR"
case "iPhone12,1":
return "iPhone 11"
case "iPhone12,3":
return "iPhone 11 Pro"
case "iPhone12,5":
return "iPhone 11 Pro Max"
case "iPhone12,8":
return "iPhone SE (2nd generation)"
case "iPhone13,1":
return "iPhone 12 mini"
case "iPhone13,2":
return "iPhone 12"
case "iPhone13,3":
return "iPhone 12 Pro"
case "iPhone13,4":
return "iPhone 12 Pro Max"
case "iPad2,1", "iPad2,2", "iPad2,3", "iPad2,4":
return "iPad 2"
case "iPad3,1", "iPad3,2", "iPad3,3":
return "iPad (3rd generation)"
case "iPad3,4", "iPad3,5", "iPad3,6":
return "iPad (4th generation)"
case "iPad6,11", "iPad6,12":
return "iPad (5th generation)"
case "iPad7,5", "iPad7,6":
return "iPad (6th generation)"
case "iPad7,11", "iPad7,12":
return "iPad (7th generation)"
case "iPad11,6", "iPad11,7":
return "iPad (8th generation)"
case "iPad4,1", "iPad4,2", "iPad4,3":
return "iPad Air"
case "iPad5,3", "iPad5,4":
return "iPad Air 2"
case "iPad11,3", "iPad11,4":
return "iPad Air (3rd generation)"
case "iPad13,1", "iPad13,2":
return "iPad Air (4th generation)"
case "iPad2,5", "iPad2,6", "iPad2,7":
return "iPad mini"
case "iPad4,4", "iPad4,5", "iPad4,6":
return "iPad mini 2"
case "iPad4,7", "iPad4,8", "iPad4,9":
return "iPad mini 3"
case "iPad5,1", "iPad5,2":
return "iPad mini 4"
case "iPad11,1", "iPad11,2":
return "iPad mini (5th generation)"
case "iPad6,3", "iPad6,4":
return "iPad Pro (9.7-inch)"
case "iPad7,3", "iPad7,4":
return "iPad Pro (10.5-inch)"
case "iPad8,1", "iPad8,2", "iPad8,3", "iPad8,4":
return "iPad Pro (11-inch) (1st generation)"
case "iPad8,9", "iPad8,10":
return "iPad Pro (11-inch) (2nd generation)"
case "iPad6,7", "iPad6,8":
return "iPad Pro (12.9-inch) (1st generation)"
case "iPad7,1", "iPad7,2":
return "iPad Pro (12.9-inch) (2nd generation)"
case "iPad8,5", "iPad8,6", "iPad8,7", "iPad8,8":
return "iPad Pro (12.9-inch) (3rd generation)"
case "iPad8,11", "iPad8,12":
return "iPad Pro (12.9-inch) (4th generation)"
case "AppleTV5,3":
return "Apple TV"
case "AppleTV6,2":
return "Apple TV 4K"
case "AudioAccessory1,1":
return "HomePod"
case "AudioAccessory5,1":
return "HomePod mini"
case "i386", "x86_64":
return "Simulator"
default:
return identifier
}
}
func GetIOSDeviceType(identifier string) string {
if strings.Contains(identifier, "iPhone") {
return "mobile" //"phone"
}
if strings.Contains(identifier, "iPad") {
return "tablet"
}
return "other"
}

View file

@ -1,156 +0,0 @@
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"golang.org/x/net/http2"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/url/assets"
"openreplay/backend/services/http/geoip"
"openreplay/backend/services/http/uaparser"
"openreplay/backend/pkg/pprof"
)
var rewriter *assets.Rewriter
var producer types.Producer
var pgconn *cache.PGCache
var flaker *flakeid.Flaker
var uaParser *uaparser.UAParser
var geoIP *geoip.GeoIP
var tokenizer *token.Tokenizer
var s3 *storage.S3
var TOPIC_RAW_WEB string
var TOPIC_RAW_IOS string
var TOPIC_CACHE string
var TOPIC_TRIGGER string
//var TOPIC_ANALYTICS string
var CACHE_ASSESTS bool
var BEACON_SIZE_LIMIT int64
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
pprof.StartProfilingServer()
producer = queue.NewProducer()
defer producer.Close(15000)
TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB")
TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS")
TOPIC_CACHE = env.String("TOPIC_CACHE")
TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
//TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pgconn.Close()
s3 = storage.NewS3(env.String("AWS_REGION"), env.String("S3_BUCKET_IOS_IMAGES"))
tokenizer = token.NewTokenizer(env.String("TOKEN_SECRET"))
uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE"))
geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE"))
flaker = flakeid.NewFlaker(env.WorkerID())
CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))
HTTP_PORT := env.String("HTTP_PORT")
server := &http.Server{
Addr: ":" + HTTP_PORT,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO: agree with specification
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
if r.Method == http.MethodOptions {
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
switch r.URL.Path {
case "/":
w.WriteHeader(http.StatusOK)
case "/v1/web/not-started":
switch r.Method {
case http.MethodPost:
notStartedHandlerWeb(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/web/start":
switch r.Method {
case http.MethodPost:
startSessionHandlerWeb(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/web/i":
switch r.Method {
case http.MethodPost:
pushMessagesHandlerWeb(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/ios/start":
switch r.Method {
case http.MethodPost:
startSessionHandlerIOS(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/ios/i":
switch r.Method {
case http.MethodPost:
pushMessagesHandlerIOS(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/ios/late":
switch r.Method {
case http.MethodPost:
pushLateMessagesHandlerIOS(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
case "/v1/ios/images":
switch r.Method {
case http.MethodPost:
imagesUploadHandlerIOS(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
default:
w.WriteHeader(http.StatusNotFound)
}
}),
}
http2.ConfigureServer(server, nil)
go func() {
if err := server.ListenAndServe(); err != nil {
log.Printf("Server error: %v\n", err)
log.Fatal("Server error")
}
}()
log.Printf("Server successfully started on port %v\n", HTTP_PORT)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
<-sigchan
log.Printf("Shutting down the server\n")
server.Shutdown(context.Background())
}

View file

@ -7,38 +7,36 @@ import (
"openreplay/backend/services/integrations/integration"
)
type manager struct {
clientMap integration.ClientMap
Events chan *integration.SessionErrorEvent
Errors chan error
RequestDataUpdates chan postgres.Integration // not pointer because it could change in other thread
clientMap integration.ClientMap
Events chan *integration.SessionErrorEvent
Errors chan error
RequestDataUpdates chan postgres.Integration // not pointer because it could change in other thread
}
func NewManager() *manager {
return &manager {
clientMap: make(integration.ClientMap),
return &manager{
clientMap: make(integration.ClientMap),
RequestDataUpdates: make(chan postgres.Integration, 100),
Events: make(chan *integration.SessionErrorEvent, 100),
Errors: make(chan error, 100),
Events: make(chan *integration.SessionErrorEvent, 100),
Errors: make(chan error, 100),
}
}
func (m* manager) Update(i *postgres.Integration) error {
func (m *manager) Update(i *postgres.Integration) error {
key := strconv.Itoa(int(i.ProjectID)) + i.Provider
if i.Options == nil {
delete(m.clientMap, key)
return nil
}
c, exists := m.clientMap[ key ]
c, exists := m.clientMap[key]
if !exists {
c, err := integration.NewClient(i, m.RequestDataUpdates, m.Events, m.Errors)
if err != nil {
return err
}
m.clientMap[ key ] = c
m.clientMap[key] = c
return nil
}
return c.Update(i)

View file

@ -2,43 +2,40 @@ package integration
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"strings"
"regexp"
"openreplay/backend/pkg/messages"
"regexp"
"strings"
)
var reIsException = regexp.MustCompile(`(?i)exception|error`)
type cloudwatch struct {
AwsAccessKeyId string // `json:"aws_access_key_id"`
AwsSecretAccessKey string // `json:"aws_secret_access_key"`
LogGroupName string // `json:"log_group_name"`
Region string // `json:"region"`
AwsAccessKeyId string // `json:"aws_access_key_id"`
AwsSecretAccessKey string // `json:"aws_secret_access_key"`
LogGroupName string // `json:"log_group_name"`
Region string // `json:"region"`
}
func (cw *cloudwatch) Request(c *client) error {
startTs := int64(c.getLastMessageTimestamp() + 1) // From next millisecond
startTs := int64(c.getLastMessageTimestamp() + 1) // From next millisecond
//endTs := utils.CurrentTimestamp()
sess, err := session.NewSession(aws.NewConfig().
WithRegion(cw.Region).
WithCredentials(
credentials.NewStaticCredentials(cw.AwsAccessKeyId, cw.AwsSecretAccessKey, ""),
),
WithRegion(cw.Region).
WithCredentials(
credentials.NewStaticCredentials(cw.AwsAccessKeyId, cw.AwsSecretAccessKey, ""),
),
)
if err != nil {
return err
}
svc := cloudwatchlogs.New(sess)
filterOptions := new(cloudwatchlogs.FilterLogEventsInput).
SetStartTime(startTs). // Inclusively both startTime and endTime
SetStartTime(startTs). // Inclusively both startTime and endTime
// SetEndTime(endTs). // Default nil?
// SetLimit(10000). // Default 10000
SetLogGroupName(cw.LogGroupName).
@ -56,7 +53,7 @@ func (cw *cloudwatch) Request(c *client) error {
}
if !reIsException.MatchString(*e.Message) { // too weak condition ?
continue
}
}
token, err := GetToken(*e.Message)
if err != nil {
c.errChan <- err
@ -72,18 +69,18 @@ func (cw *cloudwatch) Request(c *client) error {
//SessionID: sessionID,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "cloudwatch",
Timestamp: timestamp, // e.IngestionTime ??
Name: name,
Payload: strings.ReplaceAll(e.String(), "\n", ""),
Source: "cloudwatch",
Timestamp: timestamp, // e.IngestionTime ??
Name: name,
Payload: strings.ReplaceAll(e.String(), "\n", ""),
},
}
}
if output.NextToken == nil {
break;
break
}
filterOptions.NextToken = output.NextToken
}
return nil
}
}

View file

@ -53,14 +53,14 @@ func (es *elasticsearch) Request(c *client) error {
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []map[string]interface{}{
map[string]interface{}{
{
"match": map[string]interface{}{
"message": map[string]interface{}{
"query": "openReplaySessionToken=", // asayer_session_id=
},
},
},
map[string]interface{}{
{
"range": map[string]interface{}{
"utc_time": map[string]interface{}{
"gte": strconv.FormatUint(gteTs, 10),
@ -68,7 +68,7 @@ func (es *elasticsearch) Request(c *client) error {
},
},
},
map[string]interface{}{
{
"term": map[string]interface{}{
"tags": "error",
},

View file

@ -1,15 +1,15 @@
package integration
import (
"net/http"
"encoding/json"
"errors"
"fmt"
"time"
"strings"
"strconv"
"io"
"io/ioutil"
"errors"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"openreplay/backend/pkg/messages"
)
@ -17,42 +17,42 @@ import (
// Old name: asayerSessionId
// QUERY: what can be modified?
const RB_QUERY =
"SELECT item.id, item.title,body.message.openReplaySessionToken,item.level,"+
" item.counter,item.environment,body.crash_report.raw,body.message.body,timestamp"+
" FROM item_occurrence"+
" WHERE body.message.openReplaySessionToken != null"+
" AND timestamp>= %v"+
" AND item.level>30"+
" ORDER BY timestamp"+
const RB_QUERY = "SELECT item.id, item.title,body.message.openReplaySessionToken,item.level," +
" item.counter,item.environment,body.crash_report.raw,body.message.body,timestamp" +
" FROM item_occurrence" +
" WHERE body.message.openReplaySessionToken != null" +
" AND timestamp>= %v" +
" AND item.level>30" +
" ORDER BY timestamp" +
" LIMIT 1000"
// ASC by default
// \n\t symbols can spoil the request body, so it wouldn't work (OR probably it happend because of job hashing)
/*
- `read` Access Token required
- timstamp in seconds
- `read` Access Token required
- timstamp in seconds
*/
type rollbar struct {
AccessToken string // `json:"access_token"`
AccessToken string // `json:"access_token"`
}
type rollbarJobResponce struct {
Err int
Err int
Message string
Result struct {
Result struct {
Id int
}
}
type rollbarJobStatusResponce struct {
Err int
Err int
Result struct {
Status string
Result struct {
Rows [][] json.Number
Columns[] string
Rows [][]json.Number
Columns []string
}
}
}
@ -65,7 +65,7 @@ type rollbarEvent map[string]string
*/
func (rb *rollbar) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1000 // From next second
c.setLastMessageTimestamp(fromTs) // anti-job-hashing
c.setLastMessageTimestamp(fromTs) // anti-job-hashing
fromTsSec := fromTs / 1e3
query := fmt.Sprintf(RB_QUERY, fromTsSec)
jsonBody := fmt.Sprintf(`{
@ -111,7 +111,7 @@ func (rb *rollbar) Request(c *client) error {
tick := time.Tick(5 * time.Second)
for {
<- tick
<-tick
resp, err = http.DefaultClient.Do(req)
if err != nil {
return err // continue + timeout/maxAttempts
@ -131,14 +131,14 @@ func (rb *rollbar) Request(c *client) error {
e := make(rollbarEvent)
for i, col := range jobStatus.Result.Result.Columns {
//if len(row) <= i { error }
e[ col ] = row[ i ].String() // here I make them all string. That's not good
e[col] = row[i].String() // here I make them all string. That's not good
}
// sessionID, err := strconv.ParseUint(e[ "body.message.asayerSessionId" ], 10, 64)
// if err != nil {
// c.errChan <- err
// continue
// }
if e[ "body.message.openReplaySessionToken" ] == "" {
if e["body.message.openReplaySessionToken"] == "" {
c.errChan <- errors.New("Token is empty!")
continue
}
@ -147,7 +147,7 @@ func (rb *rollbar) Request(c *client) error {
c.errChan <- err
continue
}
timestampSec, err := strconv.ParseUint(e[ "timestamp" ], 10, 64)
timestampSec, err := strconv.ParseUint(e["timestamp"], 10, 64)
if err != nil {
c.errChan <- err
continue
@ -155,22 +155,22 @@ func (rb *rollbar) Request(c *client) error {
timestamp := timestampSec * 1000
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
Token: e[ "body.message.openReplaySessionToken" ],
Token: e["body.message.openReplaySessionToken"],
RawErrorEvent: &messages.RawErrorEvent{
Source: "rollbar",
Source: "rollbar",
Timestamp: timestamp,
Name: e[ "item.title" ],
Payload: string(payload),
Name: e["item.title"],
Payload: string(payload),
},
}
}
break
}
if jobStatus.Result.Status != "new" &&
if jobStatus.Result.Status != "new" &&
jobStatus.Result.Status != "running" {
// error
break
}
}
return nil
}
}

View file

@ -1,34 +1,37 @@
package integration
import (
"fmt"
"regexp"
"strconv"
"strings"
"fmt"
)
var reSessionID = regexp.MustCompile(`(?i)asayer_session_id=([0-9]+)`)
func GetAsayerSessionId(s string) (uint64, error) {
func GetAsayerSessionId(s string) (uint64, error) {
matches := reSessionID.FindStringSubmatch(s)
if len(matches) < 2 {
return 0, fmt.Errorf("'asayer_session_id' not found in '%v' ", s)
}
return strconv.ParseUint(matches[ 1 ], 10, 64)
return strconv.ParseUint(matches[1], 10, 64)
}
func GetLinkFromAngularBrackets(s string) string {
beg := strings.Index(s, "<") + 1
end := strings.Index(s, ">")
if end < 0 { return "" }
if end < 0 {
return ""
}
return strings.TrimSpace(s[beg:end])
}
var reToken = regexp.MustCompile(`(?i)openReplaySessionToken=([0-9a-zA-Z\.]+)`)
func GetToken(s string) (string, error) {
func GetToken(s string) (string, error) {
matches := reToken.FindStringSubmatch(s)
if len(matches) < 2 {
return "", fmt.Errorf("'openReplaySessionToken' not found in '%v' ", s)
}
return matches[ 1 ], nil
}
return matches[1], nil
}

View file

@ -34,12 +34,7 @@ func main() {
env.String("TOPIC_RAW_IOS"),
},
func(sessionID uint64, message Message, _ *types.Meta) {
//typeID, err := GetMessageTypeID(value)
// if err != nil {
// log.Printf("Message type decoding error: %v", err)
// return
// }
typeID := message.Meta().TypeID
typeID := message.TypeID()
if !IsReplayerType(typeID) {
return
}

View file

@ -1,19 +1,18 @@
package main
import (
"io"
gzip "github.com/klauspost/pgzip"
"io"
)
func gzipFile(file io.ReadSeeker) io.Reader {
func gzipFile(file io.Reader) io.Reader {
reader, writer := io.Pipe()
go func() {
gw, _ := gzip.NewWriterLevel(writer, gzip.BestSpeed)
io.Copy(gw, file)
go func() {
gw, _ := gzip.NewWriterLevel(writer, gzip.BestSpeed)
io.Copy(gw, file)
gw.Close()
writer.Close()
}()
return reader
}
gw.Close()
writer.Close()
}()
return reader
}

View file

@ -6,6 +6,8 @@ import (
"strconv"
"time"
"bytes"
"os/signal"
"syscall"
@ -16,6 +18,10 @@ import (
"openreplay/backend/pkg/storage"
)
const RetryTimeout = 2 * time.Minute
const SESSION_FILE_SPLIT_SIZE = 200000 // ~200 kB
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
@ -28,16 +34,30 @@ func main() {
if retryCount <= 0 {
return
}
file, err := os.Open(FS_DIR + "/" + key)
defer file.Close()
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
time.AfterFunc(2*time.Minute, func() {
time.AfterFunc(RetryTimeout, func() {
uploadKey(key, retryCount-1)
})
} else {
if err := storage.Upload(gzipFile(file), key, "application/octet-stream", true); err != nil {
log.Fatalf("Storage upload error: %v\n", err)
return
}
defer file.Close()
startBytes := make([]byte, SESSION_FILE_SPLIT_SIZE)
nRead, err := file.Read(startBytes)
if err != nil {
log.Printf("File read error: %f", err)
return
}
startReader := bytes.NewBuffer(startBytes)
if err := storage.Upload(gzipFile(startReader), key, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
if nRead == SESSION_FILE_SPLIT_SIZE {
if err := storage.Upload(gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
}
}

Some files were not shown because too many files have changed in this diff Show more