diff --git a/backend/build.sh b/backend/build.sh index 84af1a919..a4d95341a 100755 --- a/backend/build.sh +++ b/backend/build.sh @@ -23,7 +23,7 @@ function build_service() { image="$1" echo "BUILDING $image" case "$image" in - http | db) + http | db | sink) echo build http docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile . [[ $PUSH_IMAGE -eq 1 ]] && { diff --git a/backend/services/sink/main.go b/backend/cmd/sink/main.go similarity index 69% rename from backend/services/sink/main.go rename to backend/cmd/sink/main.go index 1a23919c6..269b8fab0 100644 --- a/backend/services/sink/main.go +++ b/backend/cmd/sink/main.go @@ -9,37 +9,48 @@ import ( "os/signal" "syscall" - "openreplay/backend/pkg/env" + "openreplay/backend/internal/assetscache" + "openreplay/backend/internal/config/sink" + "openreplay/backend/internal/oswriter" . "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/url/assets" ) func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - FS_DIR := env.String("FS_DIR") - if _, err := os.Stat(FS_DIR); os.IsNotExist(err) { - log.Fatalf("%v doesn't exist. %v", FS_DIR, err) + cfg := sink.New() + + if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) { + log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err) } - writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR) + writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir) + + producer := queue.NewProducer() + defer producer.Close(cfg.ProducerCloseTimeout) + rewriter := assets.NewRewriter(cfg.AssetsOrigin) + assetMessageHandler := assetscache.New(cfg, rewriter, producer) count := 0 consumer := queue.NewMessageConsumer( - env.String("GROUP_SINK"), + cfg.GroupSink, []string{ - env.String("TOPIC_RAW_WEB"), - env.String("TOPIC_RAW_IOS"), + cfg.TopicRawIOS, + cfg.TopicRawWeb, }, func(sessionID uint64, message Message, _ *types.Meta) { + count++ + typeID := message.TypeID() if !IsReplayerType(typeID) { return } - count++ + message = assetMessageHandler.ParseAssets(sessionID, message) value := message.Encode() var data []byte diff --git a/backend/internal/assetscache/assets.go b/backend/internal/assetscache/assets.go index 1ef70b56c..cf76bed8c 100644 --- a/backend/internal/assetscache/assets.go +++ b/backend/internal/assetscache/assets.go @@ -1,19 +1,19 @@ package assetscache import ( - "openreplay/backend/internal/config" + "openreplay/backend/internal/config/sink" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/url/assets" ) type AssetsCache struct { - cfg *config.Config + cfg *sink.Config rewriter *assets.Rewriter producer types.Producer } -func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { +func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache { return &AssetsCache{ cfg: cfg, rewriter: rewriter, diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 5b55ba346..b98f910c2 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -11,12 +11,9 @@ type Config struct { HTTPTimeout time.Duration TopicRawWeb string TopicRawIOS string - TopicCache string - CacheAssets bool BeaconSizeLimit int64 JsonSizeLimit int64 FileSizeLimit int64 - AssetsOrigin string AWSRegion string S3BucketIOSImages string Postgres string @@ -33,12 +30,9 @@ func New() *Config { HTTPTimeout: time.Second * 60, TopicRawWeb: env.String("TOPIC_RAW_WEB"), TopicRawIOS: env.String("TOPIC_RAW_IOS"), - TopicCache: env.String("TOPIC_CACHE"), - CacheAssets: env.Bool("CACHE_ASSETS"), BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")), JsonSizeLimit: 1e3, // 1Kb FileSizeLimit: 1e7, // 10Mb - AssetsOrigin: env.String("ASSETS_ORIGIN"), AWSRegion: env.String("AWS_REGION"), S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"), Postgres: env.String("POSTGRES_STRING"), diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go new file mode 100644 index 000000000..a78bfba63 --- /dev/null +++ b/backend/internal/config/sink/config.go @@ -0,0 +1,31 @@ +package sink + +import ( + "openreplay/backend/pkg/env" +) + +type Config struct { + FsDir string + FsUlimit uint16 + GroupSink string + TopicRawWeb string + TopicRawIOS string + TopicCache string + CacheAssets bool + AssetsOrigin string + ProducerCloseTimeout int +} + +func New() *Config { + return &Config{ + FsDir: env.String("FS_DIR"), + FsUlimit: env.Uint16("FS_ULIMIT"), + GroupSink: env.String("GROUP_SINK"), + TopicRawWeb: env.String("TOPIC_RAW_WEB"), + TopicRawIOS: env.String("TOPIC_RAW_IOS"), + TopicCache: env.String("TOPIC_CACHE"), + CacheAssets: env.Bool("CACHE_ASSETS"), + AssetsOrigin: env.String("ASSETS_ORIGIN"), + ProducerCloseTimeout: 15000, + } +} diff --git a/backend/services/sink/writer.go b/backend/internal/oswriter/oswriter.go similarity index 99% rename from backend/services/sink/writer.go rename to backend/internal/oswriter/oswriter.go index 6fcfdddbc..839e61eba 100644 --- a/backend/services/sink/writer.go +++ b/backend/internal/oswriter/oswriter.go @@ -1,4 +1,4 @@ -package main +package oswriter import ( "math" diff --git a/backend/internal/router/handlers-web.go b/backend/internal/router/handlers-web.go index b73a6d534..13301d97e 100644 --- a/backend/internal/router/handlers-web.go +++ b/backend/internal/router/handlers-web.go @@ -1,9 +1,9 @@ package router import ( - "bytes" "encoding/json" "errors" + "io/ioutil" "log" "math/rand" "net/http" @@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit) defer body.Close() - var handledMessages bytes.Buffer - - // Process each message in request data - err = ReadBatchReader(body, func(msg Message) { - msg = e.services.Assets.ParseAssets(sessionData.ID, msg) - handledMessages.Write(msg.Encode()) - }) + bytes, err := ioutil.ReadAll(body) if err != nil { - ResponseWithError(w, http.StatusForbidden, err) + ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging return } // Send processed messages to queue as array of bytes - err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes()) + // TODO: check bytes for nonsense crap + err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes) if err != nil { log.Printf("can't send processed messages to queue: %s", err) } diff --git a/backend/internal/services/services.go b/backend/internal/services/services.go index 5b84e1dfb..c9915c78e 100644 --- a/backend/internal/services/services.go +++ b/backend/internal/services/services.go @@ -1,7 +1,6 @@ package services import ( - "openreplay/backend/internal/assetscache" "openreplay/backend/internal/config" "openreplay/backend/internal/geoip" "openreplay/backend/internal/uaparser" @@ -10,13 +9,11 @@ import ( "openreplay/backend/pkg/queue/types" "openreplay/backend/pkg/storage" "openreplay/backend/pkg/token" - "openreplay/backend/pkg/url/assets" ) type ServicesBuilder struct { Database *cache.PGCache Producer types.Producer - Assets *assetscache.AssetsCache Flaker *flakeid.Flaker UaParser *uaparser.UAParser GeoIP *geoip.GeoIP @@ -25,11 +22,9 @@ type ServicesBuilder struct { } func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder { - rewriter := assets.NewRewriter(cfg.AssetsOrigin) return &ServicesBuilder{ Database: pgconn, Producer: producer, - Assets: assetscache.New(cfg, rewriter, producer), Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages), Tokenizer: token.NewTokenizer(cfg.TokenSecret), UaParser: uaparser.NewUAParser(cfg.UAParserFile), diff --git a/backend/services/sink/build_hack b/backend/services/sink/build_hack new file mode 100644 index 000000000..e69de29bb