diff --git a/backend/build.sh b/backend/build.sh
index 84af1a919..a4d95341a 100755
--- a/backend/build.sh
+++ b/backend/build.sh
@@ -23,7 +23,7 @@ function build_service() {
image="$1"
echo "BUILDING $image"
case "$image" in
- http | db)
+ http | db | sink)
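+ # sink is built from the shared ./cmd/Dockerfile via the SERVICE_NAME build-arg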
- echo build http
+ echo "build $image"
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --platform linux/amd64 --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
[[ $PUSH_IMAGE -eq 1 ]] && {
diff --git a/backend/cmd/http/main.go b/backend/cmd/http/main.go
index 541baab62..3fdd8d34a 100644
--- a/backend/cmd/http/main.go
+++ b/backend/cmd/http/main.go
@@ -2,6 +2,10 @@ package main
import (
"log"
+ "os"
+ "os/signal"
+ "syscall"
+
"openreplay/backend/internal/config"
"openreplay/backend/internal/router"
"openreplay/backend/internal/server"
@@ -10,9 +14,6 @@ import (
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/pprof"
"openreplay/backend/pkg/queue"
- "os"
- "os/signal"
- "syscall"
)
func main() {
diff --git a/backend/services/sink/main.go b/backend/cmd/sink/main.go
similarity index 69%
rename from backend/services/sink/main.go
rename to backend/cmd/sink/main.go
index 1a23919c6..269b8fab0 100644
--- a/backend/services/sink/main.go
+++ b/backend/cmd/sink/main.go
@@ -9,37 +9,48 @@ import (
"os/signal"
"syscall"
- "openreplay/backend/pkg/env"
+ "openreplay/backend/internal/assetscache"
+ "openreplay/backend/internal/config/sink"
+ "openreplay/backend/internal/oswriter"
. "openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/queue/types"
+ "openreplay/backend/pkg/url/assets"
)
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
- FS_DIR := env.String("FS_DIR")
- if _, err := os.Stat(FS_DIR); os.IsNotExist(err) {
- log.Fatalf("%v doesn't exist. %v", FS_DIR, err)
+ cfg := sink.New()
+
+ if _, err := os.Stat(cfg.FsDir); os.IsNotExist(err) {
+ log.Fatalf("%v doesn't exist. %v", cfg.FsDir, err)
}
- writer := NewWriter(env.Uint16("FS_ULIMIT"), FS_DIR)
+ writer := oswriter.NewWriter(cfg.FsUlimit, cfg.FsDir)
+
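+ // The asset-rewriting pipeline has moved here from the http service: the assets
+ // cache rewrites asset URLs and republishes cache messages through this
+ // producer (presumably to cfg.TopicCache when CacheAssets is enabled).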
+ producer := queue.NewProducer()
+ defer producer.Close(cfg.ProducerCloseTimeout)
+ rewriter := assets.NewRewriter(cfg.AssetsOrigin)
+ assetMessageHandler := assetscache.New(cfg, rewriter, producer)
count := 0
consumer := queue.NewMessageConsumer(
- env.String("GROUP_SINK"),
+ cfg.GroupSink,
[]string{
- env.String("TOPIC_RAW_WEB"),
- env.String("TOPIC_RAW_IOS"),
+ cfg.TopicRawIOS,
+ cfg.TopicRawWeb,
},
func(sessionID uint64, message Message, _ *types.Meta) {
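+ // Count every consumed message, not only replayer-type ones.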
+ count++
+
typeID := message.TypeID()
if !IsReplayerType(typeID) {
return
}
- count++
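+ // Rewrite asset URLs in replayer messages before they are encoded.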
+ message = assetMessageHandler.ParseAssets(sessionID, message)
value := message.Encode()
var data []byte
diff --git a/backend/internal/assetscache/assets.go b/backend/internal/assetscache/assets.go
index 1ef70b56c..cf76bed8c 100644
--- a/backend/internal/assetscache/assets.go
+++ b/backend/internal/assetscache/assets.go
@@ -1,19 +1,19 @@
package assetscache
import (
- "openreplay/backend/internal/config"
+ "openreplay/backend/internal/config/sink"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/url/assets"
)
type AssetsCache struct {
- cfg *config.Config
+ cfg *sink.Config
rewriter *assets.Rewriter
producer types.Producer
}
-func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
+func New(cfg *sink.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
return &AssetsCache{
cfg: cfg,
rewriter: rewriter,
diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go
index 5b55ba346..b98f910c2 100644
--- a/backend/internal/config/config.go
+++ b/backend/internal/config/config.go
@@ -11,12 +11,9 @@ type Config struct {
HTTPTimeout time.Duration
TopicRawWeb string
TopicRawIOS string
- TopicCache string
- CacheAssets bool
BeaconSizeLimit int64
JsonSizeLimit int64
FileSizeLimit int64
- AssetsOrigin string
AWSRegion string
S3BucketIOSImages string
Postgres string
@@ -33,12 +30,9 @@ func New() *Config {
HTTPTimeout: time.Second * 60,
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
- TopicCache: env.String("TOPIC_CACHE"),
- CacheAssets: env.Bool("CACHE_ASSETS"),
BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")),
JsonSizeLimit: 1e3, // 1Kb
FileSizeLimit: 1e7, // 10Mb
- AssetsOrigin: env.String("ASSETS_ORIGIN"),
AWSRegion: env.String("AWS_REGION"),
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
Postgres: env.String("POSTGRES_STRING"),
diff --git a/backend/internal/config/sink/config.go b/backend/internal/config/sink/config.go
new file mode 100644
index 000000000..a78bfba63
--- /dev/null
+++ b/backend/internal/config/sink/config.go
@@ -0,0 +1,31 @@
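+// Package sink holds the environment-driven configuration for the sink service.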
+package sink
+
+import (
+ "openreplay/backend/pkg/env"
+)
+
+type Config struct {
+ FsDir string
+ FsUlimit uint16
+ GroupSink string
+ TopicRawWeb string
+ TopicRawIOS string
+ TopicCache string
+ CacheAssets bool
+ AssetsOrigin string
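+ // ProducerCloseTimeout bounds producer.Close on shutdown; assumed to be milliseconds (15000 = 15s).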
+ ProducerCloseTimeout int
+}
+
+func New() *Config {
+ return &Config{
+ FsDir: env.String("FS_DIR"),
+ FsUlimit: env.Uint16("FS_ULIMIT"),
+ GroupSink: env.String("GROUP_SINK"),
+ TopicRawWeb: env.String("TOPIC_RAW_WEB"),
+ TopicRawIOS: env.String("TOPIC_RAW_IOS"),
+ TopicCache: env.String("TOPIC_CACHE"),
+ CacheAssets: env.Bool("CACHE_ASSETS"),
+ AssetsOrigin: env.String("ASSETS_ORIGIN"),
+ ProducerCloseTimeout: 15000,
+ }
+}
diff --git a/backend/services/sink/writer.go b/backend/internal/oswriter/oswriter.go
similarity index 99%
rename from backend/services/sink/writer.go
rename to backend/internal/oswriter/oswriter.go
index 6fcfdddbc..839e61eba 100644
--- a/backend/services/sink/writer.go
+++ b/backend/internal/oswriter/oswriter.go
@@ -1,4 +1,4 @@
-package main
+package oswriter
import (
"math"
diff --git a/backend/internal/router/handlers-web.go b/backend/internal/router/handlers-web.go
index fc7c6421d..13301d97e 100644
--- a/backend/internal/router/handlers-web.go
+++ b/backend/internal/router/handlers-web.go
@@ -1,9 +1,9 @@
package router
import (
- "bytes"
"encoding/json"
"errors"
+ "io/ioutil"
"log"
"math/rand"
"net/http"
@@ -64,14 +64,14 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
- sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixNano() / 1e6))
+ sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
ResponseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
- tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6}
+ tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixMilli()}
e.services.Producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
Timestamp: req.Timestamp,
@@ -117,20 +117,15 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
- var handledMessages bytes.Buffer
-
- // Process each message in request data
- err = ReadBatchReader(body, func(msg Message) {
- msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
- handledMessages.Write(msg.Encode())
- })
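+ // Read the batch as raw bytes; per-message decoding and asset URL rewriting
+ // now happen in the sink service instead of here.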
+ bytes, err := ioutil.ReadAll(body)
if err != nil {
- ResponseWithError(w, http.StatusForbidden, err)
+ ResponseWithError(w, http.StatusInternalServerError, err) // TODO: Split environments; send error here only on staging
return
}
- // Send processed messages to queue as array of bytes
- err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
+ // Send the raw message batch to the queue as an array of bytes
+ // TODO: validate the raw payload before producing it
+ err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bytes)
if err != nil {
log.Printf("can't send processed messages to queue: %s", err)
}
diff --git a/backend/internal/services/services.go b/backend/internal/services/services.go
index 5b84e1dfb..c9915c78e 100644
--- a/backend/internal/services/services.go
+++ b/backend/internal/services/services.go
@@ -1,7 +1,6 @@
package services
import (
- "openreplay/backend/internal/assetscache"
"openreplay/backend/internal/config"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
@@ -10,13 +9,11 @@ import (
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/storage"
"openreplay/backend/pkg/token"
- "openreplay/backend/pkg/url/assets"
)
type ServicesBuilder struct {
Database *cache.PGCache
Producer types.Producer
- Assets *assetscache.AssetsCache
Flaker *flakeid.Flaker
UaParser *uaparser.UAParser
GeoIP *geoip.GeoIP
@@ -25,11 +22,9 @@ type ServicesBuilder struct {
}
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
- rewriter := assets.NewRewriter(cfg.AssetsOrigin)
return &ServicesBuilder{
Database: pgconn,
Producer: producer,
- Assets: assetscache.New(cfg, rewriter, producer),
Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
diff --git a/backend/services/sink/build_hack b/backend/services/sink/build_hack
new file mode 100644
index 000000000..e69de29bb
diff --git a/frontend/app/components/Client/Users/components/UserForm/UserForm.tsx b/frontend/app/components/Client/Users/components/UserForm/UserForm.tsx
index 25f35097c..a56e62c83 100644
--- a/frontend/app/components/Client/Users/components/UserForm/UserForm.tsx
+++ b/frontend/app/components/Client/Users/components/UserForm/UserForm.tsx
@@ -94,7 +94,7 @@ function UserForm(props: Props) {
- { !isEnterprise && (
+ { isEnterprise && (