Canvas support (#1711)

* feat(http): added new parameters to start response and new endpoint for canvas screenshots

* fix(http): added new topic to dockerfile

* feat(http): try different multipart parser

* feat(image-storage): reused the same workflow for canvas topic handler

* feat(video-storage): new canvas parser and ffmpeg script

* feat(video-storage): use correct replay name for canvas

* feat(backend): added new message (CanvasNode)

* feat(backend): add canvas record events to db

* feat(backend): implemented missing new method for azure

---------

Co-authored-by: Alexander <zavorotynskiy@pm.me>
This commit is contained in:
ⵄⵎⵉⵔⵓⵛ 2023-11-27 16:22:03 +01:00 committed by GitHub
parent f321fccc11
commit 6ae3ee1927
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1823 additions and 1501 deletions

View file

@ -56,6 +56,7 @@ ENV TZ=UTC \
TOPIC_ANALYTICS=analytics \
TOPIC_TRIGGER=trigger \
TOPIC_MOBILE_TRIGGER=mobile-trigger \
TOPIC_CANVAS_IMAGES=canvas-images \
GROUP_SINK=sink \
GROUP_STORAGE=storage \
GROUP_DB=db \

View file

@ -60,7 +60,7 @@ func main() {
messages.MsgFetch, messages.MsgNetworkRequest, messages.MsgGraphQL, messages.MsgStateAction, messages.MsgMouseClick,
messages.MsgSetPageLocation, messages.MsgPageLoadTiming, messages.MsgPageRenderTiming,
messages.MsgPageEvent, messages.MsgMouseThrashing, messages.MsgInputChange,
messages.MsgUnbindNodes,
messages.MsgUnbindNodes, messages.MsgCanvasNode,
// Mobile messages
messages.MsgIOSSessionStart, messages.MsgIOSSessionEnd, messages.MsgIOSUserID, messages.MsgIOSUserAnonymousID,
messages.MsgIOSMetadata, messages.MsgIOSEvent, messages.MsgIOSNetworkCall,

View file

@ -43,6 +43,20 @@ func main() {
cfg.MessageSizeLimit,
)
canvasConsumer := queue.NewConsumer(
cfg.GroupImageStorage,
[]string{
cfg.TopicCanvasImages,
},
messages.NewImagesMessageIterator(func(data []byte, sessID uint64) {
if err := srv.ProcessCanvas(sessID, data); err != nil {
log.Printf("can't process canvas image: %s", err)
}
}, nil, true),
false,
cfg.MessageSizeLimit,
)
log.Printf("Image storage service started\n")
sigchan := make(chan os.Signal, 1)
@ -61,6 +75,9 @@ func main() {
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
if err := canvasConsumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
@ -68,6 +85,10 @@ func main() {
if err != nil {
log.Fatalf("Error on images consumption: %v", err)
}
err = canvasConsumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on images consumption: %v", err)
}
}
}
}

View file

@ -46,7 +46,7 @@ func main() {
func(msg messages.Message) {
sesEnd := msg.(*messages.IOSSessionEnd)
log.Printf("recieved mobile session end: %d", sesEnd.SessionID())
if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/"); err != nil {
if err := srv.Process(sesEnd.SessionID(), workDir+"/screenshots/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", false); err != nil {
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
}
},
@ -57,6 +57,26 @@ func main() {
cfg.MessageSizeLimit,
)
canvasConsumer := queue.NewConsumer(
cfg.GroupVideoStorage,
[]string{
cfg.TopicTrigger,
},
messages.NewMessageIterator(
func(msg messages.Message) {
sesEnd := msg.(*messages.SessionEnd)
log.Printf("recieved session end: %d, let's check canvas", sesEnd.SessionID())
if err := srv.Process(sesEnd.SessionID(), workDir+"/canvas/"+strconv.FormatUint(sesEnd.SessionID(), 10)+"/", true); err != nil {
log.Printf("upload session err: %s, sessID: %d", err, msg.SessionID())
}
},
[]int{messages.MsgSessionEnd},
true,
),
false,
cfg.MessageSizeLimit,
)
log.Printf("Video storage service started\n")
sigchan := make(chan os.Signal, 1)
@ -75,6 +95,9 @@ func main() {
if err := consumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
if err := canvasConsumer.Commit(); err != nil {
log.Printf("can't commit messages: %s", err)
}
case msg := <-consumer.Rebalanced():
log.Println(msg)
default:
@ -82,6 +105,10 @@ func main() {
if err != nil {
log.Fatalf("Error on end event consumption: %v", err)
}
err = canvasConsumer.ConsumeNext()
if err != nil {
log.Fatalf("Error on end event consumption: %v", err)
}
}
}
}

