diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go
index 541baab62..3fdd8d34a 100644
--- a/backend/cmd/http/main.go
+++ b/backend/cmd/http/main.go
@@ -2,6 +2,10 @@ package main
 
 import (
 	"log"
+	"os"
+	"os/signal"
+	"syscall"
+
 	"openreplay/backend/internal/config"
 	"openreplay/backend/internal/router"
 	"openreplay/backend/internal/server"
@@ -10,9 +14,6 @@ import (
 	"openreplay/backend/pkg/db/postgres"
 	"openreplay/backend/pkg/pprof"
 	"openreplay/backend/pkg/queue"
-	"os"
-	"os/signal"
-	"syscall"
 )
 
 func main() {
diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go
index fcab40321..25969146c 100644
--- a/backend/cmd/sink/main.go
+++ b/backend/cmd/sink/main.go
@@ -9,11 +9,13 @@ import (
 	"os/signal"
 	"syscall"
 
+	"openreplay/backend/internal/assetscache"
 	"openreplay/backend/internal/config/sink"
 	"openreplay/backend/internal/oswriter"
 	. "openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/queue"
 	"openreplay/backend/pkg/queue/types"
+	"openreplay/backend/pkg/url/assets"
 )
 
 func main() {
@@ -27,6 +29,11 @@ func main() {
 
 	writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
 
+	producer := queue.NewProducer()
+	defer producer.Close(15000)
+	rewriter := assets.NewRewriter(cfg.AssetsOrigin)
+	assetMessageHandler := assetscache.New(cfg, rewriter, producer)
+
 	count := 0
 
 	consumer := queue.NewMessageConsumer(
@@ -36,12 +43,14 @@ func main() {
 			cfg.TopicRawWeb,
 		},
 		func(sessionID uint64, message Message, _ *types.Meta) {
+			count++
+
 			typeID := message.TypeID()
 			if !IsReplayerType(typeID) {
 				return
 			}
 
-			count++
+			message = assetMessageHandler.ParseAssets(sessionID, message)
 
 			value := message.Encode()
 			var data []byte
diff --git a/backend/internal/assetscache/assets.go b/backend/internal/assetscache/assets.go
index 1ef70b56c..cf76bed8c 100644
--- a/backend/internal/assetscache/assets.go
+++ b/backend/internal/assetscache/assets.go
@@ -1,19 +1,19 @@
 package assetscache
 
 import (
-	"openreplay/backend/internal/config"
+	"openreplay/backend/internal/config/sink"
 	"openreplay/backend/pkg/messages"
 	"openreplay/backend/pkg/queue/types"
 	"openreplay/backend/pkg/url/assets"
 )
 
 type AssetsCache struct {
-	cfg      *config.Config
+	cfg      *sink.Config
 	rewriter *assets.Rewriter
 	producer types.Producer
 }
 
-func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
+func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
 	return &AssetsCache{
 		cfg:      cfg,
 		rewriter: rewriter,
diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go
index 5b55ba346..b98f910c2 100644
--- a/backend/internal/config/config.go
+++ b/backend/internal/config/config.go
@@ -11,12 +11,9 @@ type Config struct {
 	HTTPTimeout       time.Duration
 	TopicRawWeb       string
 	TopicRawIOS       string
-	TopicCache        string
-	CacheAssets       bool
 	BeaconSizeLimit   int64
 	JsonSizeLimit     int64
 	FileSizeLimit     int64
-	AssetsOrigin      string
 	AWSRegion         string
 	S3BucketIOSImages string
 	Postgres          string
@@ -33,12 +30,9 @@ func New() *Config {
 		HTTPTimeout:       time.Second * 60,
 		TopicRawWeb:       env.String("TOPIC_RAW_WEB"),
 		TopicRawIOS:       env.String("TOPIC_RAW_IOS"),
-		TopicCache:        env.String("TOPIC_CACHE"),
-		CacheAssets:       env.Bool("CACHE_ASSETS"),
 		BeaconSizeLimit:   int64(env.Uint64("BEACON_SIZE_LIMIT")),
 		JsonSizeLimit:     1e3, // 1Kb
 		FileSizeLimit:     1e7, // 10Mb
-		AssetsOrigin:      env.String("ASSETS_ORIGIN"),
 		AWSRegion:         env.String("AWS_REGION"),
 		S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
 		Postgres:          env.String("POSTGRES_STRING"),
diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go
index 4837e86e3..be939df95 100644
--- a/backend/internal/config/sink/config.go
+++ b/backend/internal/config/sink/config.go
@@ -5,19 +5,25 @@ import (
 )
 
 type Config struct {
-	FsDir       string
-	FsUlimit    uint16
-	GroupSink   string
-	TopicRawWeb string
-	TopicRawIOS string
+	FsDir        string
+	FsUlimit     uint16
+	GroupSink    string
+	TopicRawWeb  string
+	TopicRawIOS  string
+	TopicCache   string
+	CacheAssets  bool
+	AssetsOrigin string
 }
 
 func New() *Config {
 	return &Config{
-		FsDir:       env.String("FS_DIR"),
-		FsUlimit:    env.Uint16("FS_ULIMIT"),
-		GroupSink:   env.String("GROUP_SINK"),
-		TopicRawWeb: env.String("TOPIC_RAW_WEB"),
-		TopicRawIOS: env.String("TOPIC_RAW_IOS"),
+		FsDir:        env.String("FS_DIR"),
+		FsUlimit:     env.Uint16("FS_ULIMIT"),
+		GroupSink:    env.String("GROUP_SINK"),
+		TopicRawWeb:  env.String("TOPIC_RAW_WEB"),
+		TopicRawIOS:  env.String("TOPIC_RAW_IOS"),
+		TopicCache:   env.String("TOPIC_CACHE"),
+		CacheAssets:  env.Bool("CACHE_ASSETS"),
+		AssetsOrigin: env.String("ASSETS_ORIGIN"),
 	}
 }
diff --git a/backend/internal/router/handlers-web.go b/backend/internal/router/handlers-web.go
index fc7c6421d..798348bdf 100644
--- a/backend/internal/router/handlers-web.go
+++ b/backend/internal/router/handlers-web.go
@@ -1,9 +1,9 @@
 package router
 
 import (
-	"bytes"
 	"encoding/json"
 	"errors"
+	"io/ioutil"
 	"log"
 	"math/rand"
 	"net/http"
@@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
 	body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
 	defer body.Close()
 
-	var handledMessages bytes.Buffer
-
-	// Process each message in request data
-	err = ReadBatchReader(body, func(msg Message) {
-		msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
-		handledMessages.Write(msg.Encode())
-	})
+	bytes, err := ioutil.ReadAll(body)
 	if err != nil {
-		ResponseWithError(w, http.StatusForbidden, err)
+		ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging
 		return
 	}
 
 	// Send processed messages to queue as array of bytes
-	err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
+	// TODO: check bytes for nonsense crap
+	err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes)
 	if err != nil {
 		log.Printf("can't send processed messages to queue: %s", err)
 	}
diff --git a/backend/internal/services/services.go b/backend/internal/services/services.go
index 5b84e1dfb..c9915c78e 100644
--- a/backend/internal/services/services.go
+++ b/backend/internal/services/services.go
@@ -1,7 +1,6 @@
 package services
 
 import (
-	"openreplay/backend/internal/assetscache"
 	"openreplay/backend/internal/config"
 	"openreplay/backend/internal/geoip"
 	"openreplay/backend/internal/uaparser"
@@ -10,13 +9,11 @@ import (
 	"openreplay/backend/pkg/queue/types"
 	"openreplay/backend/pkg/storage"
 	"openreplay/backend/pkg/token"
-	"openreplay/backend/pkg/url/assets"
 )
 
 type ServicesBuilder struct {
 	Database  *cache.PGCache
 	Producer  types.Producer
-	Assets    *assetscache.AssetsCache
 	Flaker    *flakeid.Flaker
 	UaParser  *uaparser.UAParser
 	GeoIP     *geoip.GeoIP
@@ -25,11 +22,9 @@ type ServicesBuilder struct {
 }
 
 func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
-	rewriter := assets.NewRewriter(cfg.AssetsOrigin)
 	return &ServicesBuilder{
 		Database:  pgconn,
 		Producer:  producer,
-		Assets:    assetscache.New(cfg, rewriter, producer),
 		Storage:   storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
 		Tokenizer: token.NewTokenizer(cfg.TokenSecret),
 		UaParser:  uaparser.NewUAParser(cfg.UAParserFile),