refactor(backend-sink/http): move URLrewriter to sink
This commit is contained in:
parent
ea2d13dac6
commit
a241830e71
7 changed files with 38 additions and 38 deletions
|
|
@ -2,6 +2,10 @@ package main
|
|||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"openreplay/backend/internal/config"
|
||||
"openreplay/backend/internal/router"
|
||||
"openreplay/backend/internal/server"
|
||||
|
|
@ -10,9 +14,6 @@ import (
|
|||
"openreplay/backend/pkg/db/postgres"
|
||||
"openreplay/backend/pkg/pprof"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
|
|||
|
|
@ -9,11 +9,13 @@ import (
|
|||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"openreplay/backend/internal/assetscache"
|
||||
"openreplay/backend/internal/config/sink"
|
||||
"openreplay/backend/internal/oswriter"
|
||||
. "openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -27,6 +29,11 @@ func main() {
|
|||
|
||||
writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
|
||||
|
||||
producer := queue.NewProducer()
|
||||
defer producer.Close(15000)
|
||||
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
|
||||
assetMessageHandler := assetscache.New(cfg, rewriter, producer)
|
||||
|
||||
count := 0
|
||||
|
||||
consumer := queue.NewMessageConsumer(
|
||||
|
|
@ -36,12 +43,14 @@ func main() {
|
|||
cfg.TopicRawWeb,
|
||||
},
|
||||
func(sessionID uint64, message Message, _ *types.Meta) {
|
||||
count++
|
||||
|
||||
typeID := message.TypeID()
|
||||
if !IsReplayerType(typeID) {
|
||||
return
|
||||
}
|
||||
|
||||
count++
|
||||
message = assetMessageHandler.ParseAssets(sessionID, message)
|
||||
|
||||
value := message.Encode()
|
||||
var data []byte
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
package assetscache
|
||||
|
||||
import (
|
||||
"openreplay/backend/internal/config"
|
||||
"openreplay/backend/internal/config/sink"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
type AssetsCache struct {
|
||||
cfg *config.Config
|
||||
cfg *sink.Config
|
||||
rewriter *assets.Rewriter
|
||||
producer types.Producer
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
|
||||
func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
|
||||
return &AssetsCache{
|
||||
cfg: cfg,
|
||||
rewriter: rewriter,
|
||||
|
|
|
|||
|
|
@ -11,12 +11,9 @@ type Config struct {
|
|||
HTTPTimeout time.Duration
|
||||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
TopicCache string
|
||||
CacheAssets bool
|
||||
BeaconSizeLimit int64
|
||||
JsonSizeLimit int64
|
||||
FileSizeLimit int64
|
||||
AssetsOrigin string
|
||||
AWSRegion string
|
||||
S3BucketIOSImages string
|
||||
Postgres string
|
||||
|
|
@ -33,12 +30,9 @@ func New() *Config {
|
|||
HTTPTimeout: time.Second * 60,
|
||||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
TopicCache: env.String("TOPIC_CACHE"),
|
||||
CacheAssets: env.Bool("CACHE_ASSETS"),
|
||||
BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")),
|
||||
JsonSizeLimit: 1e3, // 1Kb
|
||||
FileSizeLimit: 1e7, // 10Mb
|
||||
AssetsOrigin: env.String("ASSETS_ORIGIN"),
|
||||
AWSRegion: env.String("AWS_REGION"),
|
||||
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
|
||||
Postgres: env.String("POSTGRES_STRING"),
|
||||
|
|
|
|||
|
|
@ -5,19 +5,25 @@ import (
|
|||
)
|
||||
|
||||
type Config struct {
|
||||
FsDir string
|
||||
FsUlimit uint16
|
||||
GroupSink string
|
||||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
FsDir string
|
||||
FsUlimit uint16
|
||||
GroupSink string
|
||||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
TopicCache string
|
||||
CacheAssets bool
|
||||
AssetsOrigin string
|
||||
}
|
||||
|
||||
func New() *Config {
|
||||
return &Config{
|
||||
FsDir: env.String("FS_DIR"),
|
||||
FsUlimit: env.Uint16("FS_ULIMIT"),
|
||||
GroupSink: env.String("GROUP_SINK"),
|
||||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
FsDir: env.String("FS_DIR"),
|
||||
FsUlimit: env.Uint16("FS_ULIMIT"),
|
||||
GroupSink: env.String("GROUP_SINK"),
|
||||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
TopicCache: env.String("TOPIC_CACHE"),
|
||||
CacheAssets: env.Bool("CACHE_ASSETS"),
|
||||
AssetsOrigin: env.String("ASSETS_ORIGIN"),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
|
|
@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
|
|||
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
var handledMessages bytes.Buffer
|
||||
|
||||
// Process each message in request data
|
||||
err = ReadBatchReader(body, func(msg Message) {
|
||||
msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
|
||||
handledMessages.Write(msg.Encode())
|
||||
})
|
||||
bytes, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
ResponseWithError(w, http.StatusForbidden, err)
|
||||
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging
|
||||
return
|
||||
}
|
||||
|
||||
// Send processed messages to queue as array of bytes
|
||||
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
|
||||
// TODO: check bytes for nonsense crap
|
||||
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes)
|
||||
if err != nil {
|
||||
log.Printf("can't send processed messages to queue: %s", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"openreplay/backend/internal/assetscache"
|
||||
"openreplay/backend/internal/config"
|
||||
"openreplay/backend/internal/geoip"
|
||||
"openreplay/backend/internal/uaparser"
|
||||
|
|
@ -10,13 +9,11 @@ import (
|
|||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/storage"
|
||||
"openreplay/backend/pkg/token"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
type ServicesBuilder struct {
|
||||
Database *cache.PGCache
|
||||
Producer types.Producer
|
||||
Assets *assetscache.AssetsCache
|
||||
Flaker *flakeid.Flaker
|
||||
UaParser *uaparser.UAParser
|
||||
GeoIP *geoip.GeoIP
|
||||
|
|
@ -25,11 +22,9 @@ type ServicesBuilder struct {
|
|||
}
|
||||
|
||||
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
|
||||
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
|
||||
return &ServicesBuilder{
|
||||
Database: pgconn,
|
||||
Producer: producer,
|
||||
Assets: assetscache.New(cfg, rewriter, producer),
|
||||
Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
|
||||
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
|
||||
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue