pulled webpack changes and resolved conflicts

This commit is contained in:
Shekar Siri 2022-06-08 16:04:52 +02:00
commit f235da44ab
1264 changed files with 17175 additions and 62111 deletions

View file

@ -1,5 +1,6 @@
# This action will push the chalice changes to aws
on:
workflow_dispatch:
push:
branches:
- dev
@ -31,27 +32,64 @@ jobs:
kubeconfig: ${{ secrets.EE_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Building and Pusing api image
id: build-image
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ee-${{ github.sha }}
IMAGE_TAG: ${{ github.sha }}-ee
ENVIRONMENT: staging
run: |
cd api
PUSH_IMAGE=1 bash build.sh ee
- name: Creating old image input
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
# We've to strip off the -ee, as helm will append it.
tag: `echo ${image_array[1]} | cut -d '-' -f 1`
EOF
done
- name: Deploy to kubernetes
run: |
cd scripts/helm/
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.EE_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"foss.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
bash kube-install.sh --app chalice
cd scripts/helmcharts/
## Update secerts
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.EE_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.EE_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.EE_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.EE_DOMAIN_NAME }}\"/g" vars.yaml
sed -i "s/enterpriseEditionLicense: \"\"/enterpriseEditionLicense: \"${{ secrets.EE_LICENSE_KEY }}\"/g" vars.yaml
# Update changed image tag
sed -i "/chalice/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
helm upgrade --install openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ee-${{ github.sha }}
# We're not passing -ee flag, because helm will add that.
IMAGE_TAG: ${{ github.sha }}
ENVIRONMENT: staging
# - name: Debug Job
@ -59,6 +97,6 @@ jobs:
# uses: mxschmitt/action-tmate@v3
# env:
# DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
# IMAGE_TAG: ee-${{ github.sha }}
# IMAGE_TAG: ${{ github.sha }}-ee
# ENVIRONMENT: staging
#

View file

@ -3,6 +3,7 @@ on:
workflow_dispatch:
push:
branches:
- dev
- api-v1.5.5
paths:
- api/**
@ -32,6 +33,12 @@ jobs:
kubeconfig: ${{ secrets.OSS_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Building and Pusing api image
id: build-image
env:
@ -41,15 +48,43 @@ jobs:
run: |
cd api
PUSH_IMAGE=1 bash build.sh
- name: Creating old image input
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
tag: ${image_array[1]}
EOF
done
- name: Deploy to kubernetes
run: |
cd scripts/helm/
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"foss.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
bash kube-install.sh --app chalice
cd scripts/helmcharts/
## Update secerts
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.OSS_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.OSS_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.OSS_DOMAIN_NAME }}\"/g" vars.yaml
# Update changed image tag
sed -i "/chalice/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
cat /tmp/image_override.yaml
# Deploy command
helm upgrade --install openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
IMAGE_TAG: ${{ github.sha }}

View file

@ -1,5 +1,6 @@
# This action will push the utilities changes to aws
on:
workflow_dispatch:
push:
branches:
- dev

View file

@ -1,6 +1,7 @@
# Ref: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions
on:
workflow_dispatch:
push:
branches:
- dev
@ -33,11 +34,16 @@ jobs:
kubeconfig: ${{ secrets.EE_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
- name: Build, tag, and Deploy to k8s
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Build, tag
id: build-image
env:
DOCKER_REPO: ${{ secrets.EE_REGISTRY_URL }}
IMAGE_TAG: ee-${{ github.sha }}
IMAGE_TAG: ${{ github.sha }}-ee
ENVIRONMENT: staging
run: |
#
@ -47,41 +53,79 @@ jobs:
#
# Getting the images to build
#
set -x
{
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
done
} | uniq > backend/images_to_build.txt
[[ $(cat backend/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0)
} | uniq > /tmp/images_to_build.txt
[[ $(cat /tmp/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0)
#
# Pushing image to registry
#
cd backend
for image in $(cat images_to_build.txt);
for image in $(cat /tmp/images_to_build.txt);
do
echo "Bulding $image"
PUSH_IMAGE=1 bash -x ./build.sh ee $image
echo "::set-output name=image::$DOCKER_REPO/$image:$IMAGE_TAG"
done
- name: Creating old image input
env:
IMAGE_TAG: ${{ github.sha }}
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
# We've to strip off the -ee, as helm will append it.
tag: `echo ${image_array[1]} | cut -d '-' -f 1`
EOF
done
- name: Deploying to kuberntes
env:
# We're not passing -ee flag, because helm will add that.
IMAGE_TAG: ${{ github.sha }}
run: |
#
# Deploying image to environment.
#
cd ../scripts/helm/
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.EE_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#jwt_secret_key.*#jwt_secret_key: \"${{ secrets.EE_JWT_SECRET }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"foss.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
for image in $(cat ../../backend/images_to_build.txt);
cd scripts/helmcharts/
## Update secerts
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.EE_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.EE_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.EE_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.EE_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.EE_DOMAIN_NAME }}\"/g" vars.yaml
sed -i "s/enterpriseEditionLicense: \"\"/enterpriseEditionLicense: \"${{ secrets.EE_LICENSE_KEY }}\"/g" vars.yaml
## Update images
for image in $(cat /tmp/images_to_build.txt);
do
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
# Deploy command
bash openreplay-cli --install $image
sed -i "/${image}/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
done
cat /tmp/image_override.yaml
# Deploy command
helm upgrade --install openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3

View file

@ -1,6 +1,7 @@
# Ref: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions
on:
workflow_dispatch:
push:
branches:
- dev
@ -33,7 +34,12 @@ jobs:
kubeconfig: ${{ secrets.OSS_KUBECONFIG }} # Use content of kubeconfig in secret.
id: setcontext
- name: Build, tag, and Deploy to k8s
# Caching docker images
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- name: Build, tag
id: build-image
env:
DOCKER_REPO: ${{ secrets.OSS_REGISTRY_URL }}
@ -47,42 +53,73 @@ jobs:
#
# Getting the images to build
#
set -x
{
git diff --name-only HEAD HEAD~1 | grep -E "backend/cmd|backend/services" | grep -vE ^ee/ | cut -d '/' -f3
git diff --name-only HEAD HEAD~1 | grep -E "backend/pkg|backend/internal" | grep -vE ^ee/ | cut -d '/' -f3 | uniq | while read -r pkg_name ; do
grep -rl "pkg/$pkg_name" backend/services backend/cmd | cut -d '/' -f3
done
} | uniq > backend/images_to_build.txt
} | uniq > /tmp/images_to_build.txt
[[ $(cat backend/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0)
[[ $(cat /tmp/images_to_build.txt) != "" ]] || (echo "Nothing to build here"; exit 0)
#
# Pushing image to registry
#
cd backend
for image in $(cat images_to_build.txt);
for image in $(cat /tmp/images_to_build.txt);
do
echo "Bulding $image"
PUSH_IMAGE=1 bash -x ./build.sh skip $image
echo "::set-output name=image::$DOCKER_REPO/$image:$IMAGE_TAG"
done
- name: Creating old image input
env:
IMAGE_TAG: ${{ github.sha }}
run: |
#
# Create yaml with existing image tags
#
kubectl get pods -n app -o jsonpath="{.items[*].spec.containers[*].image}" |\
tr -s '[[:space:]]' '\n' | sort | uniq -c | grep '/foss/' | cut -d '/' -f3 > /tmp/image_tag.txt
echo > /tmp/image_override.yaml
for line in `cat /tmp/image_tag.txt`;
do
image_array=($(echo "$line" | tr ':' '\n'))
cat <<EOF >> /tmp/image_override.yaml
${image_array[0]}:
image:
tag: ${image_array[1]}
EOF
done
- name: Deploying to kuberntes
env:
IMAGE_TAG: ${{ github.sha }}
run: |
#
# Deploying image to environment.
#
cd ../scripts/helm/
sed -i "s#minio_access_key.*#minio_access_key: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\" #g" vars.yaml
sed -i "s#minio_secret_key.*#minio_secret_key: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\" #g" vars.yaml
sed -i "s#domain_name.*#domain_name: \"foss.openreplay.com\" #g" vars.yaml
sed -i "s#kubeconfig.*#kubeconfig_path: ${KUBECONFIG}#g" vars.yaml
for image in $(cat ../../backend/images_to_build.txt);
cd scripts/helmcharts/
## Update secerts
sed -i "s/postgresqlPassword: \"changeMePassword\"/postgresqlPassword: \"${{ secrets.OSS_PG_PASSWORD }}\"/g" vars.yaml
sed -i "s/accessKey: \"changeMeMinioAccessKey\"/accessKey: \"${{ secrets.OSS_MINIO_ACCESS_KEY }}\"/g" vars.yaml
sed -i "s/secretKey: \"changeMeMinioPassword\"/secretKey: \"${{ secrets.OSS_MINIO_SECRET_KEY }}\"/g" vars.yaml
sed -i "s/jwt_secret: \"SetARandomStringHere\"/jwt_secret: \"${{ secrets.OSS_JWT_SECRET }}\"/g" vars.yaml
sed -i "s/domainName: \"\"/domainName: \"${{ secrets.OSS_DOMAIN_NAME }}\"/g" vars.yaml
## Update images
for image in $(cat /tmp/images_to_build.txt);
do
sed -i "s/image_tag:.*/image_tag: \"$IMAGE_TAG\"/g" vars.yaml
# Deploy command
bash kube-install.sh --app $image
sed -i "/${image}/{n;n;s/.*/ tag: ${IMAGE_TAG}/}" /tmp/image_override.yaml
done
# Deploy command
helm upgrade --install openreplay -n app openreplay -f vars.yaml -f /tmp/image_override.yaml
# - name: Debug Job
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3

