From 8b9eb90afaa16e6162a59c13b59466390045b995 Mon Sep 17 00:00:00 2001
From: ShiKhu
Date: Tue, 10 Aug 2021 17:34:57 +0800
Subject: [PATCH] feat (backend-http): BEACON_SIZE_LIMIT env var

---
 backend/Dockerfile                |  1 +
 backend/Dockerfile.bundle         |  1 +
 backend/services/http/assets.go   |  6 +++---
 backend/services/http/handlers.go | 13 ++++++-------
 backend/services/http/main.go     | 19 +++++++++++--------
 5 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/backend/Dockerfile b/backend/Dockerfile
index 43ea47d22..e9ddba135 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -25,6 +25,7 @@ ENV TZ=UTC \
     MAXMINDDB_FILE=/root/geoip.mmdb \
     UAPARSER_FILE=/root/regexes.yaml \
     HTTP_PORT=80 \
+    BEACON_SIZE_LIMIT=1000000 \
     KAFKA_USE_SSL=true \
     REDIS_STREAMS_MAX_LEN=3000 \
     TOPIC_RAW=raw \
diff --git a/backend/Dockerfile.bundle b/backend/Dockerfile.bundle
index 3105742cf..904bb45f4 100644
--- a/backend/Dockerfile.bundle
+++ b/backend/Dockerfile.bundle
@@ -26,6 +26,7 @@ ENV TZ=UTC \
     MAXMINDDB_FILE=/root/geoip.mmdb \
     UAPARSER_FILE=/root/regexes.yaml \
     HTTP_PORT=80 \
+    BEACON_SIZE_LIMIT=1000000 \
     KAFKA_USE_SSL=true \
     REDIS_STREAMS_MAX_LEN=3000 \
     TOPIC_RAW=raw \
diff --git a/backend/services/http/assets.go b/backend/services/http/assets.go
index 8ed625859..dd3dd4703 100644
--- a/backend/services/http/assets.go
+++ b/backend/services/http/assets.go
@@ -7,7 +7,7 @@ import (
 
 func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
 	if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
-		producer.Produce(topicTrigger, sessionID, messages.Encode(&messages.AssetCache{
+		producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(&messages.AssetCache{
 			URL: fullURL,
 		}))
 	}
@@ -20,7 +20,7 @@ func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
 }
 
 func handleURL(sessionID uint64, baseURL string, url string) string {
-	if cacheAssets {
+	if CACHE_ASSESTS {
 		rewrittenURL, isCachable := rewriter.RewriteURL(sessionID, baseURL, url)
 		if isCachable {
 			sendAssetForCache(sessionID, baseURL, url)
@@ -31,7 +31,7 @@ func handleURL(sessionID uint64, baseURL string, url string) string {
 }
 
 func handleCSS(sessionID uint64, baseURL string, css string) string {
-	if cacheAssets {
+	if CACHE_ASSESTS {
 		sendAssetsForCacheFromCSS(sessionID, baseURL, css)
 		return rewriter.RewriteCSS(sessionID, baseURL, css)
 	}
diff --git a/backend/services/http/handlers.go b/backend/services/http/handlers.go
index 761aab764..e22a97abe 100644
--- a/backend/services/http/handlers.go
+++ b/backend/services/http/handlers.go
@@ -19,7 +19,6 @@ import (
 )
 
 const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
-const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb
 
 func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
 	type request struct {
@@ -90,7 +89,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
 		tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
 
 		country := geoIP.ExtractISOCodeFromHTTPRequest(r)
-		producer.Produce(topicRaw, tokenData.ID, Encode(&SessionStart{
+		producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&SessionStart{
 			Timestamp: req.Timestamp,
 			ProjectID: uint64(p.ProjectID),
 			TrackerVersion: req.TrackerVersion,
@@ -120,7 +119,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
 }
 
 func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
-	body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT)
+	body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
 	//defer body.Close()
 	var reader io.ReadCloser
 	switch r.Header.Get("Content-Encoding") {
@@ -139,7 +138,7 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
 		responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
 		return
 	}
-	producer.Produce(topicRaw, sessionID, buf) // What if not able to send?
+	producer.Produce(TOPIC_RAW, sessionID, buf) // What if not able to send?
 	w.WriteHeader(http.StatusOK)
 }
 
@@ -158,7 +157,7 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
 		responseWithError(w, http.StatusUnauthorized, err)
 		return
 	}
-	body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT)
+	body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
 	//defer body.Close()
 	buf, err := ioutil.ReadAll(body)
 	if err != nil {
@@ -234,8 +233,8 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
 		responseWithError(w, http.StatusForbidden, err)
 		return
 	}
-	producer.Produce(topicRaw, sessionData.ID, rewritenBuf)
-	//producer.Produce(topicAnalytics, sessionData.ID, WriteBatch(analyticsMessages))
+	producer.Produce(TOPIC_RAW, sessionData.ID, rewritenBuf)
+	//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
 	//duration := time.Now().Sub(startTime)
 	//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
 	w.WriteHeader(http.StatusOK)
 
diff --git a/backend/services/http/main.go b/backend/services/http/main.go
index 27a83548b..647dc42f5 100644
--- a/backend/services/http/main.go
+++ b/backend/services/http/main.go
@@ -31,20 +31,22 @@ var uaParser *uaparser.UAParser
 var geoIP *geoip.GeoIP
 var tokenizer *token.Tokenizer
 var s3 *storage.S3
-var topicRaw string
-var topicTrigger string
-var topicAnalytics string
+
+var TOPIC_RAW string
+var TOPIC_TRIGGER string
+var TOPIC_ANALYTICS string
 // var kafkaTopicEvents string
-var cacheAssets bool
+var CACHE_ASSESTS bool
+var BEACON_SIZE_LIMIT int64
 
 func main() {
 	log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
 
 	producer = queue.NewProducer()
 	defer producer.Close(15000)
-	topicRaw = env.String("TOPIC_RAW")
-	topicTrigger = env.String("TOPIC_TRIGGER")
-	topicAnalytics = env.String("TOPIC_ANALYTICS")
+	TOPIC_RAW = env.String("TOPIC_RAW")
+	TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
+	TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
 	rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
 	pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20)
 	defer pgconn.Close()
@@ -53,7 +55,8 @@ func main() {
 	uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE"))
 	geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE"))
 	flaker = flakeid.NewFlaker(env.WorkerID())
-	cacheAssets = env.Bool("CACHE_ASSETS")
+	CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
+	BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))
 
 	HTTP_PORT := env.String("HTTP_PORT")
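
Not part of the patch: a minimal standalone sketch of the behaviour the new limit buys. The handlers above wrap the request body in http.MaxBytesReader with BEACON_SIZE_LIMIT as the cap, so a beacon larger than the cap fails at read time and the request is rejected. The handler shape, the 16-byte stand-in limit, and the 413 status below are illustrative only; the real service reads the limit from the BEACON_SIZE_LIMIT env var and responds through its own responseWithError helper.

```go
package main

import (
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

func main() {
	// Stand-in for BEACON_SIZE_LIMIT (the patched service reads it via env.Uint64).
	const beaconSizeLimit int64 = 16

	handler := func(w http.ResponseWriter, r *http.Request) {
		// Same pattern as pushMessages: cap how much of the body may be read.
		body := http.MaxBytesReader(w, r.Body, beaconSizeLimit)
		if _, err := io.ReadAll(body); err != nil {
			// Reading past the cap returns an error; the real handler also
			// turns this error into an error response.
			http.Error(w, err.Error(), http.StatusRequestEntityTooLarge)
			return
		}
		w.WriteHeader(http.StatusOK)
	}

	// A 32-byte beacon against a 16-byte limit is rejected.
	req := httptest.NewRequest("POST", "/", strings.NewReader(strings.Repeat("x", 32)))
	rec := httptest.NewRecorder()
	handler(rec, req)
	log.Println(rec.Code) // 413
}
```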