From ae4c6e5cad212bf4b107d25005f30191d5e915ee Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Sat, 7 May 2022 23:52:48 +0200 Subject: [PATCH 1/4] refactor(backend-sink): go go standarts --- backend/{services => cmd}/sink/main.go | 18 ++++++++------- backend/internal/config/sink/config.go | 23 +++++++++++++++++++ .../oswriter/oswriter.go} | 2 +- backend/services/sink/build_hack | 0 4 files changed, 34 insertions(+), 9 deletions(-) rename backend/{services => cmd}/sink/main.go (82%) create mode 100644 backend/internal/config/sink/config.go rename backend/{services/sink/writer.go => internal/oswriter/oswriter.go} (99%) create mode 100644 backend/services/sink/build_hack diff --git a/backend/services/sink/main.go b/backend/cmd/sink/main.go similarity index 82% rename from backend/services/sink/main.go rename to backend/cmd/sink/main.go index 1a23919c6..fcab40321 100644 --- a/backend/services/sink/main.go +++ b/backend/cmd/sink/main.go @@ -9,7 +9,8 @@ import ( "os/signal" "syscall" - "openreplay/backend/pkg/env" + "openreplay/backend/internal/config/sink" + "openreplay/backend/internal/oswriter" . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" @@ -18,20 +19,21 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - FS_DIR := env.String("FS_DIR") - if _, err := os.Stat(FS_DIR); os.IsNotExist(err) { - log.Fatalf("%v doesn't exist. %v", FS_DIR, err) + cfg := sink.New() + + if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) { + log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR) + writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) count := 0 consumer := queue.NewMessageConsumer( - env.String("GROUP_SINK"), + cfg.GroupSink, []string{ - env.String("TOPIC_RAW_WEB"), - env.String("TOPIC_RAW_IOS"), + cfg.TopicRawIOS, + cfg.TopicRawWeb, }, func(sessionID uint64, message Message, _ *types.Meta) { typeID := message.TypeID() diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go new file mode 100644 index 000000000..4837e86e3 --- /dev/null +++ b/backend/internal/config/sink/config.go @@ -0,0 +1,23 @@ +package sink + +import ( + "openreplay/backend/pkg/env" +) + +type Config struct { + FsDir string + FsUlimit uint16 + GroupSink string + TopicRawWeb string + TopicRawIOS string +} + +func New() *Config { + return &Config{ + FsDir: env.String("FS_DIR"), + FsUlimit: env.Uint16("FS_ULIMIT"), + GroupSink: env.String("GROUP_SINK"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + } +} diff --git a/backend/services/sink/writer.go b/backend/internal/oswriter/oswriter.go similarity index 99% rename from backend/services/sink/writer.go rename to backend/internal/oswriter/oswriter.go index 6fcfdddbc..839e61eba 100644 --- a/backend/services/sink/writer.go +++ b/backend/internal/oswriter/oswriter.go @@ -1,4 +1,4 @@ -package main +package oswriter import ( "math" diff --git a/backend/services/sink/build_hack b/backend/services/sink/build_hack new file mode 100644 index 000000000..e69de29bb From ea2d13dac6a93b6e57454ad379237f641afecf90 Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Wed, 11 May 2022 16:27:01 +0200 Subject: [PATCH 2/4] chore(backend-sink): sink in cmd --- backend/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/build.sh b/backend/build.sh index 84af1a919..a4d95341a 100755 --- a/backend/build.sh +++ b/backend/build.sh @@ -23,7 +23,7 @@ function build_service() { image="$1" echo "BUILDING $image" case "$image" in - http | db) + http | db | sink) echo build http docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . [[ $PUSH_IMAGE -eq 1 ]] && { From a241830e719dced02f29d0cef6ca0ed22a8bcf53 Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Wed, 11 May 2022 16:32:27 +0200 Subject: [PATCH 3/4] refactor(backend-sink/http): move URLrewriter to sink --- backend/cmd/http/main.go | 7 ++++--- backend/cmd/sink/main.go | 11 ++++++++++- backend/internal/assetscache/assets.go | 6 +++--- backend/internal/config/config.go | 6 ------ backend/internal/config/sink/config.go | 26 +++++++++++++++---------- backend/internal/router/handlers-web.go | 15 +++++--------- backend/internal/services/services.go | 5 ----- 7 files changed, 38 insertions(+), 38 deletions(-) diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go index 541baab62..3fdd8d34a 100644 --- a/backend/cmd/http/main.go +++ b/backend/cmd/http/main.go @@ -2,6 +2,10 @@ package main import ( "log" + "os" + "os/signal" + "syscall" + "openreplay/backend/internal/config" "openreplay/backend/internal/router" "openreplay/backend/internal/server" @@ -10,9 +14,6 @@ import ( "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/pprof" "openreplay/backend/pkg/queue" - "os" - "os/signal" - "syscall" ) func main() { diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index fcab40321..25969146c 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -9,11 +9,13 @@ import ( "os/signal" "syscall" + "openreplay/backend/internal/assetscache" "openreplay/backend/internal/config/sink" "openreplay/backend/internal/oswriter" . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/url/assets" ) func main() { @@ -27,6 +29,11 @@ func main() { writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) + producer := queue.NewProducer() + defer producer.Close(15000) + rewriter := assets.NewRewriter(cfg.AssetsOrigin) + assetMessageHandler := assetscache.New(cfg, rewriter, producer) + count := 0 consumer := queue.NewMessageConsumer( @@ -36,12 +43,14 @@ func main() { cfg.TopicRawWeb, }, func(sessionID uint64, message Message, _ *types.Meta) { + count++ + typeID := message.TypeID() if !IsReplayerType(typeID) { return } - count++ + message = assetMessageHandler.ParseAssets(sessionID, message) value := message.Encode() var data []byte diff --git a/backend/internal/assetscache/assets.go b/backend/internal/assetscache/assets.go index 1ef70b56c..cf76bed8c 100644 --- a/backend/internal/assetscache/assets.go +++ b/backend/internal/assetscache/assets.go @@ -1,19 +1,19 @@ package assetscache import ( - "openreplay/backend/internal/config" + "openreplay/backend/internal/config/sink" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/url/assets" ) type AssetsCache struct { - cfg *config.Config + cfg *sink.Config rewriter *assets.Rewriter producer types.Producer } -func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { +func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { return &AssetsCache{ cfg: cfg, rewriter: rewriter, diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 5b55ba346..b98f910c2 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -11,12 +11,9 @@ type Config struct { HTTPTimeout time.Duration TopicRawWeb string TopicRawIOS string - TopicCache string - CacheAssets bool BeaconSizeLimit int64 JsonSizeLimit int64 FileSizeLimit int64 - AssetsOrigin string AWSRegion string S3BucketIOSImages string Postgres string @@ -33,12 +30,9 @@ func New() *Config { HTTPTimeout: time.Second * 60, TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"), - TopicCache: env.String("TOPIC_CACHE"), - CacheAssets: env.Bool("CACHE_ASSETS"), BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")), JsonSizeLimit: 1e3, // 1Kb FileSizeLimit: 1e7, // 10Mb - AssetsOrigin: env.String("ASSETS_ORIGIN"), AWSRegion: env.String("AWS_REGION"), S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"), Postgres: env.String("POSTGRES_STRING"), diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index 4837e86e3..be939df95 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -5,19 +5,25 @@ import ( ) type Config struct { - FsDir string - FsUlimit uint16 - GroupSink string - TopicRawWeb string - TopicRawIOS string + FsDir string + FsUlimit uint16 + GroupSink string + TopicRawWeb string + TopicRawIOS string + TopicCache string + CacheAssets bool + AssetsOrigin string } func New() *Config { return &Config{ - FsDir: env.String("FS_DIR"), - FsUlimit: env.Uint16("FS_ULIMIT"), - GroupSink: env.String("GROUP_SINK"), - TopicRawWeb: env.String("TOPIC_RAW_WEB"), - TopicRawIOS: env.String("TOPIC_RAW_IOS"), + FsDir: env.String("FS_DIR"), + FsUlimit: env.Uint16("FS_ULIMIT"), + GroupSink: env.String("GROUP_SINK"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicCache: env.String("TOPIC_CACHE"), + CacheAssets: env.Bool("CACHE_ASSETS"), + AssetsOrigin: env.String("ASSETS_ORIGIN"), } } diff --git a/backend/internal/router/handlers-web.go b/backend/internal/router/handlers-web.go index fc7c6421d..798348bdf 100644 --- a/backend/internal/router/handlers-web.go +++ b/backend/internal/router/handlers-web.go @@ -1,9 +1,9 @@ package router import ( - "bytes" "encoding/json" "errors" + "io/ioutil" "log" "math/rand" "net/http" @@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) defer body.Close() - var handledMessages bytes.Buffer - - // Process each message in request data - err = ReadBatchReader(body, func(msg Message) { - msg = e.services.Assets.ParseAssets(sessionData.ID, msg) - handledMessages.Write(msg.Encode()) - }) + bytes, err := ioutil.ReadAll(body) if err != nil { - ResponseWithError(w, http.StatusForbidden, err) + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging return } // Send processed messages to queue as array of bytes - err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes()) + // TODO: check bytes for nonsense crap + err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes) if err != nil { log.Printf("can't send processed messages to queue: %s", err) } diff --git a/backend/internal/services/services.go b/backend/internal/services/services.go index 5b84e1dfb..c9915c78e 100644 --- a/backend/internal/services/services.go +++ b/backend/internal/services/services.go @@ -1,7 +1,6 @@ package services import ( - "openreplay/backend/internal/assetscache" "openreplay/backend/internal/config" "openreplay/backend/internal/geoip" "openreplay/backend/internal/uaparser" @@ -10,13 +9,11 @@ import ( "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/storage" "openreplay/backend/pkg/token" - "openreplay/backend/pkg/url/assets" ) type ServicesBuilder struct { Database *cache.PGCache Producer types.Producer - Assets *assetscache.AssetsCache Flaker *flakeid.Flaker UaParser *uaparser.UAParser GeoIP *geoip.GeoIP @@ -25,11 +22,9 @@ type ServicesBuilder struct { } func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder { - rewriter := assets.NewRewriter(cfg.AssetsOrigin) return &ServicesBuilder{ Database: pgconn, Producer: producer, - Assets: assetscache.New(cfg, rewriter, producer), Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages), Tokenizer: token.NewTokenizer(cfg.TokenSecret), UaParser: uaparser.NewUAParser(cfg.UAParserFile), From 396f1a16af569b4c7dbf0e63b08bdbf7ccaabf44 Mon Sep 17 00:00:00 2001 From: Alex Kaminskii Date: Wed, 11 May 2022 17:36:35 +0200 Subject: [PATCH 4/4] refactor(backend-sink): producer close timeout value to config --- backend/cmd/sink/main.go | 2 +- backend/internal/config/sink/config.go | 34 ++++++++++++++------------ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 25969146c..269b8fab0 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -30,7 +30,7 @@ func main() { writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) producer := queue.NewProducer() - defer producer.Close(15000) + defer producer.Close(cfg.ProducerCloseTimeout) rewriter := assets.NewRewriter(cfg.AssetsOrigin) assetMessageHandler := assetscache.New(cfg, rewriter, producer) diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go index be939df95..a78bfba63 100644 --- a/backend/internal/config/sink/config.go +++ b/backend/internal/config/sink/config.go @@ -5,25 +5,27 @@ import ( ) type Config struct { - FsDir string - FsUlimit uint16 - GroupSink string - TopicRawWeb string - TopicRawIOS string - TopicCache string - CacheAssets bool - AssetsOrigin string + FsDir string + FsUlimit uint16 + GroupSink string + TopicRawWeb string + TopicRawIOS string + TopicCache string + CacheAssets bool + AssetsOrigin string + ProducerCloseTimeout int } func New() *Config { return &Config{ - FsDir: env.String("FS_DIR"), - FsUlimit: env.Uint16("FS_ULIMIT"), - GroupSink: env.String("GROUP_SINK"), - TopicRawWeb: env.String("TOPIC_RAW_WEB"), - TopicRawIOS: env.String("TOPIC_RAW_IOS"), - TopicCache: env.String("TOPIC_CACHE"), - CacheAssets: env.Bool("CACHE_ASSETS"), - AssetsOrigin: env.String("ASSETS_ORIGIN"), + FsDir: env.String("FS_DIR"), + FsUlimit: env.Uint16("FS_ULIMIT"), + GroupSink: env.String("GROUP_SINK"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicCache: env.String("TOPIC_CACHE"), + CacheAssets: env.Bool("CACHE_ASSETS"), + AssetsOrigin: env.String("ASSETS_ORIGIN"), + ProducerCloseTimeout: 15000, } }