Merge pull request #453 from openreplay/sink_refactor

Sink refactor

* structure -> Go standards
* move URL rewrite to sink (frees http from encoding/decoding)
This commit is contained in:
Alex K 2022-05-12 15:03:32 +02:00 committed by GitHub
commit 4f1a686787
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 61 additions and 35 deletions

View file

@ -23,7 +23,7 @@ function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
http | db)
http | db | sink)
echo build http
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {

View file

@ -9,37 +9,48 @@ import (
"os/signal"
"syscall"
"openreplay/backend/pkg/env"
"openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config/sink"
"openreplay/backend/internal/oswriter"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
FS_DIR := env.String("FS_DIR")
if _, err := os.Stat(FS_DIR); os.IsNotExist(err) {
log.Fatalf("%v doesn't exist. %v", FS_DIR, err)
cfg := sink.New()
if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) {
log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
}
writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR)
writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
producer := queue.NewProducer()
defer producer.Close(cfg.ProducerCloseTimeout)
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
count := 0
consumer := queue.NewMessageConsumer(
env.String("GROUP_SINK"),
cfg.GroupSink,
[]string{
env.String("TOPIC_RAW_WEB"),
env.String("TOPIC_RAW_IOS"),
cfg.TopicRawIOS,
cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
count++
typeID := message.TypeID()
if !IsReplayerType(typeID) {
return
}
count++
message = assetMessageHandler.ParseAssets(sessionID, message)
value := message.Encode()
var data []byte

View file

@ -1,19 +1,19 @@
package assetscache
import (
"openreplay/backend/internal/config"
"openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
type AssetsCache struct {
cfg *config.Config
cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
}
func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
return &AssetsCache{
cfg: cfg,
rewriter: rewriter,

View file

@ -11,12 +11,9 @@ type Config struct {
HTTPTimeout time.Duration
TopicRawWeb string
TopicRawIOS string
TopicCache string
CacheAssets bool
BeaconSizeLimit int64
JsonSizeLimit int64
FileSizeLimit int64
AssetsOrigin string
AWSRegion string
S3BucketIOSImages string
Postgres string
@ -33,12 +30,9 @@ func New() *Config {
HTTPTimeout: time.Second * 60,
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
TopicCache: env.String("TOPIC_CACHE"),
CacheAssets: env.Bool("CACHE_ASSETS"),
BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")),
JsonSizeLimit: 1e3, // 1Kb
FileSizeLimit: 1e7, // 10Mb
AssetsOrigin: env.String("ASSETS_ORIGIN"),
AWSRegion: env.String("AWS_REGION"),
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
Postgres: env.String("POSTGRES_STRING"),

View file

@ -0,0 +1,31 @@
package sink
import (
"openreplay/backend/pkg/env"
)
// Config holds the runtime settings for the sink service, populated
// from environment variables by New.
type Config struct {
FsDir string // directory where session files are written (must exist; checked by the caller)
FsUlimit uint16 // max number of simultaneously open files for the os writer
GroupSink string // Kafka consumer group for the sink service
TopicRawWeb string // topic carrying raw web session messages
TopicRawIOS string // topic carrying raw iOS session messages
TopicCache string // topic used for asset cache messages
CacheAssets bool // when true, asset messages are cached/forwarded
AssetsOrigin string // origin used by the URL rewriter for asset links
ProducerCloseTimeout int // producer close timeout in milliseconds (fixed, not env-driven)
}
// New builds a sink Config from environment variables. Each env helper
// is read in a fixed order so any missing-variable failure is reported
// deterministically. ProducerCloseTimeout is a hard-coded default (ms),
// not configurable via the environment.
func New() *Config {
	var cfg Config
	cfg.FsDir = env.String("FS_DIR")
	cfg.FsUlimit = env.Uint16("FS_ULIMIT")
	cfg.GroupSink = env.String("GROUP_SINK")
	cfg.TopicRawWeb = env.String("TOPIC_RAW_WEB")
	cfg.TopicRawIOS = env.String("TOPIC_RAW_IOS")
	cfg.TopicCache = env.String("TOPIC_CACHE")
	cfg.CacheAssets = env.Bool("CACHE_ASSETS")
	cfg.AssetsOrigin = env.String("ASSETS_ORIGIN")
	cfg.ProducerCloseTimeout = 15000
	return &cfg
}

View file

@ -1,4 +1,4 @@
package main
package oswriter
import (
"math"

View file

@ -1,9 +1,9 @@
package router
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"log"
"math/rand"
"net/http"
@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
var handledMessages bytes.Buffer
// Process each message in request data
err = ReadBatchReader(body, func(msg Message) {
msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
handledMessages.Write(msg.Encode())
})
bytes, err := ioutil.ReadAll(body)
if err != nil {
ResponseWithError(w, http.StatusForbidden, err)
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging
return
}
// Send processed messages to queue as array of bytes
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
// TODO: check bytes for nonsense crap
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes)
if err != nil {
log.Printf("can't send processed messages to queue: %s", err)
}

View file

@ -1,7 +1,6 @@
package services
import (
"openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
@ -10,13 +9,11 @@ import (
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/url/assets"
)
type ServicesBuilder struct {
Database *cache.PGCache
Producer types.Producer
Assets *assetscache.AssetsCache
Flaker *flakeid.Flaker
UaParser *uaparser.UAParser
GeoIP *geoip.GeoIP
@ -25,11 +22,9 @@ type ServicesBuilder struct {
}
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
return &ServicesBuilder{
Database: pgconn,
Producer: producer,
Assets: assetscache.New(cfg, rewriter, producer),
Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
UaParser: uaparser.NewUAParser(cfg.UAParserFile),

View file