feat(canvases): improved performance

Alexander 2025-03-06 14:12:49 +01:00
parent 6dc3dcfd4e
commit 4ebbfd3501
17 changed files with 138 additions and 81 deletions

View file

@@ -116,7 +116,7 @@ RUN if [ "$SERVICE_NAME" = "http" ]; then \
wget https://static.openreplay.com/geoip/GeoLite2-City.mmdb -O "$MAXMINDDB_FILE"; \
elif [ "$SERVICE_NAME" = "imagestorage" ]; then \
apk add --no-cache zstd; \
elif [ "$SERVICE_NAME" = "canvas-handler" ]; then \
elif [ "$SERVICE_NAME" = "canvases" ]; then \
apk add --no-cache zstd; \
elif [ "$SERVICE_NAME" = "spot" ]; then \
apk add --no-cache ffmpeg; \

View file

@@ -8,8 +8,8 @@ import (
"syscall"
"time"
"openreplay/backend/internal/canvas-handler"
config "openreplay/backend/internal/config/canvas-handler"
"openreplay/backend/internal/canvases"
config "openreplay/backend/internal/config/canvases"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/metrics"
@@ -29,7 +29,10 @@ func main() {
log.Fatal(ctx, "can't init object storage: %s", err)
}
srv, err := canvas_handler.New(cfg, log, objStore)
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
srv, err := canvases.New(cfg, log, objStore, producer)
if err != nil {
log.Fatal(ctx, "can't init canvas service: %s", err)
}
@@ -38,6 +41,7 @@ func main() {
cfg.GroupCanvasImage,
[]string{
cfg.TopicCanvasImages,
cfg.TopicCanvasTrigger,
},
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
isSessionEnd := func(data []byte) bool {
@@ -55,14 +59,34 @@ func main() {
}
return true
}
isTriggerEvent := func(data []byte) (string, string, bool) {
reader := messages.NewBytesReader(data)
msgType, err := reader.ReadUint()
if err != nil {
return "", "", false
}
if msgType != messages.MsgCustomEvent {
return "", "", false
}
msg, err := messages.ReadMessage(msgType, reader)
if err != nil {
return "", "", false
}
customEvent := msg.(*messages.CustomEvent)
return customEvent.Payload, customEvent.Name, true
}
sessCtx := context.WithValue(context.Background(), "sessionID", sessID)
if isSessionEnd(data) {
if err := srv.PackSessionCanvases(sessCtx, sessID); err != nil {
if err := srv.PrepareSessionCanvases(sessCtx, sessID); err != nil {
if !strings.Contains(err.Error(), "no such file or directory") {
log.Error(sessCtx, "can't pack session's canvases: %s", err)
}
}
} else if path, name, ok := isTriggerEvent(data); ok {
if err := srv.ProcessSessionCanvas(sessCtx, sessID, path, name); err != nil {
log.Error(sessCtx, "can't process session's canvas: %s", err)
}
} else {
if err := srv.SaveCanvasToDisk(sessCtx, sessID, data); err != nil {
log.Error(sessCtx, "can't process canvas image: %s", err)
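
Taken together, the consumer now reads both topics and routes each message by kind: a sessionEnd fans out per-canvas trigger events, a trigger event starts packing one canvas, and anything else is a raw canvas frame to persist. A minimal, self-contained sketch of that dispatch order, assuming the helper predicates behave as in the diff (log.Printf is a stand-in for the repo's logger):

```go
package sketch

import (
	"context"
	"log"
)

// service mirrors the three entry points the consumer calls in main.go.
type service interface {
	PrepareSessionCanvases(ctx context.Context, sessID uint64) error
	ProcessSessionCanvas(ctx context.Context, sessID uint64, path, name string) error
	SaveCanvasToDisk(ctx context.Context, sessID uint64, data []byte) error
}

// dispatch routes one consumed message in the same order as the diff:
// sessionEnd first, then trigger events, then raw canvas frames.
func dispatch(srv service, sessID uint64, data []byte,
	isSessionEnd func([]byte) bool,
	isTriggerEvent func([]byte) (path, name string, ok bool)) {
	ctx := context.WithValue(context.Background(), "sessionID", sessID)
	if isSessionEnd(data) {
		if err := srv.PrepareSessionCanvases(ctx, sessID); err != nil {
			log.Printf("can't prepare session's canvases: %s", err)
		}
	} else if path, name, ok := isTriggerEvent(data); ok {
		if err := srv.ProcessSessionCanvas(ctx, sessID, path, name); err != nil {
			log.Printf("can't process session's canvas: %s", err)
		}
	} else if err := srv.SaveCanvasToDisk(ctx, sessID, data); err != nil {
		log.Printf("can't process canvas image: %s", err)
	}
}
```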

View file

@@ -1,4 +1,4 @@
package canvas_handler
package canvases
import (
"bytes"
@@ -12,10 +12,12 @@ import (
"strings"
"time"
config "openreplay/backend/internal/config/canvas-handler"
config "openreplay/backend/internal/config/canvases"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/pool"
"openreplay/backend/pkg/queue/types"
)
type ImageStorage struct {
@@ -23,8 +25,10 @@ type ImageStorage struct {
log logger.Logger
basePath string
saverPool pool.WorkerPool
packerPool pool.WorkerPool
uploaderPool pool.WorkerPool
objStorage objectstorage.ObjectStorage
producer types.Producer
}
type saveTask struct {
@@ -34,13 +38,20 @@ type saveTask struct {
image *bytes.Buffer
}
type packTask struct {
ctx context.Context
sessionID uint64
path string
name string
}
type uploadTask struct {
ctx context.Context
path string
name string
}
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage) (*ImageStorage, error) {
func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectStorage, producer types.Producer) (*ImageStorage, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
@@ -54,9 +65,11 @@ func New(cfg *config.Config, log logger.Logger, objStorage objectstorage.ObjectS
log: log,
basePath: path,
objStorage: objStorage,
producer: producer,
}
s.saverPool = pool.NewPool(4, 8, s.writeToDisk)
s.uploaderPool = pool.NewPool(4, 8, s.sendToS3)
s.saverPool = pool.NewPool(2, 2, s.writeToDisk)
s.packerPool = pool.NewPool(8, 16, s.packCanvas)
s.uploaderPool = pool.NewPool(8, 16, s.sendToS3)
return s, nil
}
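
The pool sizing above carries most of the performance change: disk writes drop to 2 workers, while the archiving that used to run inline in PackSessionCanvases moves to a dedicated packer pool, and both packing and uploading get 8 workers with a queue of 16. A rough sketch of the contract pkg/pool appears to expose, inferred only from the NewPool and Submit calls in this commit (the real implementation may differ):

```go
package pool

// WorkerPool fans submitted tasks out to a fixed set of goroutines.
type WorkerPool struct {
	tasks chan interface{}
}

// NewPool starts `workers` goroutines draining a queue of `queueSize`
// and handing every task to fn.
func NewPool(workers, queueSize int, fn func(payload interface{})) *WorkerPool {
	p := &WorkerPool{tasks: make(chan interface{}, queueSize)}
	for i := 0; i < workers; i++ {
		go func() {
			for task := range p.tasks {
				fn(task)
			}
		}()
	}
	return p
}

// Submit enqueues a task; once the queue is full it blocks,
// back-pressuring the submitting stage.
func (p *WorkerPool) Submit(task interface{}) {
	p.tasks <- task
}
```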
@@ -101,7 +114,8 @@ func (v *ImageStorage) writeToDisk(payload interface{}) {
return
}
func (v *ImageStorage) PackSessionCanvases(ctx context.Context, sessID uint64) error {
func (v *ImageStorage) PrepareSessionCanvases(ctx context.Context, sessID uint64) error {
start := time.Now()
path := fmt.Sprintf("%s%d/", v.basePath, sessID)
// Check that the directory exists
@@ -131,12 +145,33 @@ func (v *ImageStorage) PackSessionCanvases(ctx context.Context, sessID uint64) e
names[canvasID] = true
}
sessionID := strconv.FormatUint(sessID, 10)
for name := range names {
msg := &messages.CustomEvent{
Name: name,
Payload: path,
}
if err := v.producer.Produce(v.cfg.TopicCanvasTrigger, sessID, msg.Encode()); err != nil {
v.log.Error(ctx, "can't send canvas trigger: %s", err)
}
}
v.log.Info(ctx, "session canvases (%d) prepared in %.3fs, session: %d", len(names), time.Since(start).Seconds(), sessID)
return nil
}
func (v *ImageStorage) ProcessSessionCanvas(ctx context.Context, sessID uint64, path, name string) error {
v.packerPool.Submit(&packTask{ctx: ctx, sessionID: sessID, path: path, name: name})
return nil
}
func (v *ImageStorage) packCanvas(payload interface{}) {
task := payload.(*packTask)
start := time.Now()
sessionID := strconv.FormatUint(task.sessionID, 10)
// Save to archives
archPath := fmt.Sprintf("%s%s.tar.zst", path, name)
archPath := fmt.Sprintf("%s%s.tar.zst", task.path, task.name)
fullCmd := fmt.Sprintf("find %s -type f -name '%s*' ! -name '*.tar.zst' | tar -cf - --files-from=- | zstd -f -o %s",
path, name, archPath)
task.path, task.name, archPath)
cmd := exec.Command("sh", "-c", fullCmd)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
@@ -144,11 +179,10 @@ func (v *ImageStorage) PackSessionCanvases(ctx context.Context, sessID uint64) e
err := cmd.Run()
if err != nil {
return fmt.Errorf("failed to execute command, err: %s, stderr: %v", err, stderr.String())
v.log.Fatal(task.ctx, "failed to execute command, err: %s, stderr: %v", err, stderr.String())
}
v.uploaderPool.Submit(&uploadTask{ctx: ctx, path: archPath, name: sessionID + "/" + name + ".tar.zst"})
}
return nil
v.log.Info(task.ctx, "canvas packed successfully in %.3fs, session: %d", time.Since(start).Seconds(), task.sessionID)
v.uploaderPool.Submit(&uploadTask{ctx: task.ctx, path: archPath, name: sessionID + "/" + task.name + ".tar.zst"})
}
func (v *ImageStorage) sendToS3(payload interface{}) {
@@ -161,6 +195,5 @@ func (v *ImageStorage) sendToS3(payload interface{}) {
if err := v.objStorage.Upload(bytes.NewReader(video), task.name, "application/octet-stream", objectstorage.NoContentEncoding, objectstorage.Zstd); err != nil {
v.log.Fatal(task.ctx, "failed to upload canvas to storage: %s", err)
}
v.log.Info(task.ctx, "replay file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
return
v.log.Info(task.ctx, "replay file (size: %d) uploaded successfully in %.3fs", len(video), time.Since(start).Seconds())
}
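
End to end, the new flow is: PrepareSessionCanvases lists the canvas IDs on disk and produces one CustomEvent{Name: canvasID, Payload: dir} per canvas to TopicCanvasTrigger; the consumer decodes it in isTriggerEvent and submits a packTask, whose worker streams the canvas files into a zstd-compressed tar before handing off to the uploader pool. A standalone sketch of that shell pipeline, using the same command string the diff builds (paths are illustrative, and the sketch returns the error where the diff logs Fatal):

```go
package sketch

import (
	"bytes"
	"fmt"
	"os/exec"
)

// packCanvasDir archives every file in dir whose name starts with prefix,
// skipping earlier archives, into <dir><prefix>.tar.zst — the same
// find | tar | zstd pipeline packCanvas runs above.
func packCanvasDir(dir, prefix string) (string, error) {
	archPath := fmt.Sprintf("%s%s.tar.zst", dir, prefix)
	fullCmd := fmt.Sprintf(
		"find %s -type f -name '%s*' ! -name '*.tar.zst' | tar -cf - --files-from=- | zstd -f -o %s",
		dir, prefix, archPath)
	cmd := exec.Command("sh", "-c", fullCmd)
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return "", fmt.Errorf("failed to execute command, err: %s, stderr: %s", err, stderr.String())
	}
	return archPath, nil
}
```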

View file

@@ -1,4 +1,4 @@
package canvas_handler
package canvases
import (
"openreplay/backend/internal/config/common"
@@ -12,8 +12,8 @@ type Config struct {
objectstorage.ObjectsConfig
FSDir string `env:"FS_DIR,required"`
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"` // For canvas images and sessionEnd events from ender
TopicCanvasTrigger string `env:"TOPIC_CANVAS_TRIGGER,required"` // For trigger events to start processing (archive and upload)
GroupCanvasImage string `env:"GROUP_CANVAS_IMAGE,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}
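
Both topics are marked required, so the service refuses to start without them. A minimal sketch of the check those struct tags imply, with os.Getenv standing in for the repo's actual env-based config loader:

```go
package sketch

import (
	"fmt"
	"os"
)

// requiredTopics mirrors the `env:"...,required"` tags in the config above.
func requiredTopics() (images, trigger string, err error) {
	images = os.Getenv("TOPIC_CANVAS_IMAGES")   // canvas frames + sessionEnd from ender
	trigger = os.Getenv("TOPIC_CANVAS_TRIGGER") // per-canvas pack-and-upload triggers
	if images == "" || trigger == "" {
		return "", "", fmt.Errorf("TOPIC_CANVAS_IMAGES and TOPIC_CANVAS_TRIGGER must be set")
	}
	return images, trigger, nil
}
```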

View file

@@ -1,15 +0,0 @@
apiVersion: v1
kind: Pod
metadata:
name: "{{ include "canvas-handler.fullname" . }}-test-connection"
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: wget
image: busybox
command: ['wget']
args: ['{{ include "canvas-handler.fullname" . }}:{{ .Values.service.port }}']
restartPolicy: Never

View file

@@ -1,5 +1,5 @@
apiVersion: v2
name: canvas-handler
name: canvases
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.

View file

@@ -6,16 +6,16 @@
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "canvas-handler.fullname" . }})
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "canvases.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "canvas-handler.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "canvas-handler.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
You can watch the status by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "canvases.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "canvases.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "canvas-handler.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "canvases.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT

View file

@@ -1,7 +1,7 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "canvas-handler.name" -}}
{{- define "canvases.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
@@ -10,7 +10,7 @@ Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "canvas-handler.fullname" -}}
{{- define "canvases.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
@@ -26,16 +26,16 @@ If release name contains chart name it will be used as a full name.
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "canvas-handler.chart" -}}
{{- define "canvases.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "canvas-handler.labels" -}}
helm.sh/chart: {{ include "canvas-handler.chart" . }}
{{ include "canvas-handler.selectorLabels" . }}
{{- define "canvases.labels" -}}
helm.sh/chart: {{ include "canvases.chart" . }}
{{ include "canvases.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
@@ -48,17 +48,17 @@ app.kubernetes.io/managed-by: {{ .Release.Service }}
{{/*
Selector labels
*/}}
{{- define "canvas-handler.selectorLabels" -}}
app.kubernetes.io/name: {{ include "canvas-handler.name" . }}
{{- define "canvases.selectorLabels" -}}
app.kubernetes.io/name: {{ include "canvases.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "canvas-handler.serviceAccountName" -}}
{{- define "canvases.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "canvas-handler.fullname" .) .Values.serviceAccount.name }}
{{- default (include "canvases.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}

View file

@@ -1,17 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "canvas-handler.fullname" . }}
name: {{ include "canvases.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "canvas-handler.selectorLabels" . | nindent 6 }}
{{- include "canvases.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
@@ -19,13 +19,13 @@ spec:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "canvas-handler.selectorLabels" . | nindent 8 }}
{{- include "canvases.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "canvas-handler.serviceAccountName" . }}
serviceAccountName: {{ include "canvases.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
shareProcessNamespace: true

View file

@@ -2,15 +2,15 @@
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "canvas-handler.fullname" . }}
name: {{ include "canvases.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "canvas-handler.fullname" . }}
name: {{ include "canvases.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:

View file

@@ -1,5 +1,5 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "canvas-handler.fullname" . -}}
{{- $fullName := include "canvases.fullname" . -}}
{{- $svcPort := .Values.service.ports.http -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
@@ -18,7 +18,7 @@ metadata:
name: {{ $fullName }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}

View file

@@ -1,10 +1,10 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "canvas-handler.fullname" . }}
name: {{ include "canvases.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
@@ -15,4 +15,4 @@ spec:
name: {{ $key }}
{{- end}}
selector:
{{- include "canvas-handler.selectorLabels" . | nindent 4 }}
{{- include "canvases.selectorLabels" . | nindent 4 }}

View file

@@ -2,10 +2,10 @@
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "canvas-handler.fullname" . }}
name: {{ include "canvases.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
{{- if .Values.serviceMonitor.additionalLabels }}
{{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }}
{{- end }}
@@ -14,5 +14,5 @@ spec:
{{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }}
selector:
matchLabels:
{{- include "canvas-handler.selectorLabels" . | nindent 6 }}
{{- include "canvases.selectorLabels" . | nindent 6 }}
{{- end }}

View file

@@ -2,10 +2,10 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "canvas-handler.serviceAccountName" . }}
name: {{ include "canvases.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "canvas-handler.labels" . | nindent 4 }}
{{- include "canvases.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}

View file

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: "{{ include "canvases.fullname" . }}-test-connection"
labels:
{{- include "canvases.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: wget
image: busybox
command: ['wget']
args: ['{{ include "canvases.fullname" . }}:{{ .Values.service.port }}']
restartPolicy: Never

View file

@@ -5,14 +5,14 @@
replicaCount: 1
image:
repository: "{{ .Values.global.openReplayContainerRegistry }}/canvas-handler"
repository: "{{ .Values.global.openReplayContainerRegistry }}/canvases"
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
imagePullSecrets: []
nameOverride: "canvas-handler"
fullnameOverride: "canvas-handler-openreplay"
nameOverride: "canvases"
fullnameOverride: "canvases-openreplay"
serviceAccount:
# Specifies whether a service account should be created