Merge pull request #1070 from openreplay/dev

v1.11.0
Mehdi Osman 2023-03-30 16:47:30 +02:00 committed by GitHub
commit 93c833b836
375 changed files with 7491 additions and 4986 deletions

View file

@@ -8,7 +8,8 @@ on:
default: 'false'
push:
branches:
- api-v1.10.0
- dev
- api-*
paths:
- "ee/api/**"
- "api/**"

View file

@@ -8,7 +8,8 @@ on:
default: 'false'
push:
branches:
- api-v1.10.0
- dev
- api-*
paths:
- "api/**"
- "!api/.gitignore"

View file

@@ -8,7 +8,8 @@ on:
default: 'false'
push:
branches:
- api-v1.10.0
- dev
- api-*
paths:
- "ee/api/**"
- "api/**"

View file

@@ -8,7 +8,8 @@ on:
default: 'false'
push:
branches:
- api-v1.10.0
- dev
- api-*
paths:
- "api/**"
- "!api/.gitignore"

View file

@@ -4,11 +4,12 @@ on:
push:
branches:
- dev
- api-*
paths:
- "ee/utilities/**"
- "utilities/**"
- "!utilities/.gitignore"
- "!utilities/*-dev.sh"
- "ee/assist/**"
- "assist/**"
- "!assist/.gitignore"
- "!assist/*-dev.sh"
name: Build and Deploy Assist EE
@@ -43,7 +44,7 @@ jobs:
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd utilities
cd assist
PUSH_IMAGE=0 bash -x ./build.sh ee
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
@@ -100,9 +101,9 @@ jobs:
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,chalice,quickwit} /tmp
mv openreplay/charts/{ingress-nginx,assist,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,chalice,quickwit} openreplay/charts/
mv /tmp/{ingress-nginx,assist,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks --kube-version=$k_version | kubectl apply -f -
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}

View file

@@ -4,10 +4,11 @@ on:
push:
branches:
- dev
- api-*
paths:
- "utilities/**"
- "!utilities/.gitignore"
- "!utilities/*-dev.sh"
- "assist/**"
- "!assist/.gitignore"
- "!assist/*-dev.sh"
name: Build and Deploy Assist
@@ -42,7 +43,7 @@ jobs:
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd utilities
cd assist
PUSH_IMAGE=0 bash -x ./build.sh
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
@@ -99,9 +100,9 @@ jobs:
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,chalice,quickwit} /tmp
mv openreplay/charts/{ingress-nginx,assist,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,chalice,quickwit} openreplay/charts/
mv /tmp/{ingress-nginx,assist,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks --kube-version=$k_version | kubectl apply -f -
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}

View file

@@ -8,7 +8,8 @@ on:
default: 'false'
push:
branches:
- api-v1.10.0
- dev
- api-*
paths:
- "ee/api/**"
- "api/**"

View file

@@ -1,16 +1,22 @@
# This action will push the peers changes to aws
on:
workflow_dispatch:
inputs:
skip_security_checks:
description: 'Skip Security checks if there is an unfixable vuln or error. Value: true/false'
required: false
default: 'false'
push:
branches:
- dev
- api-*
paths:
- "ee/peers/**"
- "peers/**"
- "!peers/.gitignore"
- "!peers/*-dev.sh"
name: Build and Deploy Peers
name: Build and Deploy Peers EE
jobs:
deploy:
@@ -35,30 +41,98 @@ jobs:
kubeconfig: ${{ secrets.EE_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
- name: Building and Pushing api image
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Building and Pushing peers image
id: build-image
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}-ee
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd peers
PUSH_IMAGE=1 bash build.sh ee
PUSH_IMAGE=0 bash -x ./build.sh ee
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
images=("peers")
for image in ${images[*]};do
./trivy image --exit-code 1 --security-checks vuln --vuln-type os,library --severity "HIGH,CRITICAL" --ignore-unfixed $DOCKER_REPO/$image:$IMAGE_TAG
done
err_code=$?
[[ $err_code -ne 0 ]] && {
exit $err_code
}
} && {
echo "Skipping Security Checks"
}
images=("peers")
for image in ${images[*]};do
docker push $DOCKER_REPO/$image:$IMAGE_TAG
done
- name: Creating old image input
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
# We have to strip off the -ee suffix, as helm will append it.
tag: `echo ${image_array[1]} | cut -d '-' -f 1`
EOF
done
- name: Deploy to kubernetes
run: |
cd scripts/helmcharts/
sed -i "s#openReplayContainerRegistry.*#openReplayContainerRegistry: \"${{ secrets.EE_REGISTRY_URL }}\"#g" vars.yaml
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.EE_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"ee.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
bash kube-install.sh --app peers
## Update secrets
sed -i "s#openReplayContainerRegistry.*#openReplayContainerRegistry: \"${{ secrets.OSS_REGISTRY_URL }}\"#g" vars.yaml
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.EE_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.EE_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.EE_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.EE_DOMAIN_NAME }}\"/g" vars.yaml
sed -i "s/enterpriseEditionLicense: \"\"/enterpriseEditionLicense: \"${{ secrets.EE_LICENSE_KEY }}\"/g" vars.yaml
# Update changed image tag
sed -i "/peers/{n;n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,peers,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,peers,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks --kube-version=$k_version | kubectl apply -f -
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
- name: Alert slack
if: ${{ failure() }}
uses: rtCamp/action-slack-notify@v2
env:
SLACK_CHANNEL: ee
SLACK_TITLE: "Failed ${{ github.workflow }}"
SLACK_COLOR: ${{ job.status }} # or a specific color like 'good' or '#ff00ff'
SLACK_WEBHOOK: ${{ secrets.SLACK_WEB_HOOK }}
SLACK_USERNAME: "OR Bot"
SLACK_MESSAGE: 'Build failed :bomb:'
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
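The "Creating old image input" step above rebuilds a Helm values override from the image tags currently running in the cluster, and this EE variant strips the -ee suffix because helm re-appends it at deploy time. A minimal Python sketch of the same transformation, assuming input in the kubectl-derived name:tag form (build_override and running_images are illustrative names, not part of the workflow):

# Hypothetical reimplementation of the image_override.yaml generation step.
# Input lines look like "chalice:v1.10.0-ee"; output is Helm override YAML.
def build_override(running_images: list[str]) -> str:
    chunks = []
    for line in running_images:
        name, tag = line.split(":", 1)
        base_tag = tag.split("-", 1)[0]  # strip "-ee": helm appends it again
        chunks.append(f"{name}:\n  image:\n    tag: {base_tag}\n")
    return "".join(chunks)

print(build_override(["chalice:v1.10.0-ee", "peers:v1.10.0-ee"]))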

View file

@@ -1,9 +1,15 @@
# This action will push the peers changes to aws
on:
workflow_dispatch:
inputs:
skip_security_checks:
description: 'Skip Security checks if there is an unfixable vuln or error. Value: true/false'
required: false
default: 'false'
push:
branches:
- dev
- api-*
paths:
- "peers/**"
- "!peers/.gitignore"
@@ -34,30 +40,96 @@ jobs:
kubeconfig: ${{ secrets.OSS_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
- name: Building and Pushing api image
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Building and Pushing peers image
id: build-image
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd peers
PUSH_IMAGE=1 bash build.sh
PUSH_IMAGE=0 bash -x ./build.sh
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
images=("peers")
for image in ${images[*]};do
./trivy image --exit-code 1 --security-checks vuln --vuln-type os,library --severity "HIGH,CRITICAL" --ignore-unfixed $DOCKER_REPO/$image:$IMAGE_TAG
done
err_code=$?
[[ $err_code -ne 0 ]] && {
exit $err_code
}
} && {
echo "Skipping Security Checks"
}
images=("peers")
for image in ${images[*]};do
docker push $DOCKER_REPO/$image:$IMAGE_TAG
done
- name: Creating old image input
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
tag: ${image_array[1]}
EOF
done
- name: Deploy to kubernetes
run: |
cd scripts/helmcharts/
## Update secrets
sed -i "s#openReplayContainerRegistry.*#openReplayContainerRegistry: \"${{ secrets.OSS_REGISTRY_URL }}\"#g" vars.yaml
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"foss.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
bash kube-install.sh --app peers
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.OSS_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.OSS_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.OSS_DOMAIN_NAME }}\"/g" vars.yaml
# Update changed image tag
sed -i "/peers/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,peers,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,peers,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks | kubectl apply -n app -f -
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
- name: Alert slack
if: ${{ failure() }}
uses: rtCamp/action-slack-notify@v2
env:
SLACK_CHANNEL: foss
SLACK_TITLE: "Failed ${{ github.workflow }}"
SLACK_COLOR: ${{ job.status }} # or a specific color like 'good' or '#ff00ff'
SLACK_WEBHOOK: ${{ secrets.SLACK_WEB_HOOK }}
SLACK_USERNAME: "OR Bot"
SLACK_MESSAGE: 'Build failed :bomb:'
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
@@ -65,4 +137,4 @@ jobs:
# DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
# IMAGE_TAG: ${{ github.sha }}
# ENVIRONMENT: staging
#

View file

@@ -0,0 +1,142 @@
# This action will push the sourcemapreader changes to aws
on:
workflow_dispatch:
inputs:
skip_security_checks:
description: 'Skip Security checks if there is an unfixable vuln or error. Value: true/false'
required: false
default: 'false'
push:
branches:
- dev
- api-*
paths:
- "sourcemap-reader/**"
- "!sourcemap-reader/.gitignore"
- "!sourcemap-reader/*-dev.sh"
name: Build and Deploy sourcemap-reader
jobs:
deploy:
name: Deploy
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
with:
# We need to diff against the old commit
# to see which workers changed.
fetch-depth: 2
- name: Docker login
run: |
docker login ${{ secrets.EE_REGISTRY_URL }} -u ${{ secrets.EE_DOCKER_USERNAME }} -p "${{ secrets.EE_REGISTRY_TOKEN }}"
- uses: azure/k8s-set-context@v1
with:
method: kubeconfig
kubeconfig: ${{ secrets.EE_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Building and Pushing sourcemaps-reader image
id: build-image
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}-ee
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd sourcemap-reader
PUSH_IMAGE=0 bash -x ./build.sh
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
images=("sourcemaps-reader")
for image in ${images[*]};do
./trivy image --exit-code 1 --security-checks vuln --vuln-type os,library --severity "HIGH,CRITICAL" --ignore-unfixed $DOCKER_REPO/$image:$IMAGE_TAG
done
err_code=$?
[[ $err_code -ne 0 ]] && {
exit $err_code
}
} && {
echo "Skipping Security Checks"
}
images=("sourcemaps-reader")
for image in ${images[*]};do
docker push $DOCKER_REPO/$image:$IMAGE_TAG
done
- name: Creating old image input
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
tag: ${image_array[1]}
EOF
done
- name: Deploy to kubernetes
run: |
cd scripts/helmcharts/
## Update secrets
sed -i "s#openReplayContainerRegistry.*#openReplayContainerRegistry: \"${{ secrets.OSS_REGISTRY_URL }}\"#g" vars.yaml
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.EE_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.EE_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.EE_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.EE_DOMAIN_NAME }}\"/g" vars.yaml
sed -i "s/enterpriseEditionLicense: \"\"/enterpriseEditionLicense: \"${{ secrets.EE_LICENSE_KEY }}\"/g" vars.yaml
# Update changed image tag
sed -i "/sourcemaps-reader/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
sed -i "s/sourcemaps-reader/sourcemapreader/g" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,sourcemapreader,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,sourcemapreader,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks | kubectl apply -n app -f -
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
- name: Alert slack
if: ${{ failure() }}
uses: rtCamp/action-slack-notify@v2
env:
SLACK_CHANNEL: foss
SLACK_TITLE: "Failed ${{ github.workflow }}"
SLACK_COLOR: ${{ job.status }} # or a specific color like 'good' or '#ff00ff'
SLACK_WEBHOOK: ${{ secrets.SLACK_WEB_HOOK }}
SLACK_USERNAME: "OR Bot"
SLACK_MESSAGE: 'Build failed :bomb:'
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
# env:
# DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
# IMAGE_TAG: ${{ github.sha }}
# ENVIRONMENT: staging

View file

@@ -1,9 +1,15 @@
# This action will push the chalice changes to aws
# This action will push the sourcemapreader changes to aws
on:
workflow_dispatch:
inputs:
skip_security_checks:
description: 'Skip Security checks if there is an unfixable vuln or error. Value: true/false'
required: false
default: 'false'
push:
branches:
- dev
- api-*
paths:
- "sourcemap-reader/**"
- "!sourcemap-reader/.gitignore"
@@ -47,8 +53,26 @@ jobs:
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
run: |
skip_security_checks=${{ github.event.inputs.skip_security_checks }}
cd sourcemap-reader
PUSH_IMAGE=1 bash build.sh
PUSH_IMAGE=0 bash -x ./build.sh
[[ "x$skip_security_checks" == "xtrue" ]] || {
curl -L https://github.com/aquasecurity/trivy/releases/download/v0.34.0/trivy_0.34.0_Linux-64bit.tar.gz | tar -xzf - -C ./
images=("sourcemaps-reader")
for image in ${images[*]};do
./trivy image --exit-code 1 --security-checks vuln --vuln-type os,library --severity "HIGH,CRITICAL" --ignore-unfixed $DOCKER_REPO/$image:$IMAGE_TAG
done
err_code=$?
[[ $err_code -ne 0 ]] && {
exit $err_code
}
} && {
echo "Skipping Security Checks"
}
images=("sourcemaps-reader")
for image in ${images[*]};do
docker push $DOCKER_REPO/$image:$IMAGE_TAG
done
- name: Creating old image input
run: |
#
@@ -82,19 +106,31 @@ jobs:
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.OSS_DOMAIN_NAME }}\"/g" vars.yaml
# Update changed image tag
sed -i "/chalice/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
sed -i "/sourcemaps-reader/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
sed -i "s/sourcemaps-reader/sourcemapreader/g" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
mv openreplay/charts/{ingress-nginx,chalice,quickwit} /tmp
mv openreplay/charts/{ingress-nginx,sourcemapreader,quickwit} /tmp
rm -rf openreplay/charts/*
mv /tmp/{ingress-nginx,chalice,quickwit} openreplay/charts/
mv /tmp/{ingress-nginx,sourcemapreader,quickwit} openreplay/charts/
helm template openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml --set ingress-nginx.enabled=false --set skipMigration=true --no-hooks | kubectl apply -n app -f -
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
IMAGE_TAG: ${{ github.ref_name }}_${{ github.sha }}
ENVIRONMENT: staging
- name: Alert slack
if: ${{ failure() }}
uses: rtCamp/action-slack-notify@v2
env:
SLACK_CHANNEL: foss
SLACK_TITLE: "Failed ${{ github.workflow }}"
SLACK_COLOR: ${{ job.status }} # or a specific color like 'good' or '#ff00ff'
SLACK_WEBHOOK: ${{ secrets.SLACK_WEB_HOOK }}
SLACK_USERNAME: "OR Bot"
SLACK_MESSAGE: 'Build failed :bomb:'
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3
@@ -102,4 +138,4 @@ jobs:
# DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
# IMAGE_TAG: ${{ github.sha }}
# ENVIRONMENT: staging
#

View file

@@ -1,4 +1,5 @@
import logging
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
@@ -12,9 +13,42 @@ from chalicelib.utils import pg_client
from routers import core, core_dynamic
from routers.crons import core_crons
from routers.crons import core_dynamic_crons
from routers.subs import insights, metrics, v1_api
from routers.subs import insights, metrics, v1_api, health
app = FastAPI(root_path="/api", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default=""))
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logging.info(">>>>> starting up <<<<<")
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
await pg_client.init()
app.schedule.start()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
# App listening
yield
# Shutdown
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
app = FastAPI(root_path="/api", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default=""),
lifespan=lifespan)
app.add_middleware(GZipMiddleware, minimum_size=1000)
@@ -51,39 +85,13 @@ app.include_router(core_dynamic.app_apikey)
app.include_router(metrics.app)
app.include_router(insights.app)
app.include_router(v1_api.app_apikey)
app.include_router(health.public_app)
app.include_router(health.app)
app.include_router(health.app_apikey)
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
@app.on_event("startup")
async def startup():
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
app.schedule.start()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
ap_logger.info(">Scheduled jobs:")
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
@app.on_event("shutdown")
async def shutdown():
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
@app.get('/private/shutdown', tags=["private"])
async def stop_server():
logging.info("Requested shutdown")
await shutdown()
import os, signal
os.kill(1, signal.SIGTERM)
# @app.get('/private/shutdown', tags=["private"])
# async def stop_server():
# logging.info("Requested shutdown")
# await shutdown()
# import os, signal
# os.kill(1, signal.SIGTERM)
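The app.py change above replaces the deprecated @app.on_event("startup")/@app.on_event("shutdown") hooks with FastAPI's lifespan context manager, which FastAPI supports from 0.93 onward (the requirements bump to 0.95.0 appears further down). A minimal self-contained sketch of the pattern:

# Minimal sketch of the FastAPI lifespan pattern adopted above.
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    logging.info(">>>>> starting up <<<<<")    # runs once, before serving traffic
    yield                                      # the app handles requests here
    logging.info(">>>>> shutting down <<<<<")  # runs once, on graceful shutdown

app = FastAPI(lifespan=lifespan)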

View file

@@ -1,33 +1,17 @@
import logging
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from decouple import config
from fastapi import FastAPI
from chalicelib.utils import pg_client
from chalicelib.core import alerts_processor
app = FastAPI(root_path="/alerts", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default=""))
logging.info("============= ALERTS =============")
from chalicelib.utils import pg_client
@app.get("/")
async def root():
return {"status": "Running"}
app.schedule = AsyncIOScheduler()
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
@app.on_event("startup")
async def startup():
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
app.schedule.start()
@@ -39,24 +23,44 @@ async def startup():
for job in app.schedule.get_jobs():
ap_logger.info({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
# App listening
yield
@app.on_event("shutdown")
async def shutdown():
# Shutdown
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=False)
await pg_client.terminate()
@app.get('/private/shutdown', tags=["private"])
async def stop_server():
logging.info("Requested shutdown")
await shutdown()
import os, signal
os.kill(1, signal.SIGTERM)
app = FastAPI(root_path="/alerts", docs_url=config("docs_url", default=""), redoc_url=config("redoc_url", default=""),
lifespan=lifespan)
logging.info("============= ALERTS =============")
@app.get("/")
async def root():
return {"status": "Running"}
@app.get("/health")
async def get_health_status():
return {"data": {
"health": True,
"details": {"version": config("version_number", default="unknown")}
}}
app.schedule = AsyncIOScheduler()
loglevel = config("LOGLEVEL", default=logging.INFO)
print(f">Loglevel set to: {loglevel}")
logging.basicConfig(level=loglevel)
ap_logger = logging.getLogger('apscheduler')
ap_logger.setLevel(loglevel)
app.schedule = AsyncIOScheduler()
if config("LOCAL_DEV", default=False, cast=bool):
@app.get('/private/trigger', tags=["private"])
@app.get('/trigger', tags=["private"])
async def trigger_main_cron():
logging.info("Triggering main cron")
alerts_processor.process()
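Alongside the lifespan refactor, the alerts service now exposes a /health route. A hedged probe of the endpoint; the in-cluster charts point health checks at port 8888, so the host and port here are assumptions to adjust:

# Hedged example: probing the new alerts /health endpoint.
import requests

resp = requests.get("http://localhost:8888/health", timeout=2)
resp.raise_for_status()
print(resp.json())  # {"data": {"health": True, "details": {"version": ...}}}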

View file

@@ -30,7 +30,7 @@ check_prereq() {
[[ $1 == ee ]] && ee=true
[[ $PATCH -eq 1 ]] && {
image_tag="$(grep -ER ^.ppVersion ../scripts/helmcharts/openreplay/charts/$chart | xargs | awk '{print $2}' | awk -F. -v OFS=. '{$NF += 1 ; print}')"
[[ $ee == "true" ]] && {
[[ $ee == "true" ]] && {
image_tag="${image_tag}-ee"
}
}
@@ -78,12 +78,6 @@ function build_api(){
check_prereq
build_api $environment
echo build_complete
#IMAGE_TAG=$IMAGE_TAG PUSH_IMAGE=$PUSH_IMAGE DOCKER_REPO=$DOCKER_REPO SIGN_IMAGE=$SIGN_IMAGE SIGN_KEY=$SIGN_KEY bash build_alerts.sh $1
#
#[[ $environment == "ee" ]] && {
# cp ../ee/api/build_crons.sh .
# IMAGE_TAG=$IMAGE_TAG PUSH_IMAGE=$PUSH_IMAGE DOCKER_REPO=$DOCKER_REPO SIGN_IMAGE=$SIGN_IMAGE SIGN_KEY=$SIGN_KEY bash build_crons.sh $1
# exit_err $?
# rm build_crons.sh
#} || true
[[ $PATCH -eq 1 ]] && update_helm_release chalice
if [[ $PATCH -eq 1 ]]; then
update_helm_release chalice
fi

View file

@@ -8,7 +8,7 @@
# Usage: IMAGE_TAG=latest DOCKER_REPO=myDockerHubID bash build.sh <ee>
git_sha=$(git rev-parse --short HEAD)
image_tag=${IMAGE_TAG:-$git_sha}
image_tag=${IMAGE_TAG:-$git_sha}
envarg="default-foss"
check_prereq() {
which docker || {
@@ -21,7 +21,7 @@ check_prereq() {
[[ $1 == ee ]] && ee=true
[[ $PATCH -eq 1 ]] && {
image_tag="$(grep -ER ^.ppVersion ../scripts/helmcharts/openreplay/charts/$chart | xargs | awk '{print $2}' | awk -F. -v OFS=. '{$NF += 1 ; print}')"
[[ $ee == "true" ]] && {
[[ $ee == "true" ]] && {
image_tag="${image_tag}-ee"
}
}
@@ -68,4 +68,6 @@ function build_alerts(){
check_prereq
build_alerts $1
[[ $PATCH -eq 1 ]] && update_helm_release alerts
if [[ $PATCH -eq 1 ]]; then
update_helm_release alerts
fi

View file

@@ -116,7 +116,7 @@ def process_notifications(data):
BATCH_SIZE = 200
for t in full.keys():
for i in range(0, len(full[t]), BATCH_SIZE):
notifications_list = full[t][i:i + BATCH_SIZE]
notifications_list = full[t][i:min(i + BATCH_SIZE, len(full[t]))]
if notifications_list is None or len(notifications_list) == 0:
break
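The min() bound added above is defensive rather than behavior-changing: Python slices already clamp an out-of-range stop index to the sequence length, so both expressions yield the same batch.

# Python slicing clamps the stop index, so min() is a redundant safety net.
full = list(range(5))
BATCH_SIZE = 3
assert full[3:3 + BATCH_SIZE] == full[3:min(3 + BATCH_SIZE, len(full))] == [3, 4]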

View file

@@ -4,8 +4,7 @@ from os.path import exists as path_exists, getsize
import jwt
import requests
from decouple import config
from starlette import status
from fastapi import HTTPException
from fastapi import HTTPException, status
import schemas
from chalicelib.core import projects

View file

@@ -2,8 +2,7 @@ import json
import requests
from decouple import config
from fastapi import HTTPException
from starlette import status
from fastapi import HTTPException, status
import schemas
from chalicelib.core import webhook

View file

@@ -1,9 +1,8 @@
import requests
from decouple import config
from datetime import datetime
from fastapi import HTTPException
from starlette import status
import requests
from decouple import config
from fastapi import HTTPException, status
import schemas
from chalicelib.core import webhook

View file

@@ -2,8 +2,7 @@ import json
from typing import Union
from decouple import config
from fastapi import HTTPException
from starlette import status
from fastapi import HTTPException, status
import schemas
from chalicelib.core import sessions, funnels, errors, issues, metrics, click_maps, sessions_mobs

View file

@@ -0,0 +1,162 @@
from urllib.parse import urlparse
import redis
import requests
from decouple import config
from chalicelib.utils import pg_client
HEALTH_ENDPOINTS = {
"alerts": "http://alerts-openreplay.app.svc.cluster.local:8888/health",
"assets": "http://assets-openreplay.app.svc.cluster.local:8888/metrics",
"assist": "http://assist-openreplay.app.svc.cluster.local:8888/health",
"chalice": "http://chalice-openreplay.app.svc.cluster.local:8888/metrics",
"db": "http://db-openreplay.app.svc.cluster.local:8888/metrics",
"ender": "http://ender-openreplay.app.svc.cluster.local:8888/metrics",
"heuristics": "http://heuristics-openreplay.app.svc.cluster.local:8888/metrics",
"http": "http://http-openreplay.app.svc.cluster.local:8888/metrics",
"ingress-nginx": "http://ingress-nginx-openreplay.app.svc.cluster.local:8888/metrics",
"integrations": "http://integrations-openreplay.app.svc.cluster.local:8888/metrics",
"peers": "http://peers-openreplay.app.svc.cluster.local:8888/health",
"sink": "http://sink-openreplay.app.svc.cluster.local:8888/metrics",
"sourcemaps-reader": "http://sourcemapreader-openreplay.app.svc.cluster.local:8888/health",
"storage": "http://storage-openreplay.app.svc.cluster.local:8888/metrics",
}
def __check_database_pg():
fail_response = {
"health": False,
"details": {
"errors": ["Postgres health-check failed"]
}
}
with pg_client.PostgresClient() as cur:
try:
cur.execute("SHOW server_version;")
server_version = cur.fetchone()
except Exception as e:
print("!! health failed: postgres not responding")
print(str(e))
return fail_response
try:
cur.execute("SELECT openreplay_version() AS version;")
schema_version = cur.fetchone()
except Exception as e:
print("!! health failed: openreplay_version not defined")
print(str(e))
return fail_response
return {
"health": True,
"details": {
# "version": server_version["server_version"],
# "schema": schema_version["version"]
}
}
def __not_supported():
return {"errors": ["not supported"]}
def __always_healthy():
return {
"health": True,
"details": {}
}
def __check_be_service(service_name):
def fn():
fail_response = {
"health": False,
"details": {
"errors": ["server health-check failed"]
}
}
try:
results = requests.get(HEALTH_ENDPOINTS.get(service_name), timeout=2)
if results.status_code != 200:
print(f"!! issue with the {service_name}-health code:{results.status_code}")
print(results.text)
# fail_response["details"]["errors"].append(results.text)
return fail_response
except requests.exceptions.Timeout:
print(f"!! Timeout getting {service_name}-health")
# fail_response["details"]["errors"].append("timeout")
return fail_response
except Exception as e:
print(f"!! Issue getting {service_name}-health response")
print(str(e))
try:
print(results.text)
# fail_response["details"]["errors"].append(results.text)
except:
print("couldn't get response")
# fail_response["details"]["errors"].append(str(e))
return fail_response
return {
"health": True,
"details": {}
}
return fn
def __check_redis():
fail_response = {
"health": False,
"details": {"errors": ["server health-check failed"]}
}
if config("REDIS_STRING", default=None) is None:
# fail_response["details"]["errors"].append("REDIS_STRING not defined in env-vars")
return fail_response
try:
u = urlparse(config("REDIS_STRING"))
r = redis.Redis(host=u.hostname, port=u.port, socket_timeout=2)
r.ping()
except Exception as e:
print("!! Issue getting redis-health response")
print(str(e))
# fail_response["details"]["errors"].append(str(e))
return fail_response
return {
"health": True,
"details": {
# "version": r.execute_command('INFO')['redis_version']
}
}
def get_health():
health_map = {
"databases": {
"postgres": __check_database_pg
},
"ingestionPipeline": {
"redis": __check_redis
},
"backendServices": {
"alerts": __check_be_service("alerts"),
"assets": __check_be_service("assets"),
"assist": __check_be_service("assist"),
"chalice": __always_healthy,
"db": __check_be_service("db"),
"ender": __check_be_service("ender"),
"frontend": __always_healthy,
"heuristics": __check_be_service("heuristics"),
"http": __check_be_service("http"),
"ingress-nginx": __always_healthy,
"integrations": __check_be_service("integrations"),
"peers": __check_be_service("peers"),
"sink": __check_be_service("sink"),
"sourcemaps-reader": __check_be_service("sourcemaps-reader"),
"storage": __check_be_service("storage")
}
}
for parent_key in health_map.keys():
for element_key in health_map[parent_key]:
health_map[parent_key][element_key] = health_map[parent_key][element_key]()
return health_map
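get_health() walks the map and replaces each checker with its result, producing a nested status document grouped by databases, ingestion pipeline, and backend services. A hedged usage sketch (output abridged):

# Hedged usage sketch for the new health module.
from chalicelib.core import health

status = health.get_health()
# status["databases"]["postgres"]    -> {"health": True, "details": {...}}
# status["backendServices"]["peers"] -> {"health": False, "details": {"errors": [...]}}
print(status)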

View file

@@ -1,8 +1,7 @@
import re
from typing import Optional
from fastapi import HTTPException
from starlette import status
from fastapi import HTTPException, status
from chalicelib.core import projects
from chalicelib.utils import pg_client

View file

@@ -1,8 +1,7 @@
import json
from typing import Optional
from fastapi import HTTPException
from starlette import status
from fastapi import HTTPException, status
import schemas
from chalicelib.core import users
@@ -54,6 +53,7 @@ def __create(tenant_id, name):
def get_projects(tenant_id, recording_state=False, gdpr=None, recorded=False, stack_integrations=False):
stack_integrations = False
with pg_client.PostgresClient() as cur:
extra_projection = ""
extra_join = ""

View file

@@ -1,10 +1,7 @@
from typing import List
import schemas
from chalicelib.core import events, metadata, events_ios, \
sessions_mobs, issues, projects, resources, assist, performance_event, sessions_favorite, \
sessions_devtool, sessions_notes
from chalicelib.utils import errors_helper
from chalicelib.core import events, metadata, projects, performance_event, sessions_favorite
from chalicelib.utils import pg_client, helper, metrics_helper
from chalicelib.utils import sql_helper as sh
@@ -33,89 +30,6 @@ COALESCE((SELECT TRUE
AND fs.user_id = %(userId)s LIMIT 1), FALSE) AS viewed """
def __group_metadata(session, project_metadata):
meta = {}
for m in project_metadata.keys():
if project_metadata[m] is not None and session.get(m) is not None:
meta[project_metadata[m]] = session[m]
session.pop(m)
return meta
def get_by_id2_pg(project_id, session_id, context: schemas.CurrentContext, full_data=False, include_fav_viewed=False,
group_metadata=False, live=True):
with pg_client.PostgresClient() as cur:
extra_query = []
if include_fav_viewed:
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_favorite_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS favorite""")
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_viewed_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS viewed""")
query = cur.mogrify(
f"""\
SELECT
s.*,
s.session_id::text AS session_id,
(SELECT project_key FROM public.projects WHERE project_id = %(project_id)s LIMIT 1) AS project_key
{"," if len(extra_query) > 0 else ""}{",".join(extra_query)}
{(",json_build_object(" + ",".join([f"'{m}',p.{m}" for m in metadata.column_names()]) + ") AS project_metadata") if group_metadata else ''}
FROM public.sessions AS s {"INNER JOIN public.projects AS p USING (project_id)" if group_metadata else ""}
WHERE s.project_id = %(project_id)s
AND s.session_id = %(session_id)s;""",
{"project_id": project_id, "session_id": session_id, "userId": context.user_id}
)
# print("===============")
# print(query)
cur.execute(query=query)
data = cur.fetchone()
if data is not None:
data = helper.dict_to_camel_case(data)
if full_data:
if data["platform"] == 'ios':
data['events'] = events_ios.get_by_sessionId(project_id=project_id, session_id=session_id)
for e in data['events']:
if e["type"].endswith("_IOS"):
e["type"] = e["type"][:-len("_IOS")]
data['crashes'] = events_ios.get_crashes_by_session_id(session_id=session_id)
data['userEvents'] = events_ios.get_customs_by_sessionId(project_id=project_id,
session_id=session_id)
data['mobsUrl'] = sessions_mobs.get_ios(session_id=session_id)
else:
data['events'] = events.get_by_session_id(project_id=project_id, session_id=session_id,
group_clickrage=True)
all_errors = events.get_errors_by_session_id(session_id=session_id, project_id=project_id)
data['stackEvents'] = [e for e in all_errors if e['source'] != "js_exception"]
# to keep only the first stack
# limit the number of errors to reduce the response-body size
data['errors'] = [errors_helper.format_first_stack_frame(e) for e in all_errors
if e['source'] == "js_exception"][:500]
data['userEvents'] = events.get_customs_by_session_id(project_id=project_id,
session_id=session_id)
data['domURL'] = sessions_mobs.get_urls(session_id=session_id, project_id=project_id)
data['mobsUrl'] = sessions_mobs.get_urls_depercated(session_id=session_id)
data['devtoolsURL'] = sessions_devtool.get_urls(session_id=session_id, project_id=project_id)
data['resources'] = resources.get_by_session_id(session_id=session_id, project_id=project_id,
start_ts=data["startTs"], duration=data["duration"])
data['notes'] = sessions_notes.get_session_notes(tenant_id=context.tenant_id, project_id=project_id,
session_id=session_id, user_id=context.user_id)
data['metadata'] = __group_metadata(project_metadata=data.pop("projectMetadata"), session=data)
data['issues'] = issues.get_by_session_id(session_id=session_id, project_id=project_id)
data['live'] = live and assist.is_live(project_id=project_id, session_id=session_id,
project_key=data["projectKey"])
data["inDB"] = True
return data
elif live:
return assist.get_live_session_by_id(project_id=project_id, session_id=session_id)
else:
return None
# This function executes the query and return result
def search_sessions(data: schemas.SessionsSearchPayloadSchema, project_id, user_id, errors_only=False,
error_status=schemas.ErrorStatus.all, count_only=False, issue=None, ids_only=False):

View file

@@ -1,5 +1,4 @@
import schemas
from chalicelib.core import sessions
from chalicelib.utils import pg_client
@@ -8,11 +7,14 @@ def add_favorite_session(context: schemas.CurrentContext, project_id, session_id
cur.execute(
cur.mogrify(f"""\
INSERT INTO public.user_favorite_sessions(user_id, session_id)
VALUES (%(userId)s,%(session_id)s);""",
VALUES (%(userId)s,%(session_id)s)
RETURNING session_id;""",
{"userId": context.user_id, "session_id": session_id})
)
return sessions.get_by_id2_pg(context=context, project_id=project_id, session_id=session_id,
full_data=False, include_fav_viewed=True)
row = cur.fetchone()
if row:
return {"data": {"sessionId": session_id}}
return {"errors": ["something went wrong"]}
def remove_favorite_session(context: schemas.CurrentContext, project_id, session_id):
@@ -21,11 +23,14 @@ def remove_favorite_session(context: schemas.CurrentContext, project_id, session
cur.mogrify(f"""\
DELETE FROM public.user_favorite_sessions
WHERE user_id = %(userId)s
AND session_id = %(session_id)s;""",
AND session_id = %(session_id)s
RETURNING session_id;""",
{"userId": context.user_id, "session_id": session_id})
)
return sessions.get_by_id2_pg(context=context, project_id=project_id, session_id=session_id,
full_data=False, include_fav_viewed=True)
row = cur.fetchone()
if row:
return {"data": {"sessionId": session_id}}
return {"errors": ["something went wrong"]}
def favorite_session(context: schemas.CurrentContext, project_id, session_id):

View file

@@ -0,0 +1,186 @@
import schemas
from chalicelib.core import events, metadata, events_ios, \
sessions_mobs, issues, resources, assist, sessions_devtool, sessions_notes
from chalicelib.utils import errors_helper
from chalicelib.utils import pg_client, helper
def __group_metadata(session, project_metadata):
meta = {}
for m in project_metadata.keys():
if project_metadata[m] is not None and session.get(m) is not None:
meta[project_metadata[m]] = session[m]
session.pop(m)
return meta
# for backward compatibility
def get_by_id2_pg(project_id, session_id, context: schemas.CurrentContext, full_data=False, include_fav_viewed=False,
group_metadata=False, live=True):
with pg_client.PostgresClient() as cur:
extra_query = []
if include_fav_viewed:
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_favorite_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS favorite""")
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_viewed_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS viewed""")
query = cur.mogrify(
f"""\
SELECT
s.*,
s.session_id::text AS session_id,
(SELECT project_key FROM public.projects WHERE project_id = %(project_id)s LIMIT 1) AS project_key
{"," if len(extra_query) > 0 else ""}{",".join(extra_query)}
{(",json_build_object(" + ",".join([f"'{m}',p.{m}" for m in metadata.column_names()]) + ") AS project_metadata") if group_metadata else ''}
FROM public.sessions AS s {"INNER JOIN public.projects AS p USING (project_id)" if group_metadata else ""}
WHERE s.project_id = %(project_id)s
AND s.session_id = %(session_id)s;""",
{"project_id": project_id, "session_id": session_id, "userId": context.user_id}
)
# print("===============")
# print(query)
cur.execute(query=query)
data = cur.fetchone()
if data is not None:
data = helper.dict_to_camel_case(data)
if full_data:
if data["platform"] == 'ios':
data['events'] = events_ios.get_by_sessionId(project_id=project_id, session_id=session_id)
for e in data['events']:
if e["type"].endswith("_IOS"):
e["type"] = e["type"][:-len("_IOS")]
data['crashes'] = events_ios.get_crashes_by_session_id(session_id=session_id)
data['userEvents'] = events_ios.get_customs_by_sessionId(project_id=project_id,
session_id=session_id)
data['mobsUrl'] = sessions_mobs.get_ios(session_id=session_id)
else:
data['events'] = events.get_by_session_id(project_id=project_id, session_id=session_id,
group_clickrage=True)
all_errors = events.get_errors_by_session_id(session_id=session_id, project_id=project_id)
data['stackEvents'] = [e for e in all_errors if e['source'] != "js_exception"]
# to keep only the first stack
# limit the number of errors to reduce the response-body size
data['errors'] = [errors_helper.format_first_stack_frame(e) for e in all_errors
if e['source'] == "js_exception"][:500]
data['userEvents'] = events.get_customs_by_session_id(project_id=project_id,
session_id=session_id)
data['domURL'] = sessions_mobs.get_urls(session_id=session_id, project_id=project_id)
data['mobsUrl'] = sessions_mobs.get_urls_depercated(session_id=session_id)
data['devtoolsURL'] = sessions_devtool.get_urls(session_id=session_id, project_id=project_id)
data['resources'] = resources.get_by_session_id(session_id=session_id, project_id=project_id,
start_ts=data["startTs"], duration=data["duration"])
data['notes'] = sessions_notes.get_session_notes(tenant_id=context.tenant_id, project_id=project_id,
session_id=session_id, user_id=context.user_id)
data['metadata'] = __group_metadata(project_metadata=data.pop("projectMetadata"), session=data)
data['issues'] = issues.get_by_session_id(session_id=session_id, project_id=project_id)
data['live'] = live and assist.is_live(project_id=project_id, session_id=session_id,
project_key=data["projectKey"])
data["inDB"] = True
return data
elif live:
return assist.get_live_session_by_id(project_id=project_id, session_id=session_id)
else:
return None
def get_replay(project_id, session_id, context: schemas.CurrentContext, full_data=False, include_fav_viewed=False,
group_metadata=False, live=True):
with pg_client.PostgresClient() as cur:
extra_query = []
if include_fav_viewed:
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_favorite_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS favorite""")
extra_query.append("""COALESCE((SELECT TRUE
FROM public.user_viewed_sessions AS fs
WHERE s.session_id = fs.session_id
AND fs.user_id = %(userId)s), FALSE) AS viewed""")
query = cur.mogrify(
f"""\
SELECT
s.*,
s.session_id::text AS session_id,
(SELECT project_key FROM public.projects WHERE project_id = %(project_id)s LIMIT 1) AS project_key
{"," if len(extra_query) > 0 else ""}{",".join(extra_query)}
{(",json_build_object(" + ",".join([f"'{m}',p.{m}" for m in metadata.column_names()]) + ") AS project_metadata") if group_metadata else ''}
FROM public.sessions AS s {"INNER JOIN public.projects AS p USING (project_id)" if group_metadata else ""}
WHERE s.project_id = %(project_id)s
AND s.session_id = %(session_id)s;""",
{"project_id": project_id, "session_id": session_id, "userId": context.user_id}
)
# print("===============")
# print(query)
cur.execute(query=query)
data = cur.fetchone()
if data is not None:
data = helper.dict_to_camel_case(data)
if full_data:
if data["platform"] == 'ios':
data['mobsUrl'] = sessions_mobs.get_ios(session_id=session_id)
else:
data['domURL'] = sessions_mobs.get_urls(session_id=session_id, project_id=project_id)
data['mobsUrl'] = sessions_mobs.get_urls_depercated(session_id=session_id)
data['devtoolsURL'] = sessions_devtool.get_urls(session_id=session_id, project_id=project_id)
data['metadata'] = __group_metadata(project_metadata=data.pop("projectMetadata"), session=data)
data['live'] = live and assist.is_live(project_id=project_id, session_id=session_id,
project_key=data["projectKey"])
data["inDB"] = True
return data
elif live:
return assist.get_live_session_by_id(project_id=project_id, session_id=session_id)
else:
return None
def get_events(project_id, session_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify(
f"""SELECT session_id, platform, start_ts, duration
FROM public.sessions AS s
WHERE s.project_id = %(project_id)s
AND s.session_id = %(session_id)s;""",
{"project_id": project_id, "session_id": session_id}
)
# print("===============")
# print(query)
cur.execute(query=query)
s_data = cur.fetchone()
if s_data is not None:
s_data = helper.dict_to_camel_case(s_data)
data = {}
if s_data["platform"] == 'ios':
data['events'] = events_ios.get_by_sessionId(project_id=project_id, session_id=session_id)
for e in data['events']:
if e["type"].endswith("_IOS"):
e["type"] = e["type"][:-len("_IOS")]
data['crashes'] = events_ios.get_crashes_by_session_id(session_id=session_id)
data['userEvents'] = events_ios.get_customs_by_sessionId(project_id=project_id,
session_id=session_id)
else:
data['events'] = events.get_by_session_id(project_id=project_id, session_id=session_id,
group_clickrage=True)
all_errors = events.get_errors_by_session_id(session_id=session_id, project_id=project_id)
data['stackEvents'] = [e for e in all_errors if e['source'] != "js_exception"]
# to keep only the first stack
# limit the number of errors to reduce the response-body size
data['errors'] = [errors_helper.format_first_stack_frame(e) for e in all_errors
if e['source'] == "js_exception"][:500]
data['userEvents'] = events.get_customs_by_session_id(project_id=project_id,
session_id=session_id)
data['resources'] = resources.get_by_session_id(session_id=session_id, project_id=project_id,
start_ts=s_data["startTs"], duration=s_data["duration"])
data['issues'] = issues.get_by_session_id(session_id=session_id, project_id=project_id)
return data
else:
return None
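The new sessions_replay module splits the old monolithic lookup in two: get_replay returns the session row plus mob/devtools URLs, while get_events fetches the heavy event, error, resource, and issue data, so the player can render the replay shell first and lazy-load the rest. A hedged sketch of how the two calls compose (ctx stands in for the authenticated schemas.CurrentContext and is illustrative):

# Hedged sketch: the replay shell and its events are now separate calls.
from chalicelib.core import sessions_replay

# ctx: schemas.CurrentContext for the calling user (illustrative)
replay = sessions_replay.get_replay(project_id=1, session_id=42, context=ctx,
                                    full_data=True, include_fav_viewed=True,
                                    group_metadata=True)
events = sessions_replay.get_events(project_id=1, session_id=42)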

View file

@@ -8,7 +8,7 @@ from chalicelib.utils import pg_client
from chalicelib.utils.TimeUTC import TimeUTC
def create_step1(data: schemas.UserSignupSchema):
def create_tenant(data: schemas.UserSignupSchema):
print(f"===================== SIGNUP STEP 1 AT {TimeUTC.to_human_readable(TimeUTC.now())} UTC")
errors = []
if tenants.tenants_exists():

View file

@@ -68,7 +68,7 @@ def update(tenant_id, user_id, data: schemas.UpdateTenantSchema):
return edit_client(tenant_id=tenant_id, changes=changes)
def tenants_exists():
with pg_client.PostgresClient() as cur:
def tenants_exists(use_pool=True):
with pg_client.PostgresClient(use_pool=use_pool) as cur:
cur.execute(f"SELECT EXISTS(SELECT 1 FROM public.tenants)")
return cur.fetchone()["exists"]

View file

@@ -2,8 +2,7 @@ import logging
from typing import Optional
import requests
from fastapi import HTTPException
from starlette import status
from fastapi import HTTPException, status
import schemas
from chalicelib.utils import pg_client, helper

View file

@@ -1,8 +1,7 @@
import requests
from datetime import datetime
from fastapi import HTTPException
from starlette import status
import requests
from fastapi import HTTPException, status
class github_formatters:

View file

@@ -2,11 +2,10 @@ import time
from datetime import datetime
import requests
from fastapi import HTTPException, status
from jira import JIRA
from jira.exceptions import JIRAError
from requests.auth import HTTPBasicAuth
from starlette import status
from fastapi import HTTPException
fields = "id, summary, description, creator, reporter, created, assignee, status, updated, comment, issuetype, labels"

View file

@@ -87,9 +87,10 @@ class PostgresClient:
long_query = False
unlimited_query = False
def __init__(self, long_query=False, unlimited_query=False):
def __init__(self, long_query=False, unlimited_query=False, use_pool=True):
self.long_query = long_query
self.unlimited_query = unlimited_query
self.use_pool = use_pool
if unlimited_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-UNLIMITED"
@@ -100,7 +101,7 @@ class PostgresClient:
long_config["options"] = f"-c statement_timeout=" \
f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
self.connection = psycopg2.connect(**long_config)
elif not config('PG_POOL', cast=bool, default=True):
elif not use_pool or not config('PG_POOL', cast=bool, default=True):
single_config = dict(_PG_CONFIG)
single_config["application_name"] += "-NOPOOL"
single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
@@ -111,6 +112,8 @@
def __enter__(self):
if self.cursor is None:
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
self.cursor.cursor_execute = self.cursor.execute
self.cursor.execute = self.__execute
self.cursor.recreate = self.recreate_cursor
return self.cursor
@@ -118,11 +121,12 @@
try:
self.connection.commit()
self.cursor.close()
if self.long_query or self.unlimited_query:
if not self.use_pool or self.long_query or self.unlimited_query:
self.connection.close()
except Exception as error:
logging.error("Error while committing/closing PG-connection", error)
if str(error) == "connection already closed" \
and self.use_pool \
and not self.long_query \
and not self.unlimited_query \
and config('PG_POOL', cast=bool, default=True):
@@ -132,10 +136,22 @@
raise error
finally:
if config('PG_POOL', cast=bool, default=True) \
and self.use_pool \
and not self.long_query \
and not self.unlimited_query:
postgreSQL_pool.putconn(self.connection)
def __execute(self, query, vars=None):
try:
result = self.cursor.cursor_execute(query=query, vars=vars)
except psycopg2.Error as error:
logging.error(f"!!! Error of type:{type(error)} while executing query:")
logging.error(query)
logging.info("starting rollback to allow future execution")
self.connection.rollback()
raise error
return result
def recreate_cursor(self, rollback=False):
if rollback:
try:
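The new use_pool flag lets callers bypass the shared psycopg2 pool; per the __exit__ change above, such connections are closed outright instead of being returned to the pool, which is what the signup path needs before the pool is warmed. A minimal usage sketch:

# Hedged usage sketch: opt out of the shared pool for a one-off connection.
from chalicelib.utils import pg_client

with pg_client.PostgresClient(use_pool=False) as cur:
    cur.execute("SELECT 1;")
    print(cur.fetchone())
# The connection is closed on exit rather than returned to the pool.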

View file

@@ -16,7 +16,8 @@ else:
aws_access_key_id=config("S3_KEY"),
aws_secret_access_key=config("S3_SECRET"),
config=Config(signature_version='s3v4'),
region_name=config("sessions_region"))
region_name=config("sessions_region"),
verify=not config("S3_DISABLE_SSL_VERIFY", default=False, cast=bool))
def __get_s3_resource():
@@ -26,7 +27,8 @@ def __get_s3_resource():
aws_access_key_id=config("S3_KEY"),
aws_secret_access_key=config("S3_SECRET"),
config=Config(signature_version='s3v4'),
region_name=config("sessions_region"))
region_name=config("sessions_region"),
verify=not config("S3_DISABLE_SSL_VERIFY", default=False, cast=bool))
def exists(bucket, key):
@@ -81,7 +83,8 @@ def get_presigned_url_for_upload_secure(bucket, expires_in, key, conditions=None
Conditions=conditions,
)
req = PreparedRequest()
req.prepare_url(f"{url_parts['url']}/{url_parts['fields']['key']}", url_parts['fields'])
req.prepare_url(
f"{url_parts['url']}/{url_parts['fields']['key']}", url_parts['fields'])
return req.url
@@ -101,7 +104,8 @@ def get_file(source_bucket, source_key):
def rename(source_bucket, source_key, target_bucket, target_key):
s3 = __get_s3_resource()
s3.Object(target_bucket, target_key).copy_from(CopySource=f'{source_bucket}/{source_key}')
s3.Object(target_bucket, target_key).copy_from(
CopySource=f'{source_bucket}/{source_key}')
s3.Object(source_bucket, source_key).delete()
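S3_DISABLE_SSL_VERIFY maps straight to boto3's verify argument, which turns off TLS certificate validation for self-hosted object stores with self-signed certificates. A standalone sketch of the same wiring (endpoint and defaults are placeholders):

# Sketch: boto3 S3 client honoring an S3_DISABLE_SSL_VERIFY-style flag.
import boto3
from botocore.client import Config
from decouple import config

client = boto3.client(
    "s3",
    endpoint_url=config("S3_HOST", default=None),
    config=Config(signature_version="s3v4"),
    region_name=config("sessions_region", default="us-east-1"),
    verify=not config("S3_DISABLE_SSL_VERIFY", default=False, cast=bool),
)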

View file

@@ -1,3 +1,3 @@
#!/bin/sh
uvicorn app:app --host 0.0.0.0 --port $LISTEN_PORT --reload --proxy-headers
uvicorn app:app --host 0.0.0.0 --port $LISTEN_PORT --proxy-headers

View file

@@ -1,3 +1,3 @@
#!/bin/sh
export ASSIST_KEY=ignore
uvicorn app:app --host 0.0.0.0 --port $LISTEN_PORT --reload
uvicorn app:app --host 0.0.0.0 --port 8888

View file

@@ -10,6 +10,7 @@ EMAIL_USE_TLS=true
S3_HOST=
S3_KEY=
S3_SECRET=
S3_DISABLE_SSL_VERIFY=
SITE_URL=
announcement_url=
captcha_key=
@@ -39,7 +40,7 @@ PG_POOL=true
sessions_bucket=mobs
sessions_region=us-east-1
sourcemaps_bucket=sourcemaps
sourcemaps_reader=http://sourcemaps-reader-openreplay.app.svc.cluster.local:9000/sourcemaps/%s/sourcemaps
sourcemaps_reader=http://sourcemapreader-openreplay.app.svc.cluster.local:9000/sourcemaps/%s/sourcemaps
STAGE=default-foss
version_number=1.4.0
FS_DIR=/mnt/efs
@@ -52,4 +53,4 @@ PRESIGNED_URL_EXPIRATION=3600
ASSIST_JWT_EXPIRATION=144000
ASSIST_JWT_SECRET=
PYTHONUNBUFFERED=1
THUMBNAILS_BUCKET=thumbnails
REDIS_STRING=redis://redis-master.db.svc.cluster.local:6379

View file

@@ -1,15 +1,15 @@
requests==2.28.2
urllib3==1.26.14
boto3==1.26.70
urllib3==1.26.15
boto3==1.26.100
pyjwt==2.6.0
psycopg2-binary==2.9.5
elasticsearch==8.6.1
jira==3.4.1
elasticsearch==8.6.2
jira==3.5.0
fastapi==0.92.0
uvicorn[standard]==0.20.0
python-decouple==3.7
pydantic[email]==1.10.4
apscheduler==3.10.0
fastapi==0.95.0
uvicorn[standard]==0.21.1
python-decouple==3.8
pydantic[email]==1.10.7
apscheduler==3.10.1

View file

@@ -1,15 +1,17 @@
requests==2.28.2
urllib3==1.26.14
boto3==1.26.70
urllib3==1.26.15
boto3==1.26.100
pyjwt==2.6.0
psycopg2-binary==2.9.5
elasticsearch==8.6.1
jira==3.4.1
elasticsearch==8.6.2
jira==3.5.0
fastapi==0.92.0
uvicorn[standard]==0.20.0
python-decouple==3.7
pydantic[email]==1.10.4
apscheduler==3.10.0
fastapi==0.95.0
uvicorn[standard]==0.21.1
python-decouple==3.8
pydantic[email]==1.10.7
apscheduler==3.10.1
redis==4.5.3

View file

@@ -1,9 +1,8 @@
from typing import Union
from decouple import config
from fastapi import Depends, Body, HTTPException, Response
from fastapi import Depends, Body, HTTPException, Response, status
from fastapi.responses import JSONResponse
from starlette import status
import schemas
from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assignments, projects, \
@@ -11,7 +10,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig
log_tool_elasticsearch, log_tool_datadog, \
log_tool_stackdriver, reset_password, log_tool_cloudwatch, log_tool_sentry, log_tool_sumologic, log_tools, sessions, \
log_tool_newrelic, announcements, log_tool_bugsnag, weekly_report, integration_jira_cloud, integration_github, \
assist, mobile, signup, tenants, boarding, notifications, webhook, users, \
assist, mobile, tenants, boarding, notifications, webhook, users, \
custom_metrics, saved_search, integrations_global
from chalicelib.core.collaboration_msteams import MSTeams
from chalicelib.core.collaboration_slack import Slack
@@ -663,12 +662,6 @@ async def mobile_signe(projectId: int, sessionId: int, data: schemas.MobileSignP
return {"data": mobile.sign_keys(project_id=projectId, session_id=sessionId, keys=data.keys)}
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
return signup.create_step1(data)
@app.post('/projects', tags=['projects'])
async def create_project(data: schemas.CreateProjectSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):

View file

@@ -6,7 +6,7 @@ from starlette.responses import RedirectResponse, FileResponse
import schemas
from chalicelib.core import sessions, errors, errors_viewed, errors_favorite, sessions_assignments, heatmaps, \
sessions_favorite, assist, sessions_notes, click_maps
sessions_favorite, assist, sessions_notes, click_maps, sessions_replay, signup
from chalicelib.core import sessions_viewed
from chalicelib.core import tenants, users, projects, license
from chalicelib.core import webhook
@@ -27,6 +27,13 @@ async def get_all_signup():
"edition": license.EDITION}}
if not tenants.tenants_exists(use_pool=False):
@public_app.post('/signup', tags=['signup'])
@public_app.put('/signup', tags=['signup'])
async def signup_handler(data: schemas.UserSignupSchema = Body(...)):
return signup.create_tenant(data)
@app.get('/account', tags=['accounts'])
async def get_account(context: schemas.CurrentContext = Depends(OR_context)):
r = users.get(tenant_id=context.tenant_id, user_id=context.user_id)
@ -145,13 +152,14 @@ async def get_projects(context: schemas.CurrentContext = Depends(OR_context)):
stack_integrations=True)}
@app.get('/{projectId}/sessions/{sessionId}', tags=["sessions"])
# for backward compatibility
@app.get('/{projectId}/sessions/{sessionId}', tags=["sessions", "replay"])
async def get_session(projectId: int, sessionId: Union[int, str], background_tasks: BackgroundTasks,
context: schemas.CurrentContext = Depends(OR_context)):
if isinstance(sessionId, str):
return {"errors": ["session not found"]}
data = sessions.get_by_id2_pg(project_id=projectId, session_id=sessionId, full_data=True,
include_fav_viewed=True, group_metadata=True, context=context)
data = sessions_replay.get_by_id2_pg(project_id=projectId, session_id=sessionId, full_data=True,
include_fav_viewed=True, group_metadata=True, context=context)
if data is None:
return {"errors": ["session not found"]}
if data.get("inDB"):
@ -162,6 +170,37 @@ async def get_session(projectId: int, sessionId: Union[int, str], background_tas
}
@app.get('/{projectId}/sessions/{sessionId}/replay', tags=["sessions", "replay"])
async def get_session_events(projectId: int, sessionId: Union[int, str], background_tasks: BackgroundTasks,
context: schemas.CurrentContext = Depends(OR_context)):
if isinstance(sessionId, str):
return {"errors": ["session not found"]}
data = sessions_replay.get_replay(project_id=projectId, session_id=sessionId, full_data=True,
include_fav_viewed=True, group_metadata=True, context=context)
if data is None:
return {"errors": ["session not found"]}
if data.get("inDB"):
background_tasks.add_task(sessions_viewed.view_session, project_id=projectId, user_id=context.user_id,
session_id=sessionId)
return {
'data': data
}
@app.get('/{projectId}/sessions/{sessionId}/events', tags=["sessions", "replay"])
async def get_session_events(projectId: int, sessionId: Union[int, str],
context: schemas.CurrentContext = Depends(OR_context)):
if isinstance(sessionId, str):
return {"errors": ["session not found"]}
data = sessions_replay.get_events(project_id=projectId, session_id=sessionId)
if data is None:
return {"errors": ["session not found"]}
return {
'data': data
}
@app.get('/{projectId}/sessions/{sessionId}/errors/{errorId}/sourcemaps', tags=["sessions", "sourcemaps"])
async def get_error_trace(projectId: int, sessionId: int, errorId: str,
context: schemas.CurrentContext = Depends(OR_context)):
@ -239,8 +278,8 @@ async def get_live_session(projectId: int, sessionId: str, background_tasks: Bac
context: schemas.CurrentContext = Depends(OR_context)):
data = assist.get_live_session_by_id(project_id=projectId, session_id=sessionId)
if data is None:
data = sessions.get_by_id2_pg(context=context, project_id=projectId, session_id=sessionId,
full_data=True, include_fav_viewed=True, group_metadata=True, live=False)
data = sessions_replay.get_replay(context=context, project_id=projectId, session_id=sessionId,
full_data=True, include_fav_viewed=True, group_metadata=True, live=False)
if data is None:
return {"errors": ["session not found"]}
if data.get("inDB"):


@ -0,0 +1,20 @@
from fastapi import HTTPException, status
from chalicelib.core import health, tenants
from routers.base import get_routers
public_app, app, app_apikey = get_routers()
@app.get('/health', tags=["health-check"])
def get_global_health_status():
return {"data": health.get_health()}
if not tenants.tenants_exists(use_pool=False):
@public_app.get('/health', tags=["health-check"])
def get_public_health_status():
if tenants.tenants_exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Not Found")
return get_global_health_status()


@ -1,3 +1,3 @@
#!/bin/zsh
uvicorn app_alerts:app --reload
uvicorn app_alerts:app --reload --port 8888


@ -500,6 +500,7 @@ class IssueType(str, Enum):
crash = 'crash'
custom = 'custom'
js_exception = 'js_exception'
mouse_thrashing = 'mouse_thrashing'
class MetricFormatType(str, Enum):


@ -18,4 +18,4 @@ USER 1001
ADD --chown=1001 https://static.openreplay.com/geoip/GeoLite2-Country.mmdb $MAXMINDDB_FILE
ENTRYPOINT ["/sbin/tini", "--"]
CMD npm start
CMD npm start


@ -18,7 +18,7 @@ check_prereq() {
[[ $1 == ee ]] && ee=true
[[ $PATCH -eq 1 ]] && {
image_tag="$(grep -ER ^.ppVersion ../scripts/helmcharts/openreplay/charts/$chart | xargs | awk '{print $2}' | awk -F. -v OFS=. '{$NF += 1 ; print}')"
[[ $ee == "true" ]] && {
[[ $ee == "true" ]] && {
image_tag="${image_tag}-ee"
}
}
@ -35,20 +35,20 @@ update_helm_release() {
}
function build_api(){
destination="_utilities"
destination="_assist"
[[ $1 == "ee" ]] && {
destination="_utilities_ee"
destination="_assist_ee"
}
cp -R ../utilities ../${destination}
cp -R ../assist ../${destination}
cd ../${destination}
# Copy enterprise code
[[ $1 == "ee" ]] && {
cp -rf ../ee/utilities/* ./
cp -rf ../ee/assist/* ./
}
docker build -f ./Dockerfile --build-arg GIT_SHA=$git_sha -t ${DOCKER_REPO:-'local'}/assist:${image_tag} .
cd ../utilities
cd ../assist
rm -rf ../${destination}
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/assist:${image_tag}
@ -63,4 +63,6 @@ function build_api(){
check_prereq
build_api $1
[[ $PATCH -eq 1 ]] && update_helm_release assist
if [[ $PATCH -eq 1 ]]; then
update_helm_release assist
fi


@ -45,9 +45,9 @@
}
},
"node_modules/@types/node": {
"version": "18.14.1",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.14.1.tgz",
"integrity": "sha512-QH+37Qds3E0eDlReeboBxfHbX9omAcBCXEzswCu6jySP642jiM3cYSIkU/REqwhCUqXdonHFuBfJDiAJxMNhaQ=="
"version": "18.14.6",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.14.6.tgz",
"integrity": "sha512-93+VvleD3mXwlLI/xASjw0FzKcwzl3OdTCzm1LaRfqgS21gfFtK3zDXM5Op9TeeMsJVOaJ2VRDpT9q4Y3d0AvA=="
},
"node_modules/accepts": {
"version": "1.3.8",
@ -987,9 +987,9 @@
}
},
"node_modules/ua-parser-js": {
"version": "1.0.33",
"resolved": "https://registry.npmjs.org/ua-parser-js/-/ua-parser-js-1.0.33.tgz",
"integrity": "sha512-RqshF7TPTE0XLYAqmjlu5cLLuGdKrNu9O1KLA/qp39QtbZwuzwv1dT46DZSopoUMsYgXpB3Cv8a03FI8b74oFQ==",
"version": "1.0.34",
"resolved": "https://registry.npmjs.org/ua-parser-js/-/ua-parser-js-1.0.34.tgz",
"integrity": "sha512-K9mwJm/DaB6mRLZfw6q8IMXipcrmuT6yfhYmwhAkuh+81sChuYstYA+znlgaflUPaYUa3odxKPKGw6Vw/lANew==",
"funding": [
{
"type": "opencollective",


@ -1,6 +1,6 @@
{
"name": "assist-server",
"version": "1.0.0",
"version": "v1.11.0",
"description": "assist server to get live sessions & sourcemaps reader to get stack trace",
"main": "peerjs-server.js",
"scripts": {


@ -2,6 +2,7 @@ const dumps = require('./utils/HeapSnapshot');
const express = require('express');
const socket = require("./servers/websocket");
const {request_logger} = require("./utils/helper");
const health = require("./utils/health");
const assert = require('assert').strict;
const debug = process.env.debug === "1";
@ -10,7 +11,7 @@ const HOST = process.env.LISTEN_HOST || '0.0.0.0';
const PORT = process.env.LISTEN_PORT || 9001;
assert.ok(process.env.ASSIST_KEY, 'The "ASSIST_KEY" environment variable is required');
const P_KEY = process.env.ASSIST_KEY;
const PREFIX = process.env.PREFIX || process.env.prefix || `/assist`
const PREFIX = process.env.PREFIX || process.env.prefix || `/assist`;
const wsapp = express();
wsapp.use(express.json());
@ -27,16 +28,9 @@ heapdump && wsapp.use(`${PREFIX}/${P_KEY}/heapdump`, dumps.router);
const wsserver = wsapp.listen(PORT, HOST, () => {
console.log(`WS App listening on http://${HOST}:${PORT}`);
console.log('Press Ctrl+C to quit.');
health.healthApp.listen(health.PORT, HOST, health.listen_cb);
});
wsapp.enable('trust proxy');
socket.start(wsserver);
module.exports = {wsserver};
wsapp.get('/private/shutdown', (req, res) => {
console.log("Requested shutdown");
res.statusCode = 200;
res.end("ok!");
process.kill(1, "SIGTERM");
}
);
module.exports = {wsserver};


@ -26,7 +26,7 @@ const debug = process.env.debug === "1";
const createSocketIOServer = function (server, prefix) {
io = _io(server, {
maxHttpBufferSize: (parseInt(process.env.maxHttpBufferSize) || 5) * 1e6,
maxHttpBufferSize: (parseFloat(process.env.maxHttpBufferSize) || 5) * 1e6,
cors: {
origin: "*",
methods: ["GET", "POST", "PUT"]

assist/utils/health.js Normal file

@ -0,0 +1,52 @@
const express = require('express');
const HOST = process.env.LISTEN_HOST || '0.0.0.0';
const PORT = process.env.HEALTH_PORT || 8888;
const {request_logger} = require("./helper");
const debug = process.env.debug === "1";
const respond = function (res, data) {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({"data": data}));
}
const check_health = async function (req, res) {
debug && console.log("[WS]looking for all available sessions");
respond(res, {
"health": true,
"details": {
"version": process.env.npm_package_version
}
});
}
const healthApp = express();
healthApp.use(express.json());
healthApp.use(express.urlencoded({extended: true}));
healthApp.use(request_logger("[healthApp]"));
healthApp.get(['/'], (req, res) => {
res.statusCode = 200;
res.end("healthApp ok!");
}
);
healthApp.get('/health', check_health);
healthApp.get('/shutdown', (req, res) => {
console.log("Requested shutdown");
res.statusCode = 200;
res.end("ok!");
process.kill(1, "SIGTERM");
}
);
const listen_cb = async function () {
console.log(`Health App listening on http://${HOST}:${PORT}`);
console.log('Press Ctrl+C to quit.');
}
module.exports = {
healthApp,
PORT,
listen_cb
};

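For reference, a minimal Go probe for the health endpoint defined in utils/health.js above. The URL path, default HEALTH_PORT of 8888, and response shape come from that file; the local address and everything else is illustrative, assuming the assist service runs on the same host.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// GET /health on the default HEALTH_PORT (8888) of the assist service.
	resp, err := http.Get("http://127.0.0.1:8888/health")
	if err != nil {
		log.Fatalf("health check failed: %v", err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Expected shape: {"data":{"health":true,"details":{"version":"..."}}}
	fmt.Println(resp.Status, string(body))
}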

@ -82,7 +82,9 @@ ENV TZ=UTC \
COMPRESSION_TYPE=zstd \
CH_USERNAME="default" \
CH_PASSWORD="" \
CH_DATABASE="default"
CH_DATABASE="default" \
# Max file size to process, default to 100MB
MAX_FILE_SIZE=100000000
RUN if [ "$SERVICE_NAME" = "http" ]; then \


@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
assetsMetrics "openreplay/backend/pkg/metrics/assets"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -24,9 +23,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
cacher := cacher.NewCacher(cfg)


@ -1,174 +1,60 @@
package main
import (
"errors"
"log"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/db"
config "openreplay/backend/internal/config/db"
"openreplay/backend/internal/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
types2 "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
m := metrics.New()
m.Register(databaseMetrics.List())
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := db.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
cfg := config.New()
// Init database
pg := cache.NewPGCache(
postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
&custom2.EventMapper{},
custom2.NewInputEventBuilder(),
custom2.NewPageEventBuilder(),
}
}
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init modules
saver := datasaver.New(pg, cfg)
saver.InitStats()
// Init data saver
saver := datasaver.New(cfg, pg)
// Message filter
msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd,
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgClickEvent,
messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgJSException, messages.MsgResourceTiming,
messages.MsgCustomEvent, messages.MsgCustomIssue, messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL,
messages.MsgStateAction, messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument,
messages.MsgMouseClick, messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming}
// Handler logic
msgHandler := func(msg messages.Message) {
// Just save session data into db without additional checks
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
var (
session *types2.Session
err error
)
if msg.TypeID() == messages.MsgSessionEnd {
session, err = pg.GetSession(msg.SessionID())
} else {
session, err = pg.Cache.GetSession(msg.SessionID())
}
if session == nil {
if err != nil && !errors.Is(err, cache.NilSessionInCacheError) {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
// Save statistics to db
err = saver.InsertStats(session, msg)
if err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message: %v", err, session, msg)
}
// Handle heuristics and save to temporary queue in memory
builderMap.HandleMessage(msg)
// Process saved heuristics messages as usual messages above in the code
builderMap.IterateSessionReadyMessages(msg.SessionID(), func(msg messages.Message) {
if err := saver.InsertMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
return
}
if err := saver.InsertStats(session, msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %v, Message %v", err, session, msg)
}
})
}
messages.MsgUserID, messages.MsgUserAnonymousID, messages.MsgIntegrationEvent, messages.MsgPerformanceTrackAggr,
messages.MsgJSException, messages.MsgResourceTiming, messages.MsgCustomEvent, messages.MsgCustomIssue,
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction,
messages.MsgSetInputTarget, messages.MsgSetInputValue, messages.MsgCreateDocument, messages.MsgMouseClick,
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
messages.MsgInputEvent, messages.MsgPageEvent, messages.MsgMouseThrashing, messages.MsgInputChange,
messages.MsgUnbindNodes}
// Init consumer
consumer := queue.NewConsumer(
cfg.GroupDB,
[]string{
cfg.TopicRawWeb, // from tracker
cfg.TopicAnalytics, // from heuristics
cfg.TopicRawWeb,
cfg.TopicAnalytics,
},
messages.NewMessageIterator(msgHandler, msgFilter, true),
messages.NewMessageIterator(saver.Handle, msgFilter, true),
false,
cfg.MessageSizeLimit,
)
// Run service and wait for TERM signal
service := db.New(cfg, consumer, saver)
log.Printf("Db service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
commitTick := time.Tick(cfg.CommitBatchTimeout)
// Send collected batches to db
commitDBUpdates := func() {
// Commit collected batches and bulks of information to PG
pg.Commit()
// Commit collected batches of information to CH
if err := saver.CommitStats(); err != nil {
log.Printf("Error on stats commit: %v", err)
}
// Commit current position in queue
if err := consumer.Commit(); err != nil {
log.Printf("Error on consumer commit: %v", err)
}
}
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %s: terminating\n", sig.String())
commitDBUpdates()
if err := pg.Close(); err != nil {
log.Printf("db.Close error: %s", err)
}
if err := saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
consumer.Close()
os.Exit(0)
case <-commitTick:
commitDBUpdates()
builderMap.ClearOldSessions()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
// Handle new message from queue
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
terminator.Wait(service)
}

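Condensed, the refactored cmd/db wiring from the hunk above amounts to the sketch below. Every name is taken from this diff (the msgFilter list is elided for brevity); the consume/commit loop now lives in the internal/db service shown further down, and shutdown is delegated to pkg/terminator.

package main

import (
	config "openreplay/backend/internal/config/db"
	"openreplay/backend/internal/db"
	"openreplay/backend/internal/db/datasaver"
	"openreplay/backend/pkg/db/cache"
	"openreplay/backend/pkg/db/postgres"
	"openreplay/backend/pkg/messages"
	"openreplay/backend/pkg/queue"
	"openreplay/backend/pkg/terminator"
)

func main() {
	cfg := config.New()
	pg := cache.NewPGCache(
		postgres.NewConn(cfg.Postgres.String(), cfg.BatchQueueLimit, cfg.BatchSizeLimit),
		cfg.ProjectExpirationTimeoutMs)
	// Saver exposes Handle/Commit/Close; every queue message goes through Handle.
	saver := datasaver.New(cfg, pg)
	consumer := queue.NewConsumer(cfg.GroupDB,
		[]string{cfg.TopicRawWeb, cfg.TopicAnalytics},
		messages.NewMessageIterator(saver.Handle, nil /* msgFilter elided */, true),
		false, cfg.MessageSizeLimit)
	service := db.New(cfg, consumer, saver) // starts the consume/commit loop
	terminator.Wait(service)                // blocks until SIGINT/SIGTERM, then Stop()
}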

@ -18,7 +18,6 @@ import (
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
enderMetrics "openreplay/backend/pkg/metrics/ender"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -30,9 +29,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := ender.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres.String(), 0, 0), cfg.ProjectExpirationTimeoutMs)
defer pg.Close()
@ -72,12 +68,12 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
failedSessionEnds := make(map[uint64]int64)
failedSessionEnds := make(map[uint64]uint64)
duplicatedSessionEnds := make(map[uint64]uint64)
// Find ended sessions and send notification to other services
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
sessions.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
msg := &messages.SessionEnd{Timestamp: timestamp}
currDuration, err := pg.GetSessionDuration(sessionID)
if err != nil {
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)


@ -2,90 +2,54 @@ package main
import (
"log"
"openreplay/backend/pkg/pprof"
"os"
"os/signal"
"syscall"
"time"
"openreplay/backend/internal/config/heuristics"
config "openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/heuristics"
"openreplay/backend/pkg/handlers"
web2 "openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
heuristicsMetrics "openreplay/backend/pkg/metrics/heuristics"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/terminator"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
m := metrics.New()
m.Register(heuristicsMetrics.List())
cfg := heuristics.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
// web handlers
&web2.ClickRageDetector{},
&web2.CpuIssueDetector{},
&web2.DeadClickDetector{},
&web2.MemoryIssueDetector{},
&web2.NetworkIssueDetector{},
&web2.PerformanceAggregator{},
// Other handlers (you can add your custom handlers here)
//&custom.CustomHandler{},
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
web.NewDeadClickDetector(),
&web.ClickRageDetector{},
&web.CpuIssueDetector{},
&web.MemoryIssueDetector{},
&web.NetworkIssueDetector{},
&web.PerformanceAggregator{},
}
}
// Create handler's aggregator
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init producer and consumer for data bus
eventBuilder := sessions.NewBuilderMap(handlersFabric)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
msgHandler := func(msg messages.Message) {
builderMap.HandleMessage(msg)
}
consumer := queue.NewConsumer(
cfg.GroupHeuristics,
[]string{
cfg.TopicRawWeb,
},
messages.NewMessageIterator(msgHandler, nil, true),
messages.NewMessageIterator(eventBuilder.HandleMessage, nil, true),
false,
cfg.MessageSizeLimit,
)
// Run service and wait for TERM signal
service := heuristics.New(cfg, producer, consumer, eventBuilder)
log.Printf("Heuristics service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(intervals.EVENTS_COMMIT_INTERVAL * time.Millisecond)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
producer.Close(cfg.ProducerTimeout)
consumer.Commit()
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(cfg.TopicAnalytics, sessionID, readyMsg.Encode())
})
producer.Flush(cfg.ProducerTimeout)
consumer.Commit()
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
if err := consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
terminator.Wait(service)
}


@ -15,7 +15,6 @@ import (
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
httpMetrics "openreplay/backend/pkg/metrics/http"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
@ -27,9 +26,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := http.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
// Connect to queue
producer := queue.NewProducer(cfg.MessageSizeLimit, true)


@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
)
@ -25,9 +24,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
pg := postgres.NewConn(cfg.Postgres.String(), 0, 0)
defer pg.Close()


@ -16,7 +16,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
sinkMetrics "openreplay/backend/pkg/metrics/sink"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/url/assets"
)
@ -27,9 +26,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := sink.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) {
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
@ -112,7 +108,7 @@ func main() {
log.Printf("zero ts; sessID: %d, msgType: %d", msg.SessionID(), msg.TypeID())
} else {
// Log ts of last processed message
counter.Update(msg.SessionID(), time.UnixMilli(ts))
counter.Update(msg.SessionID(), time.UnixMilli(int64(ts)))
}
// Try to encode message to avoid null data inserts


@ -13,7 +13,6 @@ import (
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
storageMetrics "openreplay/backend/pkg/metrics/storage"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
cloud "openreplay/backend/pkg/storage"
)
@ -25,9 +24,6 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
s3 := cloud.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3)


@ -24,7 +24,7 @@ require (
github.com/sethvargo/go-envconfig v0.7.0
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe
golang.org/x/net v0.1.1-0.20221104162952-702349b0e862
golang.org/x/net v0.8.0
google.golang.org/api v0.81.0
)
@ -61,8 +61,8 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect


@ -589,8 +589,8 @@ golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.1.1-0.20221104162952-702349b0e862 h1:KrLJ+iz8J6j6VVr/OCfULAcK+xozUmWE43fKpMR4MlI=
golang.org/x/net v0.1.1-0.20221104162952-702349b0e862/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -702,8 +702,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -715,8 +715,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=


@ -2,9 +2,11 @@ package cacher
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"net/http"
metrics "openreplay/backend/pkg/metrics/assets"
@ -38,6 +40,35 @@ func (c *cacher) CanCache() bool {
func NewCacher(cfg *config.Config) *cacher {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if cfg.ClientCertFilePath != "" && cfg.ClientKeyFilePath != "" && cfg.CaCertFilePath != "" {
var cert tls.Certificate
var err error
cert, err = tls.LoadX509KeyPair(cfg.ClientCertFilePath, cfg.ClientKeyFilePath)
if err != nil {
log.Fatalf("Error creating x509 keypair from the client cert file %s and client key file %s , Error: %s", err, cfg.ClientCertFilePath, cfg.ClientKeyFilePath)
}
caCert, err := ioutil.ReadFile(cfg.CaCertFilePath)
if err != nil {
log.Fatalf("Error opening cert file %s, Error: %s", cfg.CaCertFilePath, err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
}
c := &cacher{
timeoutMap: newTimeoutMap(),
s3: storage.NewS3(cfg.AWSRegion, cfg.S3BucketAssets),
@ -45,7 +76,7 @@ func NewCacher(cfg *config.Config) *cacher {
Timeout: time.Duration(6) * time.Second,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: tlsConfig,
},
},
rewriter: rewriter,
@ -104,6 +135,13 @@ func (c *cacher) cacheURL(t *Task) {
if contentType == "" {
contentType = mime.TypeByExtension(filepath.Ext(res.Request.URL.Path))
}
// Skip html file (usually it's a CDN mock for 404 error)
if strings.HasPrefix(contentType, "text/html") {
c.Errors <- errors.Wrap(fmt.Errorf("context type is text/html, sessID: %d", t.sessionID), t.urlContext)
return
}
isCSS := strings.HasPrefix(contentType, "text/css")
strData := string(data)


@ -15,6 +15,9 @@ type Config struct {
AssetsSizeLimit int `env:"ASSETS_SIZE_LIMIT,required"`
AssetsRequestHeaders map[string]string `env:"ASSETS_REQUEST_HEADERS"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
ClientKeyFilePath string `env:"CLIENT_KEY_FILE_PATH"`
CaCertFilePath string `env:"CA_CERT_FILE_PATH"`
ClientCertFilePath string `env:"CLIENT_CERT_FILE_PATH"`
}
func New() *Config {


@ -3,6 +3,7 @@ package heuristics
import (
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/pkg/pprof"
)
type Config struct {
@ -19,5 +20,8 @@ type Config struct {
func New() *Config {
cfg := &Config{}
configurator.Process(cfg)
if cfg.UseProfiler {
pprof.StartProfilingServer()
}
return cfg
}


@ -1,74 +0,0 @@
package datasaver
import (
"fmt"
. "openreplay/backend/pkg/messages"
)
func (mi *Saver) InsertMessage(msg Message) error {
sessionID := msg.SessionID()
switch m := msg.(type) {
// Common
case *Metadata:
if err := mi.pg.InsertMetadata(sessionID, m); err != nil {
return fmt.Errorf("insert metadata err: %s", err)
}
return nil
case *IssueEvent:
return mi.pg.InsertIssueEvent(sessionID, m)
//TODO: message adapter (transformer) (at the level of pkg/message) for types: *IOSMetadata, *IOSIssueEvent and others
// Web
case *SessionStart:
return mi.pg.HandleWebSessionStart(sessionID, m)
case *SessionEnd:
return mi.pg.HandleWebSessionEnd(sessionID, m)
case *UserID:
return mi.pg.InsertWebUserID(sessionID, m)
case *UserAnonymousID:
return mi.pg.InsertWebUserAnonymousID(sessionID, m)
case *CustomEvent:
return mi.pg.InsertWebCustomEvent(sessionID, m)
case *ClickEvent:
return mi.pg.InsertWebClickEvent(sessionID, m)
case *InputEvent:
return mi.pg.InsertWebInputEvent(sessionID, m)
// Unique Web messages
case *PageEvent:
return mi.pg.InsertWebPageEvent(sessionID, m)
case *NetworkRequest:
return mi.pg.InsertWebNetworkRequest(sessionID, m)
case *GraphQL:
return mi.pg.InsertWebGraphQL(sessionID, m)
case *JSException:
return mi.pg.InsertWebJSException(m)
case *IntegrationEvent:
return mi.pg.InsertWebIntegrationEvent(m)
// IOS
case *IOSSessionStart:
return mi.pg.InsertIOSSessionStart(sessionID, m)
case *IOSSessionEnd:
return mi.pg.InsertIOSSessionEnd(sessionID, m)
case *IOSUserID:
return mi.pg.InsertIOSUserID(sessionID, m)
case *IOSUserAnonymousID:
return mi.pg.InsertIOSUserAnonymousID(sessionID, m)
case *IOSCustomEvent:
return mi.pg.InsertIOSCustomEvent(sessionID, m)
case *IOSClickEvent:
return mi.pg.InsertIOSClickEvent(sessionID, m)
case *IOSInputEvent:
return mi.pg.InsertIOSInputEvent(sessionID, m)
// Unique IOS messages
case *IOSNetworkCall:
return mi.pg.InsertIOSNetworkCall(sessionID, m)
case *IOSScreenEnter:
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
}
return nil // "Not implemented"
}


@ -0,0 +1,19 @@
package datasaver
import (
. "openreplay/backend/pkg/messages"
)
func (s *saverImpl) init() {
// noop
}
func (s *saverImpl) handleExtraMessage(msg Message) error {
switch m := msg.(type) {
case *PerformanceTrackAggr:
return s.pg.InsertWebStatsPerformance(m)
case *ResourceTiming:
return s.pg.InsertWebStatsResourceEvent(m)
}
return nil
}


@ -1,16 +1,130 @@
package datasaver
import (
"log"
"openreplay/backend/internal/config/db"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/db/clickhouse"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
queue "openreplay/backend/pkg/queue/types"
)
type Saver struct {
pg *cache.PGCache
producer types.Producer
type Saver interface {
Handle(msg Message)
Commit() error
Close() error
}
func New(pg *cache.PGCache, _ *db.Config) *Saver {
return &Saver{pg: pg, producer: nil}
type saverImpl struct {
cfg *db.Config
pg *cache.PGCache
ch clickhouse.Connector
producer queue.Producer
}
func New(cfg *db.Config, pg *cache.PGCache) Saver {
s := &saverImpl{cfg: cfg, pg: pg}
s.init()
return s
}
func (s *saverImpl) Handle(msg Message) {
if msg.TypeID() == MsgCustomEvent {
defer s.Handle(types.WrapCustomEvent(msg.(*CustomEvent)))
}
if err := s.handleMessage(msg); err != nil {
if !postgres.IsPkeyViolation(err) {
log.Printf("Message Insertion Error %v, SessionID: %v, Message: %v", err, msg.SessionID(), msg)
}
return
}
if err := s.handleExtraMessage(msg); err != nil {
log.Printf("Stats Insertion Error %v; Session: %d, Message: %v", err, msg.SessionID(), msg)
}
return
}
func (s *saverImpl) handleMessage(msg Message) error {
switch m := msg.(type) {
case *Metadata:
return s.pg.InsertMetadata(m)
case *IssueEvent:
return s.pg.InsertIssueEvent(m)
case *SessionStart:
return s.pg.HandleWebSessionStart(m)
case *SessionEnd:
return s.pg.HandleWebSessionEnd(m)
case *UserID:
return s.pg.InsertWebUserID(m)
case *UserAnonymousID:
return s.pg.InsertWebUserAnonymousID(m)
case *CustomEvent:
return s.pg.InsertWebCustomEvent(m)
case *MouseClick:
return s.pg.InsertWebClickEvent(m)
case *InputEvent:
return s.pg.InsertWebInputEvent(m)
case *PageEvent:
return s.pg.InsertWebPageEvent(m)
case *NetworkRequest:
return s.pg.InsertWebNetworkRequest(m)
case *GraphQL:
return s.pg.InsertWebGraphQL(m)
case *JSException:
return s.pg.InsertWebJSException(m)
case *IntegrationEvent:
return s.pg.InsertWebIntegrationEvent(m)
case *InputChange:
return s.pg.InsertWebInputDuration(m)
case *MouseThrashing:
return s.pg.InsertMouseThrashing(m)
case *IOSSessionStart:
return s.pg.InsertIOSSessionStart(m)
case *IOSSessionEnd:
return s.pg.InsertIOSSessionEnd(m)
case *IOSUserID:
return s.pg.InsertIOSUserID(m)
case *IOSUserAnonymousID:
return s.pg.InsertIOSUserAnonymousID(m)
case *IOSCustomEvent:
return s.pg.InsertIOSCustomEvent(m)
case *IOSClickEvent:
return s.pg.InsertIOSClickEvent(m)
case *IOSInputEvent:
return s.pg.InsertIOSInputEvent(m)
case *IOSNetworkCall:
return s.pg.InsertIOSNetworkCall(m)
case *IOSScreenEnter:
return s.pg.InsertIOSScreenEnter(m)
case *IOSCrash:
return s.pg.InsertIOSCrash(m)
}
return nil
}
func (s *saverImpl) Commit() error {
if s.pg != nil {
s.pg.Commit()
}
if s.ch != nil {
s.ch.Commit()
}
return nil
}
func (s *saverImpl) Close() error {
if s.pg != nil {
if err := s.pg.Close(); err != nil {
log.Printf("pg.Close error: %s", err)
}
}
if s.ch != nil {
if err := s.ch.Stop(); err != nil {
log.Printf("ch.Close error: %s", err)
}
}
return nil
}


@ -1,29 +0,0 @@
package datasaver
import (
. "openreplay/backend/pkg/db/types"
. "openreplay/backend/pkg/messages"
)
func (si *Saver) InitStats() {
// noop
}
func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *PerformanceTrackAggr:
return si.pg.InsertWebStatsPerformance(session.SessionID, m)
case *ResourceEvent:
return si.pg.InsertWebStatsResourceEvent(session.SessionID, m)
}
return nil
}
func (si *Saver) CommitStats() error {
return nil
}
func (si *Saver) Close() error {
return nil
}


@ -0,0 +1,56 @@
package db
import (
"log"
"time"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/internal/service"
"openreplay/backend/pkg/queue/types"
)
type dbImpl struct {
cfg *db.Config
consumer types.Consumer
saver datasaver.Saver
}
func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver) service.Interface {
s := &dbImpl{
cfg: cfg,
consumer: consumer,
saver: saver,
}
go s.run()
return s
}
func (d *dbImpl) run() {
commitTick := time.Tick(d.cfg.CommitBatchTimeout)
for {
select {
case <-commitTick:
d.commit()
case msg := <-d.consumer.Rebalanced():
log.Println(msg)
default:
if err := d.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consumption: %v", err)
}
}
}
}
func (d *dbImpl) commit() {
d.saver.Commit()
d.consumer.Commit()
}
func (d *dbImpl) Stop() {
d.commit()
if err := d.saver.Close(); err != nil {
log.Printf("saver.Close error: %s", err)
}
d.consumer.Close()
}


@ -0,0 +1,87 @@
package heuristics
import (
"fmt"
"log"
"openreplay/backend/pkg/messages"
metrics "openreplay/backend/pkg/metrics/heuristics"
"time"
"openreplay/backend/internal/config/heuristics"
"openreplay/backend/internal/service"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
)
type heuristicsImpl struct {
cfg *heuristics.Config
producer types.Producer
consumer types.Consumer
events sessions.EventBuilder
}
func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder) service.Interface {
s := &heuristicsImpl{
cfg: cfg,
producer: p,
consumer: c,
events: e,
}
go s.run()
return s
}
func (h *heuristicsImpl) run() {
tick := time.Tick(10 * time.Second)
for {
select {
case evt := <-h.events.Events():
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
} else {
metrics.IncreaseTotalEvents(messageTypeName(evt))
}
case <-tick:
h.producer.Flush(h.cfg.ProducerTimeout)
h.consumer.Commit()
case msg := <-h.consumer.Rebalanced():
log.Println(msg)
default:
if err := h.consumer.ConsumeNext(); err != nil {
log.Fatalf("Error on consuming: %v", err)
}
}
}
}
func (h *heuristicsImpl) Stop() {
// Stop event builder and flush all events
log.Println("stopping heuristics service")
h.events.Stop()
for evt := range h.events.Events() {
if err := h.producer.Produce(h.cfg.TopicAnalytics, evt.SessionID(), evt.Encode()); err != nil {
log.Printf("can't send new event to queue: %s", err)
}
}
h.producer.Close(h.cfg.ProducerTimeout)
h.consumer.Commit()
h.consumer.Close()
}
func messageTypeName(msg messages.Message) string {
switch msg.TypeID() {
case 31:
return "PageEvent"
case 32:
return "InputEvent"
case 56:
return "PerformanceTrackAggr"
case 69:
return "MouseClick"
case 125:
m := msg.(*messages.IssueEvent)
return fmt.Sprintf("IssueEvent(%s)", m.Type)
default:
return "unknown"
}
}


@ -0,0 +1,5 @@
package service
type Interface interface {
Stop()
}

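pkg/terminator itself is not included in this diff; judging from the signal-handling loops it replaces in cmd/db and cmd/heuristics, Wait is presumably a small helper along these lines (an assumption, not the actual implementation):

package terminator

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"openreplay/backend/internal/service"
)

// Wait blocks until SIGINT/SIGTERM, then shuts the given service down.
func Wait(s service.Interface) {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigchan
	log.Printf("Caught signal %s: terminating", sig.String())
	s.Stop()
	os.Exit(0)
}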

@ -9,13 +9,13 @@ import (
)
// EndedSessionHandler handler for ended sessions
type EndedSessionHandler func(sessionID uint64, timestamp int64) bool
type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool
// session holds information about user's session live status
type session struct {
lastTimestamp int64
lastUpdate int64
lastUserTime int64
lastUserTime uint64
isEnded bool
}


@ -95,6 +95,7 @@ func (s *Storage) Upload(msg *messages.SessionEnd) (err error) {
if err != nil {
if strings.Contains(err.Error(), "big file") {
log.Printf("%s, sess: %d", err, msg.SessionID())
metrics.IncreaseStorageTotalSkippedSessions()
return nil
}
return err
@ -110,6 +111,7 @@ func (s *Storage) openSession(filePath string, tp FileType) ([]byte, error) {
// Check file size before download into memory
info, err := os.Stat(filePath)
if err == nil && info.Size() > s.cfg.MaxFileSize {
metrics.RecordSkippedSessionSize(float64(info.Size()), tp.String())
return nil, fmt.Errorf("big file, size: %d", info.Size())
}
// Read file into memory


@ -21,7 +21,8 @@ func (c *PGCache) HandleSessionEnd(sessionID uint64) error {
return nil
}
func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
func (c *PGCache) InsertIssueEvent(crash *IssueEvent) error {
sessionID := crash.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -29,7 +30,8 @@ func (c *PGCache) InsertIssueEvent(sessionID uint64, crash *IssueEvent) error {
return c.Conn.InsertIssueEvent(sessionID, session.ProjectID, crash)
}
func (c *PGCache) InsertMetadata(sessionID uint64, metadata *Metadata) error {
func (c *PGCache) InsertMetadata(metadata *Metadata) error {
sessionID := metadata.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err


@ -6,7 +6,8 @@ import (
. "openreplay/backend/pkg/messages"
)
func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) error {
func (c *PGCache) InsertIOSSessionStart(s *IOSSessionStart) error {
sessionID := s.SessionID()
if c.Cache.HasSession(sessionID) {
return fmt.Errorf("session %d already in cache", sessionID)
}
@ -33,13 +34,15 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er
return nil
}
func (c *PGCache) InsertIOSSessionEnd(sessionID uint64, e *IOSSessionEnd) error {
func (c *PGCache) InsertIOSSessionEnd(e *IOSSessionEnd) error {
sessionID := e.SessionID()
_, err := c.InsertSessionEnd(sessionID, e.Timestamp)
return err
}
func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenEnter) error {
if err := c.Conn.InsertIOSScreenEnter(sessionID, screenEnter); err != nil {
func (c *PGCache) InsertIOSScreenEnter(screenEnter *IOSScreenEnter) error {
sessionID := screenEnter.SessionID()
if err := c.Conn.InsertIOSScreenEnter(screenEnter); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -50,8 +53,9 @@ func (c *PGCache) InsertIOSScreenEnter(sessionID uint64, screenEnter *IOSScreenE
return nil
}
func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEvent) error {
if err := c.Conn.InsertIOSClickEvent(sessionID, clickEvent); err != nil {
func (c *PGCache) InsertIOSClickEvent(clickEvent *IOSClickEvent) error {
sessionID := clickEvent.SessionID()
if err := c.Conn.InsertIOSClickEvent(clickEvent); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -62,8 +66,9 @@ func (c *PGCache) InsertIOSClickEvent(sessionID uint64, clickEvent *IOSClickEven
return nil
}
func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEvent) error {
if err := c.Conn.InsertIOSInputEvent(sessionID, inputEvent); err != nil {
func (c *PGCache) InsertIOSInputEvent(inputEvent *IOSInputEvent) error {
sessionID := inputEvent.SessionID()
if err := c.Conn.InsertIOSInputEvent(inputEvent); err != nil {
return err
}
session, err := c.Cache.GetSession(sessionID)
@ -74,18 +79,15 @@ func (c *PGCache) InsertIOSInputEvent(sessionID uint64, inputEvent *IOSInputEven
return nil
}
func (c *PGCache) InsertIOSCrash(sessionID uint64, crash *IOSCrash) error {
func (c *PGCache) InsertIOSCrash(crash *IOSCrash) error {
sessionID := crash.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
}
if err := c.Conn.InsertIOSCrash(sessionID, session.ProjectID, crash); err != nil {
if err := c.Conn.InsertIOSCrash(session.ProjectID, crash); err != nil {
return err
}
session.ErrorsCount += 1
return nil
}
func (c *PGCache) InsertIOSIssueEvent(sessionID uint64, issueEvent *IOSIssueEvent) error {
return nil
}


@ -30,7 +30,8 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error
})
}
func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error {
func (c *PGCache) HandleWebSessionStart(s *SessionStart) error {
sessionID := s.SessionID()
if c.Cache.HasSession(sessionID) {
return fmt.Errorf("session %d already in cache", sessionID)
}
@ -69,7 +70,8 @@ func (c *PGCache) InsertWebSessionEnd(sessionID uint64, e *SessionEnd) error {
return err
}
func (c *PGCache) HandleWebSessionEnd(sessionID uint64, e *SessionEnd) error {
func (c *PGCache) HandleWebSessionEnd(e *SessionEnd) error {
sessionID := e.SessionID()
return c.HandleSessionEnd(sessionID)
}
@ -99,7 +101,8 @@ func (c *PGCache) InsertSessionReferrer(sessionID uint64, referrer string) error
return c.Conn.InsertSessionReferrer(sessionID, referrer)
}
func (c *PGCache) InsertWebNetworkRequest(sessionID uint64, e *NetworkRequest) error {
func (c *PGCache) InsertWebNetworkRequest(e *NetworkRequest) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -111,7 +114,8 @@ func (c *PGCache) InsertWebNetworkRequest(sessionID uint64, e *NetworkRequest) e
return c.Conn.InsertWebNetworkRequest(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebGraphQL(sessionID uint64, e *GraphQL) error {
func (c *PGCache) InsertWebGraphQL(e *GraphQL) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -123,7 +127,8 @@ func (c *PGCache) InsertWebGraphQL(sessionID uint64, e *GraphQL) error {
return c.Conn.InsertWebGraphQL(sessionID, session.ProjectID, project.SaveRequestPayloads, e)
}
func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
func (c *PGCache) InsertWebCustomEvent(e *CustomEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -131,7 +136,8 @@ func (c *PGCache) InsertWebCustomEvent(sessionID uint64, e *CustomEvent) error {
return c.Conn.InsertWebCustomEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
func (c *PGCache) InsertWebUserID(userID *UserID) error {
sessionID := userID.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -139,7 +145,8 @@ func (c *PGCache) InsertWebUserID(sessionID uint64, userID *UserID) error {
return c.Conn.InsertWebUserID(sessionID, session.ProjectID, userID)
}
func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *UserAnonymousID) error {
func (c *PGCache) InsertWebUserAnonymousID(userAnonymousID *UserAnonymousID) error {
sessionID := userAnonymousID.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -147,7 +154,8 @@ func (c *PGCache) InsertWebUserAnonymousID(sessionID uint64, userAnonymousID *Us
return c.Conn.InsertWebUserAnonymousID(sessionID, session.ProjectID, userAnonymousID)
}
func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
func (c *PGCache) InsertWebPageEvent(e *PageEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -155,7 +163,8 @@ func (c *PGCache) InsertWebPageEvent(sessionID uint64, e *PageEvent) error {
return c.Conn.InsertWebPageEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
func (c *PGCache) InsertWebClickEvent(e *MouseClick) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
@ -163,10 +172,29 @@ func (c *PGCache) InsertWebClickEvent(sessionID uint64, e *ClickEvent) error {
return c.Conn.InsertWebClickEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebInputEvent(sessionID uint64, e *InputEvent) error {
func (c *PGCache) InsertWebInputEvent(e *InputEvent) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebInputEvent(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertWebInputDuration(e *InputChange) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertWebInputDuration(sessionID, session.ProjectID, e)
}
func (c *PGCache) InsertMouseThrashing(e *MouseThrashing) error {
sessionID := e.SessionID()
session, err := c.Cache.GetSession(sessionID)
if err != nil {
return err
}
return c.Conn.InsertMouseThrashing(sessionID, session.ProjectID, e)
}


@ -0,0 +1,24 @@
package clickhouse
import (
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/messages"
)
type Connector interface {
Prepare() error
Commit() error
Stop() error
InsertWebSession(session *types.Session) error
InsertWebResourceEvent(session *types.Session, msg *messages.ResourceTiming) error
InsertWebPageEvent(session *types.Session, msg *messages.PageEvent) error
InsertWebClickEvent(session *types.Session, msg *messages.MouseClick) error
InsertWebInputEvent(session *types.Session, msg *messages.InputEvent) error
InsertWebErrorEvent(session *types.Session, msg *types.ErrorEvent) error
InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error
InsertAutocomplete(session *types.Session, msgType, msgValue string) error
InsertRequest(session *types.Session, msg *messages.NetworkRequest, savePayload bool) error
InsertCustom(session *types.Session, msg *messages.CustomEvent) error
InsertGraphQL(session *types.Session, msg *messages.GraphQL) error
InsertIssue(session *types.Session, msg *messages.IssueEvent) error
}


@ -193,9 +193,7 @@ func (conn *BatchSet) worker() {
for {
select {
case t := <-conn.workerTask:
start := time.Now()
conn.sendBatches(t)
log.Printf("pg batches dur: %d", time.Now().Sub(start).Milliseconds())
case <-conn.done:
if len(conn.workerTask) > 0 {
for t := range conn.workerTask {


@ -2,7 +2,6 @@ package postgres
import (
"log"
"time"
)
type bulksTask struct {
@ -10,7 +9,7 @@ type bulksTask struct {
}
func NewBulksTask() *bulksTask {
return &bulksTask{bulks: make([]Bulk, 0, 14)}
return &bulksTask{bulks: make([]Bulk, 0, 15)}
}
type BulkSet struct {
@ -20,6 +19,7 @@ type BulkSet struct {
customEvents Bulk
webPageEvents Bulk
webInputEvents Bulk
webInputDurations Bulk
webGraphQL Bulk
webErrors Bulk
webErrorEvents Bulk
@ -58,6 +58,8 @@ func (conn *BulkSet) Get(name string) Bulk {
return conn.webPageEvents
case "webInputEvents":
return conn.webInputEvents
case "webInputDurations":
return conn.webInputDurations
case "webGraphQL":
return conn.webGraphQL
case "webErrors":
@ -127,6 +129,14 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webInputDurations, err = NewBulk(conn.c,
"events.inputs",
"(session_id, message_id, timestamp, value, label, hesitation, duration)",
"($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''), $%d, $%d)",
7, 200)
if err != nil {
log.Fatalf("can't create webPageEvents bulk: %s", err)
}
conn.webGraphQL, err = NewBulk(conn.c,
"events.graphql",
"(session_id, timestamp, message_id, name, request_body, response_body)",
@ -185,9 +195,9 @@ func (conn *BulkSet) initBulks() {
}
conn.webClickEvents, err = NewBulk(conn.c,
"events.clicks",
"(session_id, message_id, timestamp, label, selector, url, path)",
"($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000))",
7, 200)
"(session_id, message_id, timestamp, label, selector, url, path, hesitation)",
"($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000), $%d)",
8, 200)
if err != nil {
log.Fatalf("can't create webClickEvents bulk: %s", err)
}
@ -210,6 +220,7 @@ func (conn *BulkSet) Send() {
newTask.bulks = append(newTask.bulks, conn.customEvents)
newTask.bulks = append(newTask.bulks, conn.webPageEvents)
newTask.bulks = append(newTask.bulks, conn.webInputEvents)
newTask.bulks = append(newTask.bulks, conn.webInputDurations)
newTask.bulks = append(newTask.bulks, conn.webGraphQL)
newTask.bulks = append(newTask.bulks, conn.webErrors)
newTask.bulks = append(newTask.bulks, conn.webErrorEvents)
@ -243,9 +254,7 @@ func (conn *BulkSet) worker() {
for {
select {
case t := <-conn.workerTask:
start := time.Now()
conn.sendBulks(t)
log.Printf("pg bulks dur: %d", time.Now().Sub(start).Milliseconds())
case <-conn.done:
if len(conn.workerTask) > 0 {
for t := range conn.workerTask {

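A note on the "$%d" row templates used by NewBulk above (e.g. the new webInputDurations bulk with 7 arguments per row and a 200-row capacity): the placeholders are presumably numbered sequentially across buffered rows when the multi-row INSERT is built. NewBulk's implementation is not part of this diff, so the expansion below is only a sketch of that numbering scheme:

package main

import (
	"fmt"
	"strings"
)

// expandRows numbers the "$%d" placeholders of a row template sequentially
// across rows, the way a multi-row INSERT's VALUES clause would be assembled.
func expandRows(template string, argsPerRow, rows int) string {
	parts := make([]string, 0, rows)
	idx := 1
	for r := 0; r < rows; r++ {
		nums := make([]interface{}, argsPerRow)
		for i := range nums {
			nums[i] = idx
			idx++
		}
		parts = append(parts, fmt.Sprintf(template, nums...))
	}
	return strings.Join(parts, ", ")
}

func main() {
	tpl := "($%d, $%d, $%d, LEFT($%d, 2000), NULLIF(LEFT($%d, 2000),''), $%d, $%d)"
	// Two rows of the events.inputs bulk: "($1, ..., $7), ($8, ..., $14)".
	fmt.Println(expandRows(tpl, 7, 2))
}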

@ -17,7 +17,7 @@ type Conn struct {
c Pool
batches *BatchSet
bulks *BulkSet
chConn CH
chConn CH // hack for autocomplete inserts, TODO: rewrite
}
func (conn *Conn) SetClickHouse(ch CH) {


@ -6,7 +6,8 @@ import (
"openreplay/backend/pkg/url"
)
func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEvent) error {
func (conn *Conn) InsertIOSCustomEvent(e *messages.IOSCustomEvent) error {
sessionID := e.SessionID()
err := conn.InsertCustomEvent(sessionID, e.Timestamp, truncSqIdx(e.Index), e.Name, e.Payload)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name)
@ -14,7 +15,8 @@ func (conn *Conn) InsertIOSCustomEvent(sessionID uint64, e *messages.IOSCustomEv
return err
}
func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID) error {
func (conn *Conn) InsertIOSUserID(userID *messages.IOSUserID) error {
sessionID := userID.SessionID()
err := conn.InsertUserID(sessionID, userID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "USERID_IOS", userID.Value)
@ -22,7 +24,8 @@ func (conn *Conn) InsertIOSUserID(sessionID uint64, userID *messages.IOSUserID)
return err
}
func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *messages.IOSUserAnonymousID) error {
func (conn *Conn) InsertIOSUserAnonymousID(userAnonymousID *messages.IOSUserAnonymousID) error {
sessionID := userAnonymousID.SessionID()
err := conn.InsertUserAnonymousID(sessionID, userAnonymousID.Value)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "USERANONYMOUSID_IOS", userAnonymousID.Value)
@ -30,7 +33,8 @@ func (conn *Conn) InsertIOSUserAnonymousID(sessionID uint64, userAnonymousID *me
return err
}
func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkCall) error {
func (conn *Conn) InsertIOSNetworkCall(e *messages.IOSNetworkCall) error {
sessionID := e.SessionID()
err := conn.InsertRequest(sessionID, e.Timestamp, truncSqIdx(e.Index), e.URL, e.Duration, e.Success)
if err == nil {
conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL))
@ -38,7 +42,8 @@ func (conn *Conn) InsertIOSNetworkCall(sessionID uint64, e *messages.IOSNetworkC
return err
}
func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.IOSScreenEnter) error {
func (conn *Conn) InsertIOSScreenEnter(screenEnter *messages.IOSScreenEnter) error {
sessionID := screenEnter.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -69,7 +74,8 @@ func (conn *Conn) InsertIOSScreenEnter(sessionID uint64, screenEnter *messages.I
return nil
}
func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOSClickEvent) error {
func (conn *Conn) InsertIOSClickEvent(clickEvent *messages.IOSClickEvent) error {
sessionID := clickEvent.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -100,7 +106,8 @@ func (conn *Conn) InsertIOSClickEvent(sessionID uint64, clickEvent *messages.IOS
return nil
}
func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOSInputEvent) error {
func (conn *Conn) InsertIOSInputEvent(inputEvent *messages.IOSInputEvent) error {
sessionID := inputEvent.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err
@ -137,7 +144,8 @@ func (conn *Conn) InsertIOSInputEvent(sessionID uint64, inputEvent *messages.IOS
return nil
}
func (conn *Conn) InsertIOSCrash(sessionID uint64, projectID uint32, crash *messages.IOSCrash) error {
func (conn *Conn) InsertIOSCrash(projectID uint32, crash *messages.IOSCrash) error {
sessionID := crash.SessionID()
tx, err := conn.c.Begin()
if err != nil {
return err


@ -5,7 +5,8 @@ import (
"openreplay/backend/pkg/url"
)
func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrackAggr) error {
func (conn *Conn) InsertWebStatsPerformance(p *PerformanceTrackAggr) error {
sessionID := p.SessionID()
timestamp := (p.TimestampEnd + p.TimestampStart) / 2
sqlRequest := `
@ -35,40 +36,37 @@ func (conn *Conn) InsertWebStatsPerformance(sessionID uint64, p *PerformanceTrac
return nil
}
func (conn *Conn) InsertWebStatsResourceEvent(sessionID uint64, e *ResourceEvent) error {
func (conn *Conn) InsertWebStatsResourceEvent(e *ResourceTiming) error {
sessionID := e.SessionID()
host, _, _, err := url.GetURLParts(e.URL)
if err != nil {
return err
}
msgType := url.GetResourceType(e.Initiator, e.URL)
sqlRequest := `
INSERT INTO events.resources (
session_id, timestamp, message_id,
type,
url, url_host, url_hostpath,
success, status,
method,
duration, ttfb, header_size, encoded_body_size, decoded_body_size
) VALUES (
$1, $2, $3,
$4,
LEFT($5, 8000), LEFT($6, 300), LEFT($7, 2000),
$8, $9,
NULLIF($10, '')::events.resource_method,
NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0), NULLIF($15, 0)
NULLIF($10, 0), NULLIF($11, 0), NULLIF($12, 0), NULLIF($13, 0), NULLIF($14, 0)
)`
urlQuery := url.DiscardURLQuery(e.URL)
urlMethod := url.EnsureMethod(e.Method)
conn.batchQueue(sessionID, sqlRequest,
sessionID, e.Timestamp, truncSqIdx(e.MessageID),
e.Type,
sessionID, e.Timestamp, truncSqIdx(e.MsgID()),
msgType,
e.URL, host, urlQuery,
e.Success, e.Status,
urlMethod,
e.Duration != 0, 0,
e.Duration, e.TTFB, e.HeaderSize, e.EncodedBodySize, e.DecodedBodySize,
)
// Record approximate message size
conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.Type)+len(e.URL)+len(host)+len(urlQuery)+len(urlMethod)+8*9+1)
conn.updateBatchSize(sessionID, len(sqlRequest)+len(msgType)+len(e.URL)+len(host)+len(urlQuery)+8*9+1)
return nil
}

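These writes go through a per-session batch, and updateBatchSize keeps only an approximate byte count (the 8*9+1 term stands in for the fixed-width numeric arguments). A toy version of that bookkeeping, assuming a size-triggered flush; the real flush limit and trigger live elsewhere in the package:

package main

import "fmt"

// batch is a toy per-session batch that flushes once the approximate
// accumulated payload size crosses a limit, the same bookkeeping idea
// behind batchQueue/updateBatchSize (sizes are estimates, not exact bytes).
type batch struct {
	stmts []string
	size  int
}

const flushLimit = 1 << 10 // assumption; the real limit is configured elsewhere

func (b *batch) add(sql string, approxArgsSize int) bool {
	b.stmts = append(b.stmts, sql)
	b.size += len(sql) + approxArgsSize
	if b.size >= flushLimit {
		fmt.Printf("flushing %d statements (~%d bytes)\n", len(b.stmts), b.size)
		b.stmts, b.size = nil, 0
		return true
	}
	return false
}

func main() {
	b := &batch{}
	for i := 0; i < 40; i++ {
		// 8*9+1 mirrors the fixed-size numeric arguments counted in the diff.
		b.add("INSERT INTO events.resources (...) VALUES (...)", 8*9+1)
	}
}
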
View file

@@ -2,8 +2,8 @@ package postgres
import (
"log"
"openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/hashid"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/url"
)
@@ -57,10 +57,13 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page
return nil
}
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error {
func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *MouseClick) error {
if e.Label == "" {
return nil
}
var host, path string
host, path, _, _ = url.GetURLParts(e.Url)
if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil {
if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path, e.HesitationTime); err != nil {
log.Printf("insert web click err: %s", err)
}
// Accumulate session updates and execute them inside the batch with the other SQL commands
@@ -86,6 +89,22 @@ func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *Inp
return nil
}
func (conn *Conn) InsertWebInputDuration(sessionID uint64, projectID uint32, e *InputChange) error {
if e.Label == "" {
return nil
}
value := &e.Value
if e.ValueMasked {
value = nil
}
if err := conn.bulks.Get("webInputDurations").Append(sessionID, truncSqIdx(e.ID), e.Timestamp, value, e.Label, e.HesitationTime, e.InputDuration); err != nil {
log.Printf("insert web input event err: %s", err)
}
conn.updateSessionEvents(sessionID, 1, 0)
conn.insertAutocompleteValue(sessionID, projectID, "INPUT", e.Label)
return nil
}
func (conn *Conn) InsertWebErrorEvent(sessionID uint64, projectID uint32, e *types.ErrorEvent) error {
errorID := e.ID(projectID)
if err := conn.bulks.Get("webErrors").Append(errorID, projectID, e.Source, e.Name, e.Message, e.Payload); err != nil {
@@ -142,3 +161,15 @@ func (conn *Conn) InsertSessionReferrer(sessionID uint64, referrer string) error
WHERE session_id = $3 AND referrer IS NULL`,
referrer, url.DiscardURLQuery(referrer), sessionID)
}
func (conn *Conn) InsertMouseThrashing(sessionID uint64, projectID uint32, e *MouseThrashing) error {
issueID := hashid.MouseThrashingID(projectID, sessionID, e.Timestamp)
if err := conn.bulks.Get("webIssues").Append(projectID, issueID, "mouse_thrashing", e.Url); err != nil {
log.Printf("insert web issue err: %s", err)
}
if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MsgID()), nil); err != nil {
log.Printf("insert web issue event err: %s", err)
}
conn.updateSessionIssues(sessionID, 0, 50)
return nil
}

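InsertMouseThrashing adds a new issue type: one webIssues row keyed by a deterministic ID, so reprocessing a session does not duplicate the issue, one webIssueEvents row per occurrence, and a +50 bump to the session issue score. hashid.MouseThrashingID is not shown in this diff; the sketch below is an assumption modeled on the projectID-prefixed hash that ErrorEvent.ID uses in the next file:

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"strconv"
)

// mouseThrashingID is an assumed sketch of hashid.MouseThrashingID: any
// stable hash of (projectID, sessionID, timestamp) maps the same occurrence
// to the same issue row on re-insert, keeping the bulk writes idempotent.
func mouseThrashingID(projectID uint32, sessionID uint64, ts uint64) string {
	h := sha1.New()
	h.Write([]byte("mouse_thrashing"))
	h.Write([]byte(strconv.FormatUint(sessionID, 10)))
	h.Write([]byte(strconv.FormatUint(ts, 10)))
	return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(h.Sum(nil))
}

func main() {
	fmt.Println(mouseThrashingID(1, 42, 1680000000000))
}
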
View file

@@ -120,3 +120,15 @@ func (e *ErrorEvent) ID(projectID uint32) string {
}
return strconv.FormatUint(uint64(projectID), 16) + hex.EncodeToString(hash.Sum(nil))
}
func WrapCustomEvent(m *CustomEvent) *IssueEvent {
msg := &IssueEvent{
Type: "custom",
Timestamp: m.Time(),
MessageID: m.MsgID(),
ContextString: m.Name,
Payload: m.Payload,
}
msg.Meta().SetMeta(m.Meta())
return msg
}

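WrapCustomEvent takes over the custom-issue branch of the EventMapper deleted below: callers wrap a CustomEvent into a "custom" IssueEvent on demand, and SetMeta carries the session context across. A condensed stand-alone version, with the field set reduced to what the diff shows:

package main

import "fmt"

// Simplified stand-ins for messages.CustomEvent / IssueEvent, enough to
// show what WrapCustomEvent preserves.
type meta struct{ sessionID uint64 }

type CustomEvent struct {
	meta
	Name, Payload string
	Timestamp     uint64
}

type IssueEvent struct {
	meta
	Type, ContextString, Payload string
	Timestamp, MessageID         uint64
}

// wrapCustomEvent mirrors types.WrapCustomEvent: the CustomEvent becomes a
// "custom" issue and the session meta is copied across (SetMeta in the real code).
func wrapCustomEvent(m *CustomEvent, msgID uint64) *IssueEvent {
	return &IssueEvent{
		meta:          m.meta,
		Type:          "custom",
		Timestamp:     m.Timestamp,
		MessageID:     msgID,
		ContextString: m.Name,
		Payload:       m.Payload,
	}
}

func main() {
	c := &CustomEvent{meta: meta{sessionID: 42}, Name: "checkout-failed", Payload: `{"order":7}`, Timestamp: 1680000000000}
	fmt.Printf("%+v\n", wrapCustomEvent(c, 1))
}
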
View file

@@ -1,82 +0,0 @@
package custom
import (
"net/url"
"strings"
. "openreplay/backend/pkg/messages"
)
func getURLExtention(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
i := strings.LastIndex(u.Path, ".")
return u.Path[i+1:]
}
func getResourceType(initiator string, URL string) string {
switch initiator {
case "xmlhttprequest", "fetch":
return "fetch"
case "img":
return "img"
default:
switch getURLExtention(URL) {
case "css":
return "stylesheet"
case "js":
return "script"
case "png", "gif", "jpg", "jpeg", "svg":
return "img"
case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
return "media"
default:
return "other"
}
}
}
type EventMapper struct{}
func (b *EventMapper) Build() Message {
return nil
}
func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
if msg.Label != "" {
return &ClickEvent{
MessageID: messageID,
Label: msg.Label,
HesitationTime: msg.HesitationTime,
Timestamp: timestamp,
Selector: msg.Selector,
}
}
case *ResourceTiming:
return &ResourceEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Duration: msg.Duration,
TTFB: msg.TTFB,
HeaderSize: msg.HeaderSize,
EncodedBodySize: msg.EncodedBodySize,
DecodedBodySize: msg.DecodedBodySize,
URL: msg.URL,
Type: getResourceType(msg.Initiator, msg.URL),
Success: msg.Duration != 0,
}
case *CustomIssue:
return &IssueEvent{
Type: "custom",
Timestamp: timestamp,
MessageID: messageID,
ContextString: msg.Name,
Payload: msg.Payload,
}
}
return nil
}

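The three branches of this deleted mapper survive elsewhere: MouseClick is consumed directly by InsertWebClickEvent, ResourceTiming by InsertWebStatsResourceEvent (with the type inference presumably moved behind url.GetResourceType), and CustomIssue by WrapCustomEvent above. For reference, a runnable copy of the removed inference logic, handy as a regression check against its replacement:

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// resourceType reproduces the removed getResourceType/getURLExtention logic:
// the initiator wins when it is unambiguous, otherwise the URL extension decides.
func resourceType(initiator, rawURL string) string {
	switch initiator {
	case "xmlhttprequest", "fetch":
		return "fetch"
	case "img":
		return "img"
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return "other"
	}
	// LastIndex returns -1 when there is no dot, so ext falls back to the whole path.
	ext := u.Path[strings.LastIndex(u.Path, ".")+1:]
	switch ext {
	case "css":
		return "stylesheet"
	case "js":
		return "script"
	case "png", "gif", "jpg", "jpeg", "svg":
		return "img"
	case "mp4", "mkv", "ogg", "webm", "avi", "mp3":
		return "media"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(resourceType("fetch", "https://api.example.com/users"))  // fetch
	fmt.Println(resourceType("link", "https://cdn.example.com/app.css")) // stylesheet
	fmt.Println(resourceType("video", "https://cdn.example.com/a.mp4"))  // media
}
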
View file

@@ -4,7 +4,7 @@ import (
. "openreplay/backend/pkg/messages"
)
const INPUT_EVENT_TIMEOUT = 1 * 60 * 1000
const InputEventTimeout = 1 * 60 * 1000
type inputLabels map[uint64]string
@@ -24,7 +24,7 @@ func (b *inputEventBuilder) clearLabels() {
b.inputLabels = make(inputLabels)
}
func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (b *inputEventBuilder) Handle(message Message, timestamp uint64) Message {
var inputEvent Message = nil
switch msg := message.(type) {
case *SetInputTarget:
@@ -41,7 +41,7 @@ func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp
}
if b.inputEvent == nil {
b.inputEvent = &InputEvent{
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
Value: msg.Value,
ValueMasked: msg.Mask > 0,
@@ -59,7 +59,7 @@ func (b *inputEventBuilder) Handle(message Message, messageID uint64, timestamp
return b.Build()
}
if b.inputEvent != nil && b.inputEvent.Timestamp+INPUT_EVENT_TIMEOUT < timestamp {
if b.inputEvent != nil && b.inputEvent.Timestamp+InputEventTimeout < timestamp {
return b.Build()
}
return nil

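The rename from INPUT_EVENT_TIMEOUT to InputEventTimeout is cosmetic; the flush rule is unchanged: a pending InputEvent is emitted when the input target changes or after a minute of silence. A condensed sketch of the timeout check:

package main

import "fmt"

const inputEventTimeout = 1 * 60 * 1000 // ms, as in the handler

type inputEvent struct {
	Timestamp uint64
	Value     string
}

// flushDue reports whether a pending event is stale enough to be emitted,
// mirroring `b.inputEvent.Timestamp+InputEventTimeout < timestamp`.
func flushDue(pending *inputEvent, now uint64) bool {
	return pending != nil && pending.Timestamp+inputEventTimeout < now
}

func main() {
	ev := &inputEvent{Timestamp: 1_000_000, Value: "jane@example.com"}
	fmt.Println(flushDue(ev, 1_000_000+30_000)) // false: still within the window
	fmt.Println(flushDue(ev, 1_000_000+61_000)) // true: a minute of silence, emit
}
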
View file

@@ -4,7 +4,7 @@ import (
. "openreplay/backend/pkg/messages"
)
const PAGE_EVENT_TIMEOUT = 1 * 60 * 1000
const PageEventTimeout = 1 * 60 * 1000
type pageEventBuilder struct {
pageEvent *PageEvent
@@ -16,7 +16,7 @@ func NewPageEventBuilder() *pageEventBuilder {
return ieBuilder
}
func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (b *pageEventBuilder) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *SetPageLocation:
if msg.NavigationStart == 0 { // routing without new page loading
@@ -24,7 +24,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: false,
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
}
} else {
@@ -33,7 +33,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
URL: msg.URL,
Referrer: msg.Referrer,
Loaded: true,
MessageID: messageID,
MessageID: message.MsgID(),
Timestamp: timestamp,
}
return pageEvent
@@ -81,7 +81,7 @@ func (b *pageEventBuilder) Handle(message Message, messageID uint64, timestamp u
}
if b.pageEvent != nil && b.pageEvent.Timestamp+PAGE_EVENT_TIMEOUT < timestamp {
if b.pageEvent != nil && b.pageEvent.Timestamp+PageEventTimeout < timestamp {
return b.Build()
}
return nil

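The builder tells SPA route changes apart from full page loads by NavigationStart: zero means client-side routing, so a PageEvent with Loaded=false is emitted immediately; non-zero means a real load, so the event waits for the timing messages. The branch in miniature:

package main

import "fmt"

type setPageLocation struct {
	URL             string
	NavigationStart uint64
}

// classify mirrors the SetPageLocation branch of the page event builder:
// NavigationStart == 0 means routing without a new page load.
func classify(msg setPageLocation) string {
	if msg.NavigationStart == 0 {
		return "soft navigation: emit immediately with Loaded=false"
	}
	return "full load: buffer and wait for timing messages, Loaded=true"
}

func main() {
	fmt.Println(classify(setPageLocation{URL: "/inbox", NavigationStart: 0}))
	fmt.Println(classify(setPageLocation{URL: "/login", NavigationStart: 1680000000000}))
}
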
View file

@@ -48,7 +48,7 @@ func (h *ClickRageDetector) Handle(message Message, messageID uint64, timestamp
}
func (h *ClickRageDetector) Build() Message {
if h.countsInARow >= web.MIN_CLICKS_IN_A_ROW {
if h.countsInARow >= web.MinClicksInARow {
event := &IOSIssueEvent{
Type: "click_rage",
ContextString: h.lastLabel,

View file

@@ -6,6 +6,6 @@ import . "openreplay/backend/pkg/messages"
// You can create your own message handler and easily connect it to the heuristics service
type MessageProcessor interface {
Handle(message Message, messageID uint64, timestamp uint64) Message
Handle(message Message, timestamp uint64) Message
Build() Message
}

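Since messages now expose MsgID(), custom processors receive only the message and its timestamp. A minimal processor satisfying the new interface shape, with simplified stand-ins for the real Message type:

package main

import "fmt"

// Simplified stand-ins for the heuristics types.
type Message interface{ MsgID() uint64 }

type MessageProcessor interface {
	Handle(message Message, timestamp uint64) Message
	Build() Message
}

type mouseClick struct{ id uint64 }

func (m *mouseClick) MsgID() uint64 { return m.id }

// clickCounter is a toy processor: it counts clicks and emits nothing.
type clickCounter struct{ clicks int }

func (c *clickCounter) Handle(msg Message, ts uint64) Message {
	if _, ok := msg.(*mouseClick); ok {
		c.clicks++
	}
	return nil // a real detector returns an event here when one is ready
}

func (c *clickCounter) Build() Message { return nil }

func main() {
	var p MessageProcessor = &clickCounter{}
	p.Handle(&mouseClick{id: 1}, 1000)
	fmt.Println("processed")
}
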
View file

@@ -7,14 +7,8 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: ClickRage
Input event: MouseClick
Output event: IssueEvent
*/
const MAX_TIME_DIFF = 300
const MIN_CLICKS_IN_A_ROW = 3
const MaxTimeDiff = 300
const MinClicksInARow = 3
type ClickRageDetector struct {
lastTimestamp uint64
@@ -34,46 +28,54 @@ func (crd *ClickRageDetector) reset() {
crd.url = ""
}
func (crd *ClickRageDetector) Build() Message {
defer crd.reset()
if crd.countsInARow >= MIN_CLICKS_IN_A_ROW {
payload, err := json.Marshal(struct{ Count int }{crd.countsInARow})
if err != nil {
log.Printf("can't marshal ClickRage payload to json: %s", err)
}
event := &IssueEvent{
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: string(payload),
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
URL: crd.url,
}
return event
func (crd *ClickRageDetector) createPayload() string {
p, err := json.Marshal(struct{ Count int }{crd.countsInARow})
if err != nil {
log.Printf("can't marshal ClickRage payload to json: %s", err)
return ""
}
return nil
return string(p)
}
func (crd *ClickRageDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (crd *ClickRageDetector) Build() Message {
defer crd.reset()
if crd.countsInARow < MinClicksInARow {
return nil
}
return &IssueEvent{
Type: "click_rage",
ContextString: crd.lastLabel,
Payload: crd.createPayload(),
Timestamp: crd.firstInARawTimestamp,
MessageID: crd.firstInARawMessageId,
URL: crd.url,
}
}
func (crd *ClickRageDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *MouseClick:
// Set click url
if crd.url == "" && msg.Url != "" {
crd.url = msg.Url
}
// TODO: check whether it is ok to capture a clickRage event without the connected ClickEvent in the DB.
// Click on different object -> build if we can and reset the builder
if msg.Label == "" {
return crd.Build()
}
if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MAX_TIME_DIFF {
// Same label within the time window: extend the current click run
if crd.lastLabel == msg.Label && timestamp-crd.lastTimestamp < MaxTimeDiff {
crd.lastTimestamp = timestamp
crd.countsInARow += 1
return nil
}
// Try to build event
event := crd.Build()
// Use current message as init values for new event
crd.lastTimestamp = timestamp
crd.lastLabel = msg.Label
crd.firstInARawTimestamp = timestamp
crd.firstInARawMessageId = messageID
crd.firstInARawMessageId = message.MsgID()
crd.countsInARow = 1
if crd.url == "" && msg.Url != "" {
crd.url = msg.Url

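The detector's rule, restated: clicks on the same non-empty label, each within MaxTimeDiff (300ms) of the previous one, accumulate into a run; when the run breaks, a run of MinClicksInARow (3) or more becomes a single click_rage IssueEvent anchored at the first click. A self-contained walkthrough with hypothetical timestamps:

package main

import "fmt"

const maxTimeDiff = 300 // ms, as in MaxTimeDiff
const minClicksInARow = 3

func main() {
	// Clicks on the same "Buy" button; gaps of ~120ms keep the run alive,
	// the 800ms gap breaks it.
	clickTimes := []uint64{1000, 1120, 1240, 1360, 2160}
	last, count := clickTimes[0], 1
	for _, t := range clickTimes[1:] {
		if t-last < maxTimeDiff {
			count++
		} else {
			if count >= minClicksInARow {
				fmt.Printf("click_rage: %d clicks in a row\n", count)
			}
			count = 1
		}
		last = t
	}
	if count >= minClicksInARow {
		fmt.Printf("click_rage: %d clicks in a row\n", count)
	}
}
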
View file

@@ -15,8 +15,8 @@ import (
Output event: IssueEvent
*/
const CPU_THRESHOLD = 70 // % out of 100
const CPU_MIN_DURATION_TRIGGER = 6 * 1000
const CpuThreshold = 70 // % out of 100
const CpuMinDurationTrigger = 6 * 1000
type CpuIssueDetector struct {
startTimestamp uint64
@@ -26,65 +26,61 @@ type CpuIssueDetector struct {
contextString string
}
func (f *CpuIssueDetector) Build() Message {
if f.startTimestamp == 0 {
return nil
}
duration := f.lastTimestamp - f.startTimestamp
timestamp := f.startTimestamp
messageID := f.startMessageID
maxRate := f.maxRate
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
if duration < CPU_MIN_DURATION_TRIGGER {
return nil
}
payload, err := json.Marshal(struct {
func (f *CpuIssueDetector) createPayload() string {
p, err := json.Marshal(struct {
Duration uint64
Rate uint64
}{duration, maxRate})
}{f.duration(), f.maxRate})
if err != nil {
log.Printf("can't marshal CpuIssue payload to json: %s", err)
}
return string(p)
}
func (f *CpuIssueDetector) duration() uint64 {
return f.lastTimestamp - f.startTimestamp
}
func (f *CpuIssueDetector) reset() {
f.startTimestamp = 0
f.startMessageID = 0
f.maxRate = 0
}
func (f *CpuIssueDetector) Build() Message {
defer f.reset()
if f.startTimestamp == 0 || f.duration() < CpuMinDurationTrigger {
return nil
}
return &IssueEvent{
Type: "cpu",
Timestamp: timestamp,
MessageID: messageID,
Timestamp: f.startTimestamp,
MessageID: f.startMessageID,
ContextString: f.contextString,
Payload: string(payload),
Payload: f.createPayload(),
}
}
func (f *CpuIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (f *CpuIssueDetector) Handle(message Message, timestamp uint64) Message {
switch msg := message.(type) {
case *PerformanceTrack:
dt := performance.TimeDiff(timestamp, f.lastTimestamp)
if dt == 0 {
return nil // TODO: handle error
// Ignore out-of-order messages
if timestamp < f.lastTimestamp {
return nil
}
f.lastTimestamp = timestamp
if msg.Frames == -1 || msg.Ticks == -1 {
cpuRate := performance.CPURate(msg.Ticks, performance.TimeDiff(timestamp, f.lastTimestamp))
// Build the event once the CPU issue has gone
if msg.Frames == -1 || msg.Ticks == -1 || cpuRate < CpuThreshold {
return f.Build()
}
cpuRate := performance.CPURate(msg.Ticks, dt)
if cpuRate >= CPU_THRESHOLD {
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = messageID
}
if f.maxRate < cpuRate {
f.maxRate = cpuRate
}
} else {
return f.Build()
// Update values
if f.startTimestamp == 0 {
f.startTimestamp = timestamp
f.startMessageID = message.MsgID()
}
if f.maxRate < cpuRate {
f.maxRate = cpuRate
}
case *SetPageLocation:
f.contextString = msg.URL

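Restructured but behaviorally close to the original: an episode opens at the first PerformanceTrack whose CPU rate reaches CpuThreshold (70%), the peak rate is tracked, and the episode closes when the rate falls back or frames/ticks arrive as -1; only episodes lasting at least CpuMinDurationTrigger (6s) produce an IssueEvent. A compressed state machine under those assumptions, with performance.CPURate left abstract:

package main

import "fmt"

const cpuThreshold = 70     // percent, mirrors CpuThreshold
const cpuMinDuration = 6000 // ms, mirrors CpuMinDurationTrigger

type detector struct {
	start, last, maxRate uint64
}

func (d *detector) handle(rate, ts uint64) bool {
	if ts < d.last {
		return false // wrong message order, ignore
	}
	d.last = ts
	if rate < cpuThreshold {
		// Episode over: report only if it lasted long enough.
		issue := d.start != 0 && d.last-d.start >= cpuMinDuration
		d.start, d.maxRate = 0, 0
		return issue
	}
	if d.start == 0 {
		d.start = ts
	}
	if rate > d.maxRate {
		d.maxRate = rate
	}
	return false
}

func main() {
	d := &detector{}
	samples := []struct{ rate, ts uint64 }{
		{85, 1000}, {92, 3000}, {88, 6000}, {75, 7500}, {20, 8500},
	}
	for _, s := range samples {
		if d.handle(s.rate, s.ts) {
			fmt.Println("cpu issue: >=70% sustained for >=6s")
		}
	}
}
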
View file

@@ -4,43 +4,39 @@ import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DeadClick
Input events: SetInputTarget,
CreateDocument,
MouseClick,
SetNodeAttribute,
RemoveNodeAttribute,
CreateElementNode,
CreateTextNode,
MoveNode,
RemoveNode,
SetCSSData,
CSSInsertRule,
CSSDeleteRule
Output event: IssueEvent
*/
const CLICK_RELATION_TIME = 1234
const ClickRelationTime = 1234
type DeadClickDetector struct {
lastTimestamp uint64
lastMouseClick *MouseClick
lastTimestamp uint64
lastClickTimestamp uint64
lastMessageID uint64
inputIDSet map[uint64]bool
}
func NewDeadClickDetector() *DeadClickDetector {
return &DeadClickDetector{inputIDSet: make(map[uint64]bool)}
}
func (d *DeadClickDetector) addInputID(id uint64) {
d.inputIDSet[id] = true
}
func (d *DeadClickDetector) clearInputIDs() {
d.inputIDSet = make(map[uint64]bool)
}
func (d *DeadClickDetector) reset() {
d.inputIDSet = nil
d.lastMouseClick = nil
d.lastClickTimestamp = 0
d.lastMessageID = 0
d.clearInputIDs()
}
func (d *DeadClickDetector) build(timestamp uint64) Message {
func (d *DeadClickDetector) Build() Message {
// remove reset from external Build call
defer d.reset()
if d.lastMouseClick == nil || d.lastClickTimestamp+CLICK_RELATION_TIME > timestamp { // reaction is instant
if d.lastMouseClick == nil || d.lastClickTimestamp+ClickRelationTime > d.lastTimestamp { // reaction is instant
return nil
}
event := &IssueEvent{
@@ -52,42 +48,37 @@ func (d *DeadClickDetector) build(timestamp uint64) Message {
return event
}
func (d *DeadClickDetector) Build() Message {
return d.build(d.lastTimestamp)
}
func (d *DeadClickDetector) Handle(message Message, messageID uint64, timestamp uint64) Message {
func (d *DeadClickDetector) Handle(message Message, timestamp uint64) Message {
d.lastTimestamp = timestamp
switch msg := message.(type) {
case *SetInputTarget:
if d.inputIDSet == nil {
d.inputIDSet = make(map[uint64]bool)
}
d.inputIDSet[msg.ID] = true
d.addInputID(msg.ID)
case *CreateDocument:
d.inputIDSet = nil
d.clearInputIDs()
case *MouseClick:
if msg.Label == "" {
return nil
}
event := d.build(timestamp)
if d.inputIDSet[msg.ID] { // ignore if input
isInputEvent := d.inputIDSet[msg.ID]
event := d.Build()
if isInputEvent {
return event
}
d.lastMouseClick = msg
d.lastClickTimestamp = timestamp
d.lastMessageID = messageID
d.lastMessageID = message.MsgID()
return event
case *SetNodeAttribute,
*RemoveNodeAttribute,
*CreateElementNode,
*CreateTextNode,
*SetNodeFocus,
*MoveNode,
*RemoveNode,
*SetCSSData,
*CSSInsertRule,
*CSSDeleteRule:
return d.build(timestamp)
return d.Build()
}
return nil
}

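The rule in one line: a labeled click that provokes none of the listed DOM or CSS mutations within ClickRelationTime (1234ms) is reported as a dead click; clicks on elements registered via SetInputTarget are exempt, presumably because inputs may react without mutating the DOM. A sketch of the core guard:

package main

import "fmt"

const clickRelationTime = 1234 // ms, as in ClickRelationTime

// deadClick reports whether a click with no DOM mutation until `now`
// should be flagged, mirroring the guard in Build().
func deadClick(clickTS, now uint64) bool {
	return clickTS+clickRelationTime <= now // no reaction within the window
}

func main() {
	click := uint64(10_000)
	fmt.Println(deadClick(click, 10_400)) // false: the mutation arrived fast enough
	fmt.Println(deadClick(click, 12_000)) // true: the click went unanswered
}
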
View file

@@ -1,55 +0,0 @@
package web
import (
. "openreplay/backend/pkg/messages"
)
/*
Handler name: DomDrop
Input events: CreateElementNode,
CreateTextNode,
RemoveNode
Output event: DOMDrop
*/
const DROP_WINDOW = 200 //ms
const CRITICAL_COUNT = 1 // Our login page contains 20. But on crash it removes only the roots (1-3 nodes).
// TODO: smart detection (keeping the whole DOM tree would eat all memory)
type domDropDetector struct {
removedCount int
lastDropTimestamp uint64
}
func (dd *domDropDetector) reset() {
dd.removedCount = 0
dd.lastDropTimestamp = 0
}
func (dd *domDropDetector) Handle(message Message, _ uint64, timestamp uint64) Message {
switch message.(type) {
case *CreateElementNode,
*CreateTextNode:
dd.removedCount = 0
dd.lastDropTimestamp = 0
case *RemoveNode:
if dd.lastDropTimestamp+DROP_WINDOW > timestamp {
dd.removedCount += 1
} else {
dd.removedCount = 1
}
dd.lastDropTimestamp = timestamp
}
return nil
}
func (dd *domDropDetector) Build() Message {
defer dd.reset()
if dd.removedCount >= CRITICAL_COUNT {
domDrop := &DOMDrop{
Timestamp: dd.lastDropTimestamp,
}
return domDrop
}
return nil
}

Some files were not shown because too many files have changed in this diff