View file

@ -20,6 +20,7 @@ type Config struct {
TopicRawWeb string `env:"TOPIC_RAW_WEB,required"`
TopicRawIOS string `env:"TOPIC_RAW_IOS,required"`
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
BeaconSizeLimit int64 `env:"BEACON_SIZE_LIMIT,required"`
CompressionThreshold int64 `env:"COMPRESSION_THRESHOLD,default=20000"`
JsonSizeLimit int64 `env:"JSON_SIZE_LIMIT,default=1000"`

View file

@ -9,7 +9,9 @@ type Config struct {
common.Config
FSDir string `env:"FS_DIR,required"`
ScreenshotsDir string `env:"SCREENSHOTS_DIR,default=screenshots"`
CanvasDir string `env:"CANVAS_DIR,default=canvas"`
TopicRawImages string `env:"TOPIC_RAW_IMAGES,required"`
TopicCanvasImages string `env:"TOPIC_CANVAS_IMAGES,required"`
GroupImageStorage string `env:"GROUP_IMAGE_STORAGE,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}

View file

@ -12,6 +12,7 @@ type Config struct {
FSDir string `env:"FS_DIR,required"`
GroupVideoStorage string `env:"GROUP_VIDEO_STORAGE,required"`
TopicMobileTrigger string `env:"TOPIC_MOBILE_TRIGGER,required"`
TopicTrigger string `env:"TOPIC_TRIGGER,required"`
VideoReplayFPS int `env:"VIDEO_REPLAY_FPS,default=3"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
}

View file

@ -190,6 +190,10 @@ func (s *saverImpl) handleMessage(msg Message) error {
return err
}
return s.sessions.UpdateIssuesStats(session.SessionID, 0, 50)
case *CanvasNode:
if err = s.pg.InsertCanvasNode(session, m); err != nil {
return err
}
}
return nil
}

View file

@ -6,9 +6,11 @@ import (
"fmt"
"github.com/gorilla/mux"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"openreplay/backend/internal/http/util"
"openreplay/backend/pkg/featureflags"
"openreplay/backend/pkg/sessions"
"openreplay/backend/pkg/uxtesting"
@ -224,6 +226,9 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
CompressionThreshold: e.getCompressionThreshold(),
StartTimestamp: int64(flakeid.ExtractTimestamp(tokenData.ID)),
Delay: tokenData.Delay,
CanvasEnabled: true, // keep it in project settings
CanvasImageQuality: "medium",
CanvasFrameRate: 2,
}, startTime, r.URL.Path, bodySize)
}
@ -488,3 +493,76 @@ func (e *Router) getUXUploadUrl(w http.ResponseWriter, r *http.Request) {
}
ResponseWithJSON(w, &UrlResponse{URL: url}, startTime, r.URL.Path, bodySize)
}
// ScreenshotMessage is the JSON payload produced to the canvas-images
// topic: one uploaded canvas snapshot file per message.
type ScreenshotMessage struct {
Name string // sanitized multipart file name
Data []byte // raw image bytes
}
// imagesUploaderHandlerWeb accepts a multipart/form-data upload of canvas
// screenshot files, authorizes the request via the session token, and
// forwards each file to the canvas-images queue topic as a JSON-encoded
// ScreenshotMessage keyed by the session ID.
func (e *Router) imagesUploaderHandlerWeb(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()

	sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
	if err != nil { // Should accept expired token?
		ResponseWithError(w, http.StatusUnauthorized, err, startTime, r.URL.Path, 0)
		return
	}

	if r.Body == nil {
		ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, 0)
		return
	}
	r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
	defer r.Body.Close()

	// Parse the multipart form
	err = r.ParseMultipartForm(10 << 20) // Max upload size 10 MB
	if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
		ResponseWithError(w, http.StatusUnsupportedMediaType, err, startTime, r.URL.Path, 0)
		return
	} else if err != nil {
		ResponseWithError(w, http.StatusInternalServerError, err, startTime, r.URL.Path, 0) // TODO: send error here only on staging
		return
	}

	// Iterate over uploaded files
	for _, fileHeaderList := range r.MultipartForm.File {
		for _, fileHeader := range fileHeaderList {
			file, err := fileHeader.Open()
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			// Read the file content and close the handle on every path
			// (was: file.Close() duplicated before each early return).
			// io.ReadAll replaces the deprecated ioutil.ReadAll.
			fileBytes, err := io.ReadAll(file)
			file.Close()
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}

			fileName := util.SafeString(fileHeader.Filename)
			log.Printf("fileName: %s, fileSize: %d", fileName, len(fileBytes))

			// Create a message to send to Kafka
			msg := ScreenshotMessage{
				Name: fileName,
				Data: fileBytes,
			}
			data, err := json.Marshal(&msg)
			if err != nil {
				log.Printf("can't marshal screenshot message, err: %s", err)
				continue
			}

			// Send the message to queue
			if err := e.services.Producer.Produce(e.cfg.TopicCanvasImages, sessionData.ID, data); err != nil {
				log.Printf("failed to produce canvas image message: %v", err)
			}
		}
	}
	ResponseOK(w, startTime, r.URL.Path, 0)
}