View file

@ -10,13 +10,15 @@ RUN go mod download
FROM prepare AS build
COPY cmd cmd
COPY pkg pkg
COPY services services
COPY internal internal
ARG SERVICE_NAME
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/services/$SERVICE_NAME
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME
FROM alpine
FROM alpine AS entrypoint
RUN apk add --no-cache ca-certificates
ENV TZ=UTC \
@ -28,7 +30,7 @@ ENV TZ=UTC \
BEACON_SIZE_LIMIT=7000000 \
KAFKA_USE_SSL=true \
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
REDIS_STREAMS_MAX_LEN=3000 \
REDIS_STREAMS_MAX_LEN=5000 \
TOPIC_RAW_WEB=raw \
TOPIC_RAW_IOS=raw-ios \
TOPIC_CACHE=cache \
@ -39,12 +41,14 @@ ENV TZ=UTC \
GROUP_DB=db \
GROUP_ENDER=ender \
GROUP_CACHE=cache \
GROUP_HEURISTICS=heuristics \
AWS_REGION_WEB=eu-central-1 \
AWS_REGION_IOS=eu-west-1 \
AWS_REGION_ASSETS=eu-central-1 \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=72 \
FILE_SPLIT_SIZE=300000 \
LOG_QUEUE_STATS_INTERVAL_SEC=60

View file

@ -10,13 +10,13 @@ RUN go mod download
FROM prepare AS build
COPY cmd cmd
COPY pkg pkg
COPY services services
COPY internal internal
RUN for name in assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags musl openreplay/backend/services/$name; done
RUN for name in assets db ender http integrations sink storage;do CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o bin/$name -tags musl openreplay/backend/cmd/$name; done
FROM alpine
FROM alpine AS entrypoint
#FROM pygmy/alpine-tini:latest
RUN apk add --no-cache ca-certificates
@ -46,6 +46,7 @@ ENV TZ=UTC \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=12 \
FILE_SPLIT_SIZE=300000 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
RUN mkdir $FS_DIR

View file

@ -18,25 +18,13 @@ check_prereq() {
return
}
function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http | db | sink | ender | heuristics | storage | assets)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
*)
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
;;
esac
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image .
[[ $PUSH_IMAGE -eq 1 ]] && {
docker push ${DOCKER_REPO:-'local'}/$image:${git_sha1}
}
return
}
@ -50,7 +38,7 @@ function build_api(){
build_service $2
return
}
for image in $(ls services);
for image in $(ls cmd);
do
build_service $image
echo "::set-output name=image::${DOCKER_REPO:-'local'}/$image:${git_sha1}"

View file

@ -1,60 +0,0 @@
FROM golang:1.18-alpine3.15 AS prepare
RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash
WORKDIR /root
COPY go.mod .
COPY go.sum .
RUN go mod download
FROM prepare AS build
COPY pkg pkg
COPY services services
COPY internal internal
COPY cmd cmd
ARG SERVICE_NAME
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags musl openreplay/backend/cmd/$SERVICE_NAME
FROM alpine AS entrypoint
RUN apk add --no-cache ca-certificates
ENV TZ=UTC \
FS_ULIMIT=1000 \
FS_DIR=/mnt/efs \
MAXMINDDB_FILE=/root/geoip.mmdb \
UAPARSER_FILE=/root/regexes.yaml \
HTTP_PORT=80 \
BEACON_SIZE_LIMIT=7000000 \
KAFKA_USE_SSL=true \
KAFKA_MAX_POLL_INTERVAL_MS=400000 \
REDIS_STREAMS_MAX_LEN=3000 \
TOPIC_RAW_WEB=raw \
TOPIC_RAW_IOS=raw-ios \
TOPIC_CACHE=cache \
TOPIC_ANALYTICS=analytics \
TOPIC_TRIGGER=trigger \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \
GROUP_ENDER=ender \
GROUP_CACHE=cache \
AWS_REGION_WEB=eu-central-1 \
AWS_REGION_IOS=eu-west-1 \
AWS_REGION_ASSETS=eu-central-1 \
CACHE_ASSETS=true \
ASSETS_SIZE_LIMIT=6291456 \
FS_CLEAN_HRS=72 \
LOG_QUEUE_STATS_INTERVAL_SEC=60
ARG SERVICE_NAME
RUN if [ "$SERVICE_NAME" = "http" ]; then \
wget https://raw.githubusercontent.com/ua-parser/uap-core/master/regexes.yaml -O "$UAPARSER_FILE" &&\
wget https://static.openreplay.com/geoip/GeoLite2-Country.mmdb -O "$MAXMINDDB_FILE"; fi
COPY --from=build /root/service /root/service
ENTRYPOINT /root/service

View file

@ -1,7 +1,9 @@
package main
import (
"context"
"log"
"openreplay/backend/pkg/monitoring"
"os"
"os/signal"
"syscall"
@ -15,7 +17,13 @@ import (
"openreplay/backend/pkg/queue/types"
)
/*
Assets
*/
func main() {
metrics := monitoring.New("assets")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
@ -27,6 +35,11 @@ func main() {
cfg.AssetsSizeLimit,
)
totalAssets, err := metrics.RegisterCounter("assets_total")
if err != nil {
log.Printf("can't create assets_total metric: %s", err)
}
consumer := queue.NewMessageConsumer(
cfg.GroupCache,
[]string{cfg.TopicCache},
@ -34,6 +47,7 @@ func main() {
switch msg := message.(type) {
case *messages.AssetCache:
cacher.CacheURL(sessionID, msg.URL)
totalAssets.Add(context.Background(), 1)
case *messages.ErrorEvent:
if msg.Source != "js_exception" {
return

View file

@ -2,11 +2,11 @@ package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/db"
"openreplay/backend/internal/datasaver"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/internal/db/datasaver"
"openreplay/backend/pkg/handlers"
custom2 "openreplay/backend/pkg/handlers/custom"
"openreplay/backend/pkg/sessions"
"time"
"os"
@ -21,6 +21,10 @@ import (
"openreplay/backend/pkg/queue/types"
)
/*
DB
*/
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
@ -33,14 +37,14 @@ func main() {
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
&custom.EventMapper{},
custom.NewInputEventBuilder(),
custom.NewPageEventBuilder(),
&custom2.EventMapper{},
custom2.NewInputEventBuilder(),
custom2.NewPageEventBuilder(),
}
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(handlersFabric)
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init modules
saver := datasaver.New(pg)
@ -59,11 +63,11 @@ func main() {
return
}
// Try to get session from db for the following handlers
session, err := pg.GetSession(sessionID)
if err != nil {
// Might happen due to the assets-related message TODO: log only if session is necessary for this kind of message
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
if session == nil {
if err != nil {
log.Printf("Error on session retrieving from cache: %v, SessionID: %v, Message: %v", err, sessionID, msg)
}
return
}

View file

@ -4,6 +4,7 @@ import (
"log"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/sessionender"
"openreplay/backend/pkg/monitoring"
"time"
"os"
@ -17,7 +18,13 @@ import (
"openreplay/backend/pkg/queue/types"
)
/*
Ender
*/
func main() {
metrics := monitoring.New("ender")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
@ -25,10 +32,14 @@ func main() {
// Init all modules
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
sessions := sessionender.New(intervals.EVENTS_SESSION_END_TIMEOUT)
sessions, err := sessionender.New(metrics, intervals.EVENTS_SESSION_END_TIMEOUT)
if err != nil {
log.Printf("can't init ender service: %s", err)
return
}
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
cfg.GroupEnder,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,
@ -61,7 +72,7 @@ func main() {
sessions.HandleEndedSessions(func(sessionID uint64, timestamp int64) bool {
msg := &messages.SessionEnd{Timestamp: uint64(timestamp)}
if err := producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(msg)); err != nil {
log.Printf("can't send message to queue: %s", err)
log.Printf("can't send SessionEnd to trigger topic: %s; sessID: %d", err, sessionID)
return false
}
return true

View file

@ -2,50 +2,54 @@ package main
import (
"log"
"openreplay/backend/internal/builder"
"openreplay/backend/internal/config/ender"
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/custom"
"openreplay/backend/internal/handlers/ios"
"openreplay/backend/internal/handlers/web"
"openreplay/backend/internal/config/heuristics"
"openreplay/backend/pkg/handlers"
"openreplay/backend/pkg/handlers/custom"
ios2 "openreplay/backend/pkg/handlers/ios"
web2 "openreplay/backend/pkg/handlers/web"
"openreplay/backend/pkg/intervals"
logger "openreplay/backend/pkg/log"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/sessions"
"os"
"os/signal"
"syscall"
"time"
)
/*
Heuristics
*/
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
// Load service configuration
cfg := ender.New()
cfg := heuristics.New()
// HandlersFabric returns the list of message handlers we want to be applied to each incoming message.
handlersFabric := func() []handlers.MessageProcessor {
return []handlers.MessageProcessor{
// web handlers
&web.ClickRageDetector{},
&web.CpuIssueDetector{},
&web.DeadClickDetector{},
&web.MemoryIssueDetector{},
&web.NetworkIssueDetector{},
&web.PerformanceAggregator{},
&web2.ClickRageDetector{},
&web2.CpuIssueDetector{},
&web2.DeadClickDetector{},
&web2.MemoryIssueDetector{},
&web2.NetworkIssueDetector{},
&web2.PerformanceAggregator{},
// iOS handlers
&ios.AppNotResponding{},
&ios.ClickRageDetector{},
&ios.PerformanceAggregator{},
&ios2.AppNotResponding{},
&ios2.ClickRageDetector{},
&ios2.PerformanceAggregator{},
// Other handlers (you can add your custom handlers here)
&custom.CustomHandler{},
}
}
// Create handler's aggregator
builderMap := builder.NewBuilderMap(handlersFabric)
builderMap := sessions.NewBuilderMap(handlersFabric)
// Init logger
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
@ -53,7 +57,7 @@ func main() {
// Init producer and consumer for data bus
producer := queue.NewProducer()
consumer := queue.NewMessageConsumer(
cfg.GroupEvents,
cfg.GroupHeuristics,
[]string{
cfg.TopicRawWeb,
cfg.TopicRawIOS,

View file

@ -2,26 +2,33 @@ package main
import (
"log"
"openreplay/backend/internal/config/http"
"openreplay/backend/internal/http/router"
"openreplay/backend/internal/http/server"
"openreplay/backend/internal/http/services"
"openreplay/backend/pkg/monitoring"
"os"
"os/signal"
"syscall"
"openreplay/backend/internal/config"
"openreplay/backend/internal/router"
"openreplay/backend/internal/server"
"openreplay/backend/internal/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
)
/*
HTTP
*/
func main() {
metrics := monitoring.New("http")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
pprof.StartProfilingServer()
// Load configuration
cfg := config.New()
cfg := http.New()
// Connect to queue
producer := queue.NewProducer()
@ -35,7 +42,7 @@ func main() {
services := services.New(cfg, producer, dbConn)
// Init server's routes
router, err := router.NewRouter(cfg, services)
router, err := router.NewRouter(cfg, services, metrics)
if err != nil {
log.Fatalf("failed while creating engine: %s", err)
}

View file

@ -2,6 +2,8 @@ package main
import (
"log"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/integrations/clientManager"
"time"
"os"
@ -9,23 +11,25 @@ import (
"syscall"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
"openreplay/backend/services/integrations/clientManager"
)
/*
Integrations
*/
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
TOPIC_RAW_WEB := env.String("TOPIC_RAW_WEB")
POSTGRES_STRING := env.String("POSTGRES_STRING")
pg := postgres.NewConn(POSTGRES_STRING)
cfg := config.New()
pg := postgres.NewConn(cfg.PostgresURI)
defer pg.Close()
tokenizer := token.NewTokenizer(env.String("TOKEN_SECRET"))
tokenizer := token.NewTokenizer(cfg.TokenSecret)
manager := clientManager.NewManager()
@ -45,7 +49,7 @@ func main() {
producer := queue.NewProducer()
defer producer.Close(15000)
listener, err := postgres.NewIntegrationsListener(POSTGRES_STRING)
listener, err := postgres.NewIntegrationsListener(cfg.PostgresURI)
if err != nil {
log.Printf("Postgres listener error: %v\n", err)
log.Fatalf("Postgres listener error")
@ -81,7 +85,7 @@ func main() {
sessionID = sessData.ID
}
// TODO: send to ready-events topic. Otherwise it have to go through the events worker.
producer.Produce(TOPIC_RAW_WEB, sessionID, messages.Encode(event.RawErrorEvent))
producer.Produce(cfg.TopicRawWeb, sessionID, messages.Encode(event.RawErrorEvent))
case err := <-manager.Errors:
log.Printf("Integration error: %v\n", err)
case i := <-manager.RequestDataUpdates:

View file

@ -1,24 +1,33 @@
package main
import (
"context"
"encoding/binary"
"log"
"openreplay/backend/internal/sink/assetscache"
"openreplay/backend/internal/sink/oswriter"
"openreplay/backend/internal/storage"
"openreplay/backend/pkg/monitoring"
"time"
"os"
"os/signal"
"syscall"
"openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config/sink"
"openreplay/backend/internal/oswriter"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
/*
Sink
*/
func main() {
metrics := monitoring.New("sink")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := sink.New()
@ -34,7 +43,19 @@ func main() {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
count := 0
counter := storage.NewLogCounter()
totalMessages, err := metrics.RegisterCounter("messages_total")
if err != nil {
log.Printf("can't create messages_total metric: %s", err)
}
savedMessages, err := metrics.RegisterCounter("messages_saved")
if err != nil {
log.Printf("can't create messages_saved metric: %s", err)
}
messageSize, err := metrics.RegisterHistogram("messages_size")
if err != nil {
log.Printf("can't create messages_size metric: %s", err)
}
consumer := queue.NewMessageConsumer(
cfg.GroupSink,
@ -43,14 +64,25 @@ func main() {
cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
count++
// Process assets
message = assetMessageHandler.ParseAssets(sessionID, message)
totalMessages.Add(context.Background(), 1)
// Filter message
typeID := message.TypeID()
if !IsReplayerType(typeID) {
return
}
message = assetMessageHandler.ParseAssets(sessionID, message)
// If message timestamp is empty, use at least ts of session start
ts := message.Meta().Timestamp
if ts == 0 {
log.Printf("zero ts; sessID: %d, msg: %+v", sessionID, message)
} else {
// Log ts of last processed message
counter.Update(sessionID, time.UnixMilli(ts))
}
value := message.Encode()
var data []byte
@ -64,32 +96,35 @@ func main() {
if err := writer.Write(sessionID, data); err != nil {
log.Printf("Writer error: %v\n", err)
}
messageSize.Record(context.Background(), float64(len(data)))
savedMessages.Add(context.Background(), 1)
},
false,
)
log.Printf("Sink service started\n")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
tick := time.Tick(30 * time.Second)
log.Printf("Sink service started\n")
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Commit()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
consumer.Close()
os.Exit(0)
case <-tick:
if err := writer.SyncAll(); err != nil {
log.Fatalf("Sync error: %v\n", err)
}
log.Printf("%v messages during 30 sec", count)
count = 0
consumer.Commit()
counter.Print()
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
default:
err := consumer.ConsumeNext()
if err != nil {

View file

@ -2,6 +2,7 @@ package main
import (
"log"
"openreplay/backend/pkg/monitoring"
"os"
"os/signal"
"strconv"
@ -16,18 +17,26 @@ import (
s3storage "openreplay/backend/pkg/storage"
)
/*
Storage
*/
func main() {
metrics := monitoring.New("storage")
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
cfg := config.New()
s3 := s3storage.NewS3(cfg.S3Region, cfg.S3Bucket)
srv, err := storage.New(cfg, s3)
srv, err := storage.New(cfg, s3, metrics)
if err != nil {
log.Printf("can't init storage service: %s", err)
return
}
counter := storage.NewLogCounter()
consumer := queue.NewMessageConsumer(
cfg.GroupStorage,
[]string{
@ -37,6 +46,8 @@ func main() {
switch msg.(type) {
case *messages.SessionEnd:
srv.UploadKey(strconv.FormatUint(sessionID, 10), 5)
// Log timestamp of last processed session
counter.Update(sessionID, time.UnixMilli(meta.Timestamp))
}
},
true,
@ -47,15 +58,15 @@ func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
cleanTick := time.Tick(time.Duration(cfg.FSCleanHRS) * time.Hour)
counterTick := time.Tick(time.Second * 30)
for {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
consumer.Close()
os.Exit(0)
case <-cleanTick:
go srv.CleanDir(cfg.FSDir)
case <-counterTick:
go counter.Print()
default:
err := consumer.ConsumeNext()
if err != nil {

View file

@ -4,7 +4,6 @@ go 1.18
require (
cloud.google.com/go/logging v1.4.2
github.com/ClickHouse/clickhouse-go v1.4.3
github.com/aws/aws-sdk-go v1.35.23
github.com/btcsuite/btcutil v1.0.2
github.com/elastic/go-elasticsearch/v7 v7.13.1
@ -19,18 +18,21 @@ require (
github.com/pkg/errors v0.9.1
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
go.opentelemetry.io/otel/exporters/prometheus v0.30.0
go.opentelemetry.io/otel/metric v0.30.0
golang.org/x/net v0.0.0-20210525063256-abc453219eb5
google.golang.org/api v0.50.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0
)
require (
cloud.google.com/go v0.84.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
@ -42,13 +44,22 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.11.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.4 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
@ -56,5 +67,5 @@ require (
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

View file

@ -44,13 +44,20 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.4.3 h1:iAFMa2UrQdR5bHJ2/yaSLffZkxpcOYQMCUuKeNXGdqc=
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go v1.35.23 h1:SCP0d0XvyJTDmfnHEQPvBaYi3kea1VNUo7uQmkVgFts=
github.com/aws/aws-sdk-go v1.35.23/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
@ -62,19 +69,18 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
@ -95,12 +101,23 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@ -146,8 +163,10 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -226,11 +245,17 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/compress v1.11.9 h1:5OCMOdde1TCT2sookEuVeEZzA8bmRSFV3AwPDZAG8AA=
@ -239,6 +264,8 @@ github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -255,7 +282,15 @@ github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@ -263,14 +298,34 @@ github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/oschwald/maxminddb-golang v1.7.0 h1:JmU4Q1WBv5Q+2KZy5xJI+98aUwTIrPPxZUkd5Cwr8Zc=
github.com/oschwald/maxminddb-golang v1.7.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1 h1:ZiaPsmm9uiBeaSMRznKsCDNtPCS0T3JVDGF+06gjBzk=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
@ -278,8 +333,10 @@ github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThC
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
@ -287,8 +344,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc=
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4=
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe h1:aj/vX5epIlQQBEocKoM9nSAiNpakdQzElc8SaRFPu+I=
@ -307,12 +365,25 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/prometheus v0.30.0 h1:YXo5ZY5nofaEYMCMTTMaRH2cLDZB8+0UGuk5RwMfIo0=
go.opentelemetry.io/otel/exporters/prometheus v0.30.0/go.mod h1:qN5feW+0/d661KDtJuATEmHtw5bKBK7NSvNEP927zSs=
go.opentelemetry.io/otel/metric v0.30.0 h1:Hs8eQZ8aQgs0U49diZoaS6Uaxw3+bBE3lcMUKBFIk3c=
go.opentelemetry.io/otel/metric v0.30.0/go.mod h1:/ShZ7+TS4dHzDFmfi1kSXMhMVubNoP0oIaBp70J6UXU=
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
go.opentelemetry.io/otel/sdk/metric v0.30.0 h1:XTqQ4y3erR2Oj8xSAOL5ovO5011ch2ELg51z4fVkpME=
go.opentelemetry.io/otel/sdk/metric v0.30.0/go.mod h1:8AKFRi5HyvTR0RRty3paN1aMC9HMT+NzcEhw/BLkLX8=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -363,6 +434,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -370,6 +442,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -396,8 +469,9 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420 h1:a8jGStKg0XqKDlKqjLrXn0ioF5MH36pT7Z0BRTqLhbk=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -428,6 +502,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -445,6 +520,7 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -457,6 +533,8 @@ golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -464,18 +542,22 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -666,11 +748,11 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0 h1:+RlmciBLDd/XwM1iudiG3HtCg45purnsOxEoY/+JZdQ=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.7.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
@ -679,8 +761,12 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -5,7 +5,7 @@ import (
)
type Config struct {
GroupEvents string
GroupEnder string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
@ -15,7 +15,7 @@ type Config struct {
func New() *Config {
return &Config{
GroupEvents: env.String("GROUP_ENDER"),
GroupEnder: env.String("GROUP_ENDER"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),

View file

@ -0,0 +1,25 @@
package heuristics
import (
"openreplay/backend/pkg/env"
)
type Config struct {
GroupHeuristics string
TopicTrigger string
LoggerTimeout int
TopicRawWeb string
TopicRawIOS string
ProducerTimeout int
}
func New() *Config {
return &Config{
GroupHeuristics: env.String("GROUP_HEURISTICS"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
ProducerTimeout: 2000,
}
}

View file

@ -1,4 +1,4 @@
package config
package http
import (
"openreplay/backend/pkg/env"

View file

@ -0,0 +1,17 @@
package integrations
import "openreplay/backend/pkg/env"
type Config struct {
TopicRawWeb string
PostgresURI string
TokenSecret string
}
func New() *Config {
return &Config{
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
PostgresURI: env.String("POSTGRES_STRING"),
TokenSecret: env.String("TOKEN_SECRET"),
}
}

View file

@ -6,27 +6,27 @@ import (
)
type Config struct {
S3Region string
S3Bucket string
FSDir string
FSCleanHRS int
SessionFileSplitSize int
RetryTimeout time.Duration
GroupStorage string
TopicTrigger string
DeleteTimeout time.Duration
S3Region string
S3Bucket string
FSDir string
FSCleanHRS int
FileSplitSize int
RetryTimeout time.Duration
GroupStorage string
TopicTrigger string
DeleteTimeout time.Duration
}
func New() *Config {
return &Config{
S3Region: env.String("AWS_REGION_WEB"),
S3Bucket: env.String("S3_BUCKET_WEB"),
FSDir: env.String("FS_DIR"),
FSCleanHRS: env.Int("FS_CLEAN_HRS"),
SessionFileSplitSize: 200000, // ~200 kB
RetryTimeout: 2 * time.Minute,
GroupStorage: env.String("GROUP_STORAGE"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
DeleteTimeout: 48 * time.Hour,
S3Region: env.String("AWS_REGION_WEB"),
S3Bucket: env.String("S3_BUCKET_WEB"),
FSDir: env.String("FS_DIR"),
FSCleanHRS: env.Int("FS_CLEAN_HRS"),
FileSplitSize: env.Int("FILE_SPLIT_SIZE"),
RetryTimeout: 2 * time.Minute,
GroupStorage: env.String("GROUP_STORAGE"),
TopicTrigger: env.String("TOPIC_TRIGGER"),
DeleteTimeout: 48 * time.Hour,
}
}

View file

@ -61,6 +61,16 @@ func (mi *Saver) InsertMessage(sessionID uint64, msg Message) error {
return mi.pg.InsertIOSScreenEnter(sessionID, m)
case *IOSCrash:
return mi.pg.InsertIOSCrash(sessionID, m)
case *RawErrorEvent:
return mi.pg.InsertWebErrorEvent(sessionID, &ErrorEvent{
MessageID: m.Meta().Index, // TODO: is it possible to catch panic here???
Timestamp: m.Timestamp,
Source: m.Source,
Name: m.Name,
Message: m.Message,
Payload: m.Payload,
})
}
return nil // "Not implemented"
}

View file

@ -6,8 +6,8 @@ import (
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/ios"
"openreplay/backend/internal/uuid"
"openreplay/backend/internal/http/ios"
"openreplay/backend/internal/http/uuid"
"strconv"
"time"

View file

@ -1,13 +1,16 @@
package router
import (
"bytes"
"encoding/json"
"errors"
"go.opentelemetry.io/otel/attribute"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/uuid"
"openreplay/backend/internal/http/uuid"
"strconv"
"time"
@ -16,6 +19,23 @@ import (
"openreplay/backend/pkg/token"
)
func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) (io.ReadCloser, error) {
body := http.MaxBytesReader(w, r.Body, limit)
bodyBytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
}
body.Close()
reqSize := len(bodyBytes)
e.requestSize.Record(
r.Context(),
float64(reqSize),
[]attribute.KeyValue{attribute.String("method", r.URL.Path)}...,
)
return ioutil.NopCloser(bytes.NewBuffer(bodyBytes)), nil
}
func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
@ -24,12 +44,18 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
reqBody, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
return
}
defer reqBody.Close()
// Parse request body
req := &StartSessionRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
if err := json.NewDecoder(reqBody).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
return
}
@ -114,10 +140,16 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
bytes, err := ioutil.ReadAll(body)
reqBody, err := e.readBody(w, r, e.cfg.BeaconSizeLimit)
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
return
}
defer reqBody.Close()
bytes, err := ioutil.ReadAll(reqBody)
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging
return
@ -139,12 +171,18 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
return
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
reqBody, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
log.Printf("error while reading request body: %s", err)
ResponseWithError(w, http.StatusRequestEntityTooLarge, err)
return
}
defer reqBody.Close()
// Parse request body
req := &NotStartedRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
if err := json.NewDecoder(reqBody).Decode(req); err != nil {
ResponseWithError(w, http.StatusBadRequest, err)
return
}
@ -160,7 +198,7 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
return
}
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
err := e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{
err = e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{
ProjectKey: *req.ProjectKey,
TrackerVersion: req.TrackerVersion,
DoNotTrack: req.DoNotTrack,

View file

@ -0,0 +1,119 @@
package router
import (
"context"
"fmt"
"github.com/gorilla/mux"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"net/http"
http3 "openreplay/backend/internal/config/http"
http2 "openreplay/backend/internal/http/services"
"openreplay/backend/pkg/monitoring"
"time"
)
type Router struct {
router *mux.Router
cfg *http3.Config
services *http2.ServicesBuilder
requestSize syncfloat64.Histogram
requestDuration syncfloat64.Histogram
totalRequests syncfloat64.Counter
}
func NewRouter(cfg *http3.Config, services *http2.ServicesBuilder, metrics *monitoring.Metrics) (*Router, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case services == nil:
return nil, fmt.Errorf("services is empty")
case metrics == nil:
return nil, fmt.Errorf("metrics is empty")
}
e := &Router{
cfg: cfg,
services: services,
}
e.initMetrics(metrics)
e.init()
return e, nil
}
func (e *Router) init() {
e.router = mux.NewRouter()
// Root path
e.router.HandleFunc("/", e.root)
handlers := map[string]func(http.ResponseWriter, *http.Request){
"/v1/web/not-started": e.notStartedHandlerWeb,
"/v1/web/start": e.startSessionHandlerWeb,
"/v1/web/i": e.pushMessagesHandlerWeb,
"/v1/ios/start": e.startSessionHandlerIOS,
"/v1/ios/i": e.pushMessagesHandlerIOS,
"/v1/ios/late": e.pushLateMessagesHandlerIOS,
"/v1/ios/images": e.imagesUploadHandlerIOS,
}
prefix := "/ingest"
for path, handler := range handlers {
e.router.HandleFunc(path, handler).Methods("POST", "OPTIONS")
e.router.HandleFunc(prefix+path, handler).Methods("POST", "OPTIONS")
}
// CORS middleware
e.router.Use(e.corsMiddleware)
}
func (e *Router) initMetrics(metrics *monitoring.Metrics) {
var err error
e.requestSize, err = metrics.RegisterHistogram("requests_body_size")
if err != nil {
log.Printf("can't create requests_body_size metric: %s", err)
}
e.requestDuration, err = metrics.RegisterHistogram("requests_duration")
if err != nil {
log.Printf("can't create requests_duration metric: %s", err)
}
e.totalRequests, err = metrics.RegisterCounter("requests_total")
if err != nil {
log.Printf("can't create requests_total metric: %s", err)
}
}
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Prepare headers for preflight requests
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
if r.Method == http.MethodOptions {
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
requestStart := time.Now()
// Serve request
next.ServeHTTP(w, r)
metricsContext, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
e.totalRequests.Add(metricsContext, 1)
e.requestDuration.Record(metricsContext,
float64(time.Now().Sub(requestStart).Milliseconds()),
[]attribute.KeyValue{attribute.String("method", r.URL.Path)}...,
)
})
}
func (e *Router) GetHandler() http.Handler {
return e.router
}

View file

@ -1,9 +1,9 @@
package services
import (
"openreplay/backend/internal/config"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
"openreplay/backend/internal/config/http"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/internal/http/uaparser"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue/types"
@ -21,7 +21,7 @@ type ServicesBuilder struct {
Storage *storage.S3
}
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
func New(cfg *http.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
return &ServicesBuilder{
Database: pgconn,
Producer: producer,

View file

@ -1,10 +1,10 @@
package clientManager
import (
"openreplay/backend/internal/integrations/integration"
"strconv"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/services/integrations/integration"
)
type manager struct {

View file

@ -1,70 +0,0 @@
package router
import (
"github.com/gorilla/mux"
"log"
"net/http"
"openreplay/backend/internal/config"
http2 "openreplay/backend/internal/services"
)
type Router struct {
router *mux.Router
cfg *config.Config
services *http2.ServicesBuilder
}
func NewRouter(cfg *config.Config, services *http2.ServicesBuilder) (*Router, error) {
e := &Router{
cfg: cfg,
services: services,
}
e.init()
return e, nil
}
func (e *Router) init() {
e.router = mux.NewRouter()
// Root path
e.router.HandleFunc("/", e.root).Methods("POST")
// Web handlers
e.router.HandleFunc("/v1/web/not-started", e.notStartedHandlerWeb).Methods("POST")
e.router.HandleFunc("/v1/web/start", e.startSessionHandlerWeb).Methods("POST")
e.router.HandleFunc("/v1/web/i", e.pushMessagesHandlerWeb).Methods("POST")
// iOS handlers
e.router.HandleFunc("/v1/ios/start", e.startSessionHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/i", e.pushMessagesHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/late", e.pushLateMessagesHandlerIOS).Methods("POST")
e.router.HandleFunc("/v1/ios/images", e.imagesUploadHandlerIOS).Methods("POST")
// CORS middleware
e.router.Use(e.corsMiddleware)
}
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Prepare headers for preflight requests
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
if r.Method == http.MethodOptions {
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
// Serve request
next.ServeHTTP(w, r)
})
}
func (e *Router) GetHandler() http.Handler {
return e.router
}

View file

@ -1,7 +1,11 @@
package sessionender
import (
"context"
"fmt"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
"openreplay/backend/pkg/monitoring"
"time"
)
@ -16,15 +20,31 @@ type session struct {
// SessionEnder updates timestamp of last message for each session
type SessionEnder struct {
timeout int64
sessions map[uint64]*session // map[sessionID]session
timeout int64
sessions map[uint64]*session // map[sessionID]session
activeSessions syncfloat64.UpDownCounter
totalSessions syncfloat64.Counter
}
func New(timeout int64) *SessionEnder {
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
func New(metrics *monitoring.Metrics, timeout int64) (*SessionEnder, error) {
if metrics == nil {
return nil, fmt.Errorf("metrics module is empty")
}
activeSessions, err := metrics.RegisterUpDownCounter("sessions_active")
if err != nil {
return nil, fmt.Errorf("can't register session.active metric: %s", err)
}
totalSessions, err := metrics.RegisterCounter("sessions_total")
if err != nil {
return nil, fmt.Errorf("can't register session.total metric: %s", err)
}
return &SessionEnder{
timeout: timeout,
sessions: make(map[uint64]*session),
activeSessions: activeSessions,
totalSessions: totalSessions,
}, nil
}
// UpdateSession save timestamp for new sessions and update for existing sessions
@ -40,6 +60,9 @@ func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
lastTimestamp: currTS,
isEnded: false,
}
log.Printf("added new session: %d", sessionID)
se.activeSessions.Add(context.Background(), 1)
se.totalSessions.Add(context.Background(), 1)
return
}
if currTS > sess.lastTimestamp {
@ -51,12 +74,16 @@ func (se *SessionEnder) UpdateSession(sessionID, timestamp uint64) {
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
deadLine := time.Now().UnixMilli() - se.timeout
allSessions, removedSessions := len(se.sessions), 0
for sessID, sess := range se.sessions {
if sess.isEnded || sess.lastTimestamp < deadLine {
sess.isEnded = true
if handler(sessID, sess.lastTimestamp) {
delete(se.sessions, sessID)
se.activeSessions.Add(context.Background(), -1)
removedSessions++
}
}
}
log.Printf("Removed %d of %d sessions", removedSessions, allSessions)
}

View file

@ -1,6 +1,7 @@
package assetscache
import (
"log"
"openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
@ -25,38 +26,50 @@ func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.
switch m := msg.(type) {
case *messages.SetNodeAttributeURLBased:
if m.Name == "src" || m.Name == "href" {
return &messages.SetNodeAttribute{
newMsg := &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleURL(sessID, m.BaseURL, m.Value),
}
newMsg.SetMeta(msg.Meta())
return newMsg
} else if m.Name == "style" {
return &messages.SetNodeAttribute{
newMsg := &messages.SetNodeAttribute{
ID: m.ID,
Name: m.Name,
Value: e.handleCSS(sessID, m.BaseURL, m.Value),
}
newMsg.SetMeta(msg.Meta())
return newMsg
}
case *messages.SetCSSDataURLBased:
return &messages.SetCSSData{
newMsg := &messages.SetCSSData{
ID: m.ID,
Data: e.handleCSS(sessID, m.BaseURL, m.Data),
}
newMsg.SetMeta(msg.Meta())
return newMsg
case *messages.CSSInsertRuleURLBased:
return &messages.CSSInsertRule{
newMsg := &messages.CSSInsertRule{
ID: m.ID,
Index: m.Index,
Rule: e.handleCSS(sessID, m.BaseURL, m.Rule),
}
newMsg.SetMeta(msg.Meta())
return newMsg
}
return msg
}
func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
e.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
URL: fullURL,
}))
if err := e.producer.Produce(
e.cfg.TopicCache,
sessionID,
messages.Encode(&messages.AssetCache{URL: fullURL}),
); err != nil {
log.Printf("can't send asset to cache topic, sessID: %d, err: %s", sessionID, err)
}
}
}

View file

@ -1,33 +0,0 @@
package storage
import (
"io/ioutil"
"log"
"os"
"strconv"
"time"
"openreplay/backend/pkg/flakeid"
)
func (s *Storage) CleanDir(dirname string) {
files, err := ioutil.ReadDir(dirname)
if err != nil {
log.Printf("Cannot read file directory. %v", err)
return
}
for _, f := range files {
name := f.Name()
id, err := strconv.ParseUint(name, 10, 64)
if err != nil {
log.Printf("Cannot parse session filename. %v", err)
continue
}
ts := int64(flakeid.ExtractTimestamp(id))
if time.UnixMilli(ts).Add(s.cfg.DeleteTimeout).Before(time.Now()) {
// returns an error. Don't log it since it can be race condition between worker instances
os.Remove(dirname + "/" + name)
}
}
}

View file

@ -0,0 +1,49 @@
package storage
import (
"log"
"sync"
"time"
)
type logCounter struct {
mu sync.Mutex
counter int
timestamp time.Time
lastTS time.Time
lastSessID uint64
}
func NewLogCounter() *logCounter {
nlc := &logCounter{}
nlc.init()
return nlc
}
func (c *logCounter) init() {
c.mu.Lock()
c.counter = 0
c.timestamp = time.Now()
c.mu.Unlock()
}
func (c *logCounter) Update(sessID uint64, ts time.Time) {
c.mu.Lock()
c.counter++
c.lastTS = ts
c.lastSessID = sessID
c.mu.Unlock()
}
func (c *logCounter) Print() {
c.mu.Lock()
log.Printf("count: %d, dur: %ds, msgTS: %s, sessID: %d, part: %d",
c.counter,
int(time.Now().Sub(c.timestamp).Seconds()),
c.lastTS.String(),
c.lastSessID,
c.lastSessID%16,
)
c.mu.Unlock()
c.init()
}

View file

@ -2,37 +2,74 @@ package storage
import (
"bytes"
"context"
"fmt"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"log"
config "openreplay/backend/internal/config/storage"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/monitoring"
"openreplay/backend/pkg/storage"
"os"
"strconv"
"time"
)
type Storage struct {
cfg *config.Config
s3 *storage.S3
cfg *config.Config
s3 *storage.S3
startBytes []byte
totalSessions syncfloat64.Counter
sessionSize syncfloat64.Histogram
archivingTime syncfloat64.Histogram
}
func New(cfg *config.Config, s3 *storage.S3) (*Storage, error) {
func New(cfg *config.Config, s3 *storage.S3, metrics *monitoring.Metrics) (*Storage, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case s3 == nil:
return nil, fmt.Errorf("s3 storage is empty")
}
return &Storage{s3: s3}, nil
// Create metrics
totalSessions, err := metrics.RegisterCounter("sessions_total")
if err != nil {
log.Printf("can't create sessions_total metric: %s", err)
}
sessionSize, err := metrics.RegisterHistogram("sessions_size")
if err != nil {
log.Printf("can't create session_size metric: %s", err)
}
archivingTime, err := metrics.RegisterHistogram("archiving_duration")
if err != nil {
log.Printf("can't create archiving_duration metric: %s", err)
}
return &Storage{
cfg: cfg,
s3: s3,
startBytes: make([]byte, cfg.FileSplitSize),
totalSessions: totalSessions,
sessionSize: sessionSize,
archivingTime: archivingTime,
}, nil
}
func (s *Storage) UploadKey(key string, retryCount int) {
start := time.Now()
if retryCount <= 0 {
return
}
file, err := os.Open(s.cfg.FSDir + "/" + key)
if err != nil {
log.Printf("File error: %v; Will retry %v more time(s)\n", err, retryCount)
sessID, _ := strconv.ParseUint(key, 10, 64)
log.Printf("File error: %v; Will retry %v more time(s); sessID: %s, part: %d, sessStart: %s\n",
err,
retryCount,
key,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.UploadKey(key, retryCount-1)
})
@ -40,19 +77,40 @@ func (s *Storage) UploadKey(key string, retryCount int) {
}
defer file.Close()
startBytes := make([]byte, s.cfg.SessionFileSplitSize)
nRead, err := file.Read(startBytes)
nRead, err := file.Read(s.startBytes)
if err != nil {
log.Printf("File read error: %f", err)
sessID, _ := strconv.ParseUint(key, 10, 64)
log.Printf("File read error: %s; sessID: %s, part: %d, sessStart: %s",
err,
key,
sessID%16,
time.UnixMilli(int64(flakeid.ExtractTimestamp(sessID))),
)
time.AfterFunc(s.cfg.RetryTimeout, func() {
s.UploadKey(key, retryCount-1)
})
return
}
startReader := bytes.NewBuffer(startBytes)
startReader := bytes.NewBuffer(s.startBytes[:nRead])
if err := s.s3.Upload(s.gzipFile(startReader), key, "application/octet-stream", true); err != nil {
log.Fatalf("Storage: start upload failed. %v\n", err)
}
if nRead == s.cfg.SessionFileSplitSize {
if nRead == s.cfg.FileSplitSize {
if err := s.s3.Upload(s.gzipFile(file), key+"e", "application/octet-stream", true); err != nil {
log.Fatalf("Storage: end upload failed. %v\n", err)
}
}
// Save metrics
var fileSize float64 = 0
fileInfo, err := file.Stat()
if err != nil {
log.Printf("can't get file info: %s", err)
} else {
fileSize = float64(fileInfo.Size())
}
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*200)
s.archivingTime.Record(ctx, float64(time.Now().Sub(start).Milliseconds()))
s.sessionSize.Record(ctx, fileSize)
s.totalSessions.Add(ctx, 1)
}

View file

@ -8,10 +8,7 @@ import (
func (c *PGCache) GetSession(sessionID uint64) (*Session, error) {
if s, inCache := c.sessions[sessionID]; inCache {
// TODO: review. Might cause bugs in case of multiple instances
if s == nil {
return nil, pgx.ErrNoRows
}
// TODO: review. Might cause bugs in case of multiple PG instances
return s, nil
}
s, err := c.Conn.GetSession(sessionID)

View file

@ -46,17 +46,6 @@ func (b *EventMapper) Build() Message {
func (b *EventMapper) Handle(message Message, messageID uint64, timestamp uint64) Message {
switch msg := message.(type) {
case *RawErrorEvent:
// !!! This won't be handled because the Meta() timestamp emitted by `integrations` will be 0
// TODO: move to db directly
return &ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
}
case *MouseClick:
if msg.Label != "" {
return &ClickEvent{

View file

@ -1,7 +1,7 @@
package ios
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/pkg/handlers"
. "openreplay/backend/pkg/messages"
)

View file

@ -1,8 +1,8 @@
package ios
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/internal/handlers/web"
"openreplay/backend/pkg/handlers"
"openreplay/backend/pkg/handlers/web"
. "openreplay/backend/pkg/messages"
)

View file

@ -1,7 +1,7 @@
package ios
import (
"openreplay/backend/internal/handlers"
"openreplay/backend/pkg/handlers"
. "openreplay/backend/pkg/messages"
)

View file

@ -109,8 +109,9 @@ func (b *PerformanceAggregator) Build() Message {
b.PerformanceTrackAggr.AvgCPU = 100 - uint64(math.Round(b.sumTickRate*100/b.count))
b.PerformanceTrackAggr.AvgTotalJSHeapSize = uint64(math.Round(b.sumTotalJSHeapSize / b.count))
b.PerformanceTrackAggr.AvgUsedJSHeapSize = uint64(math.Round(b.sumUsedJSHeapSize / b.count))
msg := b.PerformanceTrackAggr
b.reset()
return b.PerformanceTrackAggr
return msg
}
b.reset()
return nil

View file

@ -40,6 +40,9 @@ func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
timestamp = int64(m.Timestamp) // TODO(?): replace timestamp type to int64 everywhere (including encoding part in tracker)
// No skipping here for making it easy to encode back the same sequence of message
// continue readLoop
case *SessionStart:
// Save session start timestamp for collecting "empty" sessions
timestamp = int64(m.Timestamp)
}
msg.Meta().Index = index
msg.Meta().Timestamp = timestamp

View file

@ -9,6 +9,11 @@ func (m *message) Meta() *message {
return m
}
func (m *message) SetMeta(origin *message) {
m.Timestamp = origin.Timestamp
m.Index = origin.Index
}
type Message interface {
Encode() []byte
TypeID() int

View file

@ -0,0 +1,138 @@
package monitoring
import (
"fmt"
"log"
"net/http"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
// Metrics stores all collected metrics
type Metrics struct {
meter metric.Meter
counters map[string]syncfloat64.Counter
upDownCounters map[string]syncfloat64.UpDownCounter
histograms map[string]syncfloat64.Histogram
}
func New(name string) *Metrics {
m := &Metrics{
counters: make(map[string]syncfloat64.Counter),
upDownCounters: make(map[string]syncfloat64.UpDownCounter),
histograms: make(map[string]syncfloat64.Histogram),
}
m.initPrometheusDataExporter()
m.initMetrics(name)
return m
}
// initPrometheusDataExporter allows to use collected metrics in prometheus
func (m *Metrics) initPrometheusDataExporter() {
config := prometheus.Config{
DefaultHistogramBoundaries: []float64{1, 2, 5, 10, 20, 50},
}
c := controller.New(
processor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
)
exporter, err := prometheus.New(config, c)
if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err)
}
global.SetMeterProvider(exporter.MeterProvider())
http.HandleFunc("/metrics", exporter.ServeHTTP)
go func() {
_ = http.ListenAndServe(":8888", nil)
}()
fmt.Println("Prometheus server running on :8888")
}
func (m *Metrics) initMetrics(name string) {
m.meter = global.Meter(name)
}
/*
Counter is a synchronous instrument that measures additive non-decreasing values, for example, the number of:
- processed requests
- received bytes
- disk reads
*/
func (m *Metrics) RegisterCounter(name string) (syncfloat64.Counter, error) {
if _, ok := m.counters[name]; ok {
return nil, fmt.Errorf("counter %s already exists", name)
}
counter, err := m.meter.SyncFloat64().Counter(name)
if err != nil {
return nil, fmt.Errorf("failed to initialize counter: %v", err)
}
m.counters[name] = counter
return counter, nil
}
func (m *Metrics) GetCounter(name string) syncfloat64.Counter {
return m.counters[name]
}
/*
UpDownCounter is a synchronous instrument which measures additive values that increase or decrease with time,
for example, the number of:
- active requests
- open connections
- memory in use (megabytes)
*/
func (m *Metrics) RegisterUpDownCounter(name string) (syncfloat64.UpDownCounter, error) {
if _, ok := m.upDownCounters[name]; ok {
return nil, fmt.Errorf("upDownCounter %s already exists", name)
}
counter, err := m.meter.SyncFloat64().UpDownCounter(name)
if err != nil {
return nil, fmt.Errorf("failed to initialize upDownCounter: %v", err)
}
m.upDownCounters[name] = counter
return counter, nil
}
func (m *Metrics) GetUpDownCounter(name string) syncfloat64.UpDownCounter {
return m.upDownCounters[name]
}
/*
Histogram is a synchronous instrument that produces a histogram from recorded values, for example:
- request latency
- request size
*/
func (m *Metrics) RegisterHistogram(name string) (syncfloat64.Histogram, error) {
if _, ok := m.histograms[name]; ok {
return nil, fmt.Errorf("histogram %s already exists", name)
}
hist, err := m.meter.SyncFloat64().Histogram(name)
if err != nil {
return nil, fmt.Errorf("failed to initialize histogram: %v", err)
}
m.histograms[name] = hist
return hist, nil
}
func (m *Metrics) GetHistogram(name string) syncfloat64.Histogram {
return m.histograms[name]
}

View file

@ -1,13 +1,15 @@
package builder
package sessions
import (
"log"
"openreplay/backend/pkg/handlers"
"time"
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
type builder struct {
sessionID uint64
readyMsgs []Message
timestamp uint64
lastMessageID uint64
@ -16,8 +18,9 @@ type builder struct {
ended bool
}
func NewBuilder(handlers ...handlers.MessageProcessor) *builder {
func NewBuilder(sessionID uint64, handlers ...handlers.MessageProcessor) *builder {
return &builder{
sessionID: sessionID,
processors: handlers,
}
}
@ -41,18 +44,16 @@ func (b *builder) checkSessionEnd(message Message) {
func (b *builder) handleMessage(message Message, messageID uint64) {
if messageID < b.lastMessageID {
// May happen in case of duplicated messages in kafka (if `idempotence: false`)
log.Printf("skip message with wrong msgID, sessID: %d, msgID: %d, lastID: %d", b.sessionID, messageID, b.lastMessageID)
return
}
timestamp := GetTimestamp(message)
if timestamp == 0 {
// May happen in case of messages that are single-in-batch,
// e.g. SessionStart or RawErrorEvent (emitted by `integrations`).
// TODO: make timestamp system transparent;
log.Printf("skip message with empty timestamp, sessID: %d, msgID: %d, msgType: %d", b.sessionID, messageID, message.TypeID())
return
}
if timestamp < b.timestamp {
// Shouldn't happen after messageID check which is done above. TODO: log this case.
log.Printf("skip message with wrong timestamp, sessID: %d, msgID: %d, type: %d", b.sessionID, messageID, message.TypeID())
return
}

View file

@ -1,9 +1,9 @@
package builder
package sessions
import (
"openreplay/backend/pkg/handlers"
"time"
"openreplay/backend/internal/handlers"
. "openreplay/backend/pkg/messages"
)
@ -24,7 +24,7 @@ func NewBuilderMap(handlersFabric func() []handlers.MessageProcessor) *builderMa
func (m *builderMap) GetBuilder(sessionID uint64) *builder {
b := m.sessions[sessionID]
if b == nil {
b = NewBuilder(m.handlersFabric()...) // Should create new instances
b = NewBuilder(sessionID, m.handlersFabric()...) // Should create new instances
m.sessions[sessionID] = b
}
return b

View file

@ -93,6 +93,5 @@ func (r *Rewriter) RewriteURL(sessionID uint64, baseURL string, relativeURL stri
Host: r.assetsURL.Host,
Scheme: r.assetsURL.Scheme,
}
return u.String()
}

View file

@ -5,7 +5,9 @@ import (
"time"
"openreplay/backend/pkg/db/clickhouse"
. "openreplay/backend/pkg/db/types"
"openreplay/backend/pkg/env"
. "openreplay/backend/pkg/messages"
)
var ch *clickhouse.Connector
@ -25,39 +27,39 @@ func (si *Saver) InsertStats(session *Session, msg Message) error {
switch m := msg.(type) {
// Web
case *SessionEnd:
return si.pg.InsertWebSession(session)
return ch.InsertWebSession(session)
case *PerformanceTrackAggr:
return si.pg.InsertWebPerformanceTrackAggr(session, m)
return ch.InsertWebPerformanceTrackAggr(session, m)
case *ClickEvent:
return si.pg.InsertWebClickEvent(session, m)
return ch.InsertWebClickEvent(session, m)
case *InputEvent:
return si.pg.InsertWebInputEvent(session, m)
return ch.InsertWebInputEvent(session, m)
// Unique for Web
case *PageEvent:
si.pg.InsertWebPageEvent(session, m)
ch.InsertWebPageEvent(session, m)
case *ResourceEvent:
return si.pg.InsertWebResourceEvent(session, m)
return ch.InsertWebResourceEvent(session, m)
case *ErrorEvent:
return si.pg.InsertWebErrorEvent(session, m)
return ch.InsertWebErrorEvent(session, m)
case *LongTask:
return si.pg.InsertLongtask(session, m)
return ch.InsertLongtask(session, m)
// IOS
case *IOSSessionEnd:
return si.pg.InsertIOSSession(session)
return ch.InsertIOSSession(session)
case *IOSPerformanceAggregated:
return si.pg.InsertIOSPerformanceAggregated(session, m)
return ch.InsertIOSPerformanceAggregated(session, m)
case *IOSClickEvent:
return si.pg.InsertIOSClickEvent(session, m)
return ch.InsertIOSClickEvent(session, m)
case *IOSInputEvent:
return si.pg.InsertIOSInputEvent(session, m)
return ch.InsertIOSInputEvent(session, m)
// Unique for Web
case *IOSScreenEnter:
//ch.InsertIOSView(session, m)
case *IOSCrash:
return si.pg.InsertIOSCrash(session, m)
return ch.InsertIOSCrash(session, m)
case *IOSNetworkCall:
return si.pg.InsertIOSNetworkCall(session, m)
return ch.InsertIOSNetworkCall(session, m)
}
return nil
}

Some files were not shown because too many files have changed in this diff Show more