View file

@ -31,6 +31,9 @@ type StartSessionResponse struct {
ProjectID string `json:"projectID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
CompressionThreshold int64 `json:"compressionThreshold"`
CanvasEnabled bool `json:"canvasEnabled"` // false default
CanvasImageQuality string `json:"canvasQuality"` // low | medium | high
CanvasFrameRate int `json:"canvasFPS"` // 2 default
}
type NotStartedRequest struct {

View file

@ -105,6 +105,7 @@ func (e *Router) init() {
"/v1/web/start": e.startSessionHandlerWeb,
"/v1/web/i": e.pushMessagesHandlerWeb,
"/v1/web/feature-flags": e.featureFlagsHandlerWeb,
"/v1/web/images": e.imagesUploaderHandlerWeb,
"/v1/mobile/start": e.startSessionHandlerIOS,
"/v1/mobile/i": e.pushMessagesHandlerIOS,
"/v1/mobile/late": e.pushLateMessagesHandlerIOS,

View file

@ -3,6 +3,7 @@ package imagestorage
import (
"archive/tar"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
@ -14,9 +15,17 @@ import (
config "openreplay/backend/internal/config/imagestorage"
)
// ImageType distinguishes the kind of images a Task carries; it selects
// the on-disk target directory (screenshots vs canvas) in writeToDisk.
type ImageType uint8
const (
screenshot ImageType = iota // regular session screenshot batch
canvas // canvas snapshot received via the canvas-images topic
)
// Task is one unit of work for the write-to-disk worker.
type Task struct {
sessionID uint64 // to generate path
images map[string]*bytes.Buffer // file name -> image bytes
imageType ImageType // picks ScreenshotsDir or CanvasDir
isBreakTask bool // presumably a sentinel to stop the worker — confirm with worker loop
}
@ -60,6 +69,22 @@ func (v *ImageStorage) Process(sessID uint64, data []byte) error {
return nil
}
// ScreenshotMessage mirrors the JSON payload produced by the HTTP service
// for canvas screenshots: a target file name plus the raw image bytes.
type ScreenshotMessage struct {
Name string // file name to write under the session's canvas directory
Data []byte // raw image bytes
}
// ProcessCanvas decodes a single canvas screenshot message from the
// canvas-images topic and schedules it for writing to disk, reusing the
// same write-to-disk worker as regular screenshots.
func (v *ImageStorage) ProcessCanvas(sessID uint64, data []byte) error {
	var msg = &ScreenshotMessage{}
	if err := json.Unmarshal(data, msg); err != nil {
		log.Printf("can't parse canvas message, err: %s", err)
		// was: fell through and queued a task built from a zero-value
		// message (empty name, empty data); report the failure instead
		return err
	}
	// Use the same workflow
	v.writeToDiskTasks <- &Task{sessionID: sessID, images: map[string]*bytes.Buffer{msg.Name: bytes.NewBuffer(msg.Data)}, imageType: canvas}
	log.Printf("new canvas image, sessID: %d, name: %s, size: %d mb", sessID, msg.Name, len(msg.Data)/1024/1024)
	return nil
}
func (v *ImageStorage) extract(sessID uint64, data []byte) error {
images := make(map[string]*bytes.Buffer)
uncompressedStream, err := gzip.NewReader(bytes.NewReader(data))
@ -88,16 +113,23 @@ func (v *ImageStorage) extract(sessID uint64, data []byte) error {
}
}
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images}
v.writeToDiskTasks <- &Task{sessionID: sessID, images: images, imageType: screenshot}
return nil
}
func (v *ImageStorage) writeToDisk(task *Task) {
// Build the directory path
path := v.cfg.FSDir + "/"
if v.cfg.ScreenshotsDir != "" {
path += v.cfg.ScreenshotsDir + "/"
if task.imageType == screenshot {
if v.cfg.ScreenshotsDir != "" {
path += v.cfg.ScreenshotsDir + "/"
}
} else {
if v.cfg.CanvasDir != "" {
path += v.cfg.CanvasDir + "/"
}
}
path += strconv.FormatUint(task.sessionID, 10) + "/"
// Ensure the directory exists

View file

@ -7,14 +7,18 @@ import (
"log"
config "openreplay/backend/internal/config/videostorage"
"openreplay/backend/pkg/objectstorage"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"time"
)
// Task describes one rendered video file to upload to object storage.
type Task struct {
sessionID string // session ID (decimal string), used as the storage key prefix
path string // local path of the rendered mp4
name string // optional object suffix (e.g. "/<canvas>.mp4"); empty means "/replay.mp4"
isBreakTask bool // presumably a sentinel to stop the worker — confirm with worker loop
}
@ -62,6 +66,7 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
cmd := exec.Command("ffmpeg", "-y", "-f", "image2", "-framerate", v.framerate, "-start_number", "000000", "-i",
imagesPath, "-vf", "scale=-2:1064", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23",
videoPath)
// ffmpeg -y -f concat -safe 0 -i 1699978964098_29-list -vf --pix_fmt yuv420p -preset ultrafast canvas.mp4
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
@ -76,6 +81,84 @@ func (v *VideoStorage) makeVideo(sessID uint64, filesPath string) error {
return nil
}
// makeCanvasVideo renders one mp4 per recorded canvas from the timestamped
// screenshots found in filesPath and queues each video for upload to object
// storage. File names are expected as "<id>_<ts>_<captureTS>.<ext>"; images
// sharing the "<id>_<ts>" prefix belong to the same canvas.
func (v *VideoStorage) makeCanvasVideo(sessID uint64, filesPath string) error {
	// os.ReadDir replaces the deprecated ioutil.ReadDir
	files, err := os.ReadDir(filesPath)
	if err != nil {
		return err
	}
	if len(files) == 0 {
		return nil
	}
	// was: logged a hard-coded 0 instead of the session ID
	log.Printf("There are %d canvas images of session %d\n", len(files), sessID)

	// Group screenshots by canvas ID and collect their capture timestamps.
	type canvasData struct {
		files map[int]string // capture ts (ms) -> file name
		times []int          // capture timestamps; sorted before use
	}
	images := make(map[string]*canvasData)
	for _, file := range files {
		name := strings.Split(file.Name(), ".")
		parts := strings.Split(name[0], "_")
		if len(name) != 2 || len(parts) != 3 {
			log.Printf("unknown file name: %s, skipping", file.Name())
			continue
		}
		canvasID := fmt.Sprintf("%s_%s", parts[0], parts[1])
		canvasTS, err := strconv.Atoi(parts[2])
		if err != nil {
			// was: parse error silently ignored, yielding timestamp 0
			log.Printf("wrong timestamp in file name: %s, skipping", file.Name())
			continue
		}
		log.Printf("%s : %d", canvasID, canvasTS)
		if _, ok := images[canvasID]; !ok {
			images[canvasID] = &canvasData{
				files: make(map[int]string),
				times: make([]int, 0),
			}
		}
		images[canvasID].files[canvasTS] = file.Name()
		images[canvasID].times = append(images[canvasID].times, canvasTS)
	}

	for name, cData := range images {
		// Write the ffmpeg concat-demuxer list file for this canvas.
		mixList := fmt.Sprintf("%s/%s-list", filesPath, name)
		outputFile, err := os.Create(mixList)
		if err != nil {
			log.Printf("can't create mix list, err: %s", err)
			continue
		}
		sort.Ints(cData.times)
		writeFailed := false
		for i := 0; i < len(cData.times)-1; i++ {
			dur := float64(cData.times[i+1]-cData.times[i]) / 1000.0
			line := fmt.Sprintf("file %s\nduration %.3f\n", cData.files[cData.times[i]], dur)
			if _, err := outputFile.WriteString(line); err != nil {
				// was: Close() then continue, which kept writing to a closed
				// file and still ran ffmpeg on a partial list; bail out instead
				log.Printf("%s", err)
				writeFailed = true
				break
			}
			log.Printf("%s", line) // was: log.Printf(line) — non-constant format string (vet)
		}
		outputFile.Close()
		if writeFailed {
			continue
		}

		// Run ffmpeg to build video
		start := time.Now()
		sessionID := strconv.FormatUint(sessID, 10)
		videoPath := fmt.Sprintf("%s/%s.mp4", filesPath, name)
		cmd := exec.Command("ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", mixList, "-vsync", "vfr",
			"-pix_fmt", "yuv420p", "-preset", "ultrafast", videoPath)
		// ffmpeg -f concat -safe 0 -i input.txt -vsync vfr -pix_fmt yuv420p output.mp4
		var stdout, stderr bytes.Buffer
		cmd.Stdout = &stdout
		cmd.Stderr = &stderr
		if err := cmd.Run(); err != nil {
			// NOTE(review): Fatalf kills the whole service on one bad canvas;
			// kept for consistency with makeVideo — consider returning the error.
			log.Fatalf("Failed to execute command: %v, stderr: %v", err, stderr.String())
		}
		log.Printf("made video replay in %v", time.Since(start))
		v.sendToS3Tasks <- &Task{sessionID: sessionID, path: videoPath, name: "/" + name + ".mp4"}
	}
	return nil
}
func (v *VideoStorage) sendToS3(task *Task) {
start := time.Now()
// Read video file from disk
@ -84,14 +167,23 @@ func (v *VideoStorage) sendToS3(task *Task) {
log.Fatalf("Failed to read video file: %v", err)
}
// Upload video file to S3
if err := v.objStorage.Upload(bytes.NewReader(video), task.sessionID+"/replay.mp4", "video/mp4", objectstorage.NoCompression); err != nil {
key := task.sessionID
if task.name != "" {
key += task.name
} else {
key += "/replay.mp4"
}
if err := v.objStorage.Upload(bytes.NewReader(video), key, "video/mp4", objectstorage.NoCompression); err != nil {
log.Fatalf("Storage: start upload video replay failed. %s", err)
}
log.Printf("Video file (size: %d) uploaded successfully in %v", len(video), time.Since(start))
return
}
func (v *VideoStorage) Process(sessID uint64, filesPath string) error {
// Process builds a video replay for the given session from the image files
// in filesPath. With isCanvas set, the files are treated as per-canvas
// screenshots and rendered via makeCanvasVideo; otherwise a single session
// replay video is produced via makeVideo.
func (v *VideoStorage) Process(sessID uint64, filesPath string, isCanvas bool) error {
	if !isCanvas {
		return v.makeVideo(sessID, filesPath)
	}
	return v.makeCanvasVideo(sessID, filesPath)
}

View file

@ -29,6 +29,7 @@ type BulkSet struct {
webCustomEvents Bulk
webClickEvents Bulk
webNetworkRequest Bulk
webCanvasNodes Bulk
workerTask chan *bulksTask
done chan struct{}
finished chan struct{}
@ -76,6 +77,8 @@ func (conn *BulkSet) Get(name string) Bulk {
return conn.webClickEvents
case "webNetworkRequest":
return conn.webNetworkRequest
case "canvasNodes":
return conn.webCanvasNodes
default:
return nil
}
@ -199,6 +202,14 @@ func (conn *BulkSet) initBulks() {
if err != nil {
log.Fatalf("can't create webNetworkRequest bulk: %s", err)
}
conn.webCanvasNodes, err = NewBulk(conn.c,
"events.canvas_recordings",
"(session_id, recording_id, timestamp)",
"($%d, $%d, $%d)",
3, 200)
if err != nil {
log.Fatalf("can't create webCanvasNodes bulk: %s", err)
}
}
func (conn *BulkSet) Send() {
@ -219,6 +230,7 @@ func (conn *BulkSet) Send() {
newTask.bulks = append(newTask.bulks, conn.webCustomEvents)
newTask.bulks = append(newTask.bulks, conn.webClickEvents)
newTask.bulks = append(newTask.bulks, conn.webNetworkRequest)
newTask.bulks = append(newTask.bulks, conn.webCanvasNodes)
conn.workerTask <- newTask

View file

@ -211,6 +211,14 @@ func (conn *Conn) InsertMouseThrashing(sess *sessions.Session, e *messages.Mouse
return nil
}
// InsertCanvasNode appends one canvas recording event to the "canvasNodes"
// bulk (events.canvas_recordings: session_id, recording_id, timestamp),
// where the recording id is "<timestamp>_<nodeID>".
// NOTE(review): Append failures are logged and swallowed — nil is always
// returned; presumably best-effort by design, confirm callers agree.
func (conn *Conn) InsertCanvasNode(sess *sessions.Session, m *messages.CanvasNode) error {
canvasID := fmt.Sprintf("%d_%s", m.Timestamp, m.NodeId) // %s assumes NodeId is a string — TODO confirm against message definition
if err := conn.bulks.Get("canvasNodes").Append(sess.SessionID, canvasID, m.Timestamp); err != nil {
log.Printf("insert canvas node %s to db, err: %s", canvasID, err)
}
return nil
}
func (conn *Conn) InsertWebStatsPerformance(p *messages.PerformanceTrackAggr) error {
sessionID := p.SessionID()
timestamp := (p.TimestampEnd + p.TimestampStart) / 2

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"io"
"log"
"os"
@ -20,7 +22,9 @@ import (
// storageImpl implements objectstorage.ObjectStorage on top of Azure Blob Storage.
type storageImpl struct {
client *azblob.Client
cred *azblob.SharedKeyCredential // shared-key credential; required to sign SAS URLs
container string // container name (BucketName from config)
account string // storage account name; used to build blob endpoint URLs
tags map[string]string
}
@ -39,7 +43,9 @@ func NewStorage(cfg *config.ObjectsConfig) (objectstorage.ObjectStorage, error)
}
return &storageImpl{
client: client,
cred: cred,
container: cfg.BucketName,
account: cfg.AzureAccountName,
tags: loadFileTag(),
}, nil
}
@ -116,6 +122,24 @@ func (s *storageImpl) GetCreationTime(key string) *time.Time {
return get.LastModified
}
// GetPreSignedUploadUrl returns a time-limited SAS URL granting the holder
// read/create/write/tag access to the blob identified by key for one hour,
// signed with the account's shared key.
func (s *storageImpl) GetPreSignedUploadUrl(key string) (string, error) {
	// Set the desired SAS permissions and options for uploading
	sasQueryParams, err := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     time.Now().UTC(),
		ExpiryTime:    time.Now().UTC().Add(time.Hour),
		Permissions:   to.Ptr(sas.BlobPermissions{Read: true, Create: true, Write: true, Tag: true}).String(),
		ContainerName: s.container,
		BlobName:      key,
	}.SignWithSharedKey(s.cred)
	if err != nil {
		return "", err
	}
	// was: the URL omitted the container and blob path, so the signed token
	// could not be used to upload the target blob — a service SAS must be
	// appended to the full blob URL (account/container/blob).
	sasURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", s.account, s.container, key, sasQueryParams.Encode())
	return sasURL, nil
}
func loadFileTag() map[string]string {
// Load file tag from env
key := "retention"