feat (backend-http): BEACON_SIZE_LIMIT env var
This commit is contained in:
parent
b7ac0da761
commit
8b9eb90afa
5 changed files with 22 additions and 18 deletions
|
|
@ -25,6 +25,7 @@ ENV TZ=UTC \
|
|||
MAXMINDDB_FILE=/root/geoip.mmdb \
|
||||
UAPARSER_FILE=/root/regexes.yaml \
|
||||
HTTP_PORT=80 \
|
||||
BEACON_SIZE_LIMIT=1000000 \
|
||||
KAFKA_USE_SSL=true \
|
||||
REDIS_STREAMS_MAX_LEN=3000 \
|
||||
TOPIC_RAW=raw \
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ ENV TZ=UTC \
|
|||
MAXMINDDB_FILE=/root/geoip.mmdb \
|
||||
UAPARSER_FILE=/root/regexes.yaml \
|
||||
HTTP_PORT=80 \
|
||||
BEACON_SIZE_LIMIT=1000000 \
|
||||
KAFKA_USE_SSL=true \
|
||||
REDIS_STREAMS_MAX_LEN=3000 \
|
||||
TOPIC_RAW=raw \
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import (
|
|||
|
||||
func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
|
||||
producer.Produce(topicTrigger, sessionID, messages.Encode(&messages.AssetCache{
|
||||
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(&messages.AssetCache{
|
||||
URL: fullURL,
|
||||
}))
|
||||
}
|
||||
|
|
@ -20,7 +20,7 @@ func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
|
|||
}
|
||||
|
||||
func handleURL(sessionID uint64, baseURL string, url string) string {
|
||||
if cacheAssets {
|
||||
if CACHE_ASSESTS {
|
||||
rewrittenURL, isCachable := rewriter.RewriteURL(sessionID, baseURL, url)
|
||||
if isCachable {
|
||||
sendAssetForCache(sessionID, baseURL, url)
|
||||
|
|
@ -31,7 +31,7 @@ func handleURL(sessionID uint64, baseURL string, url string) string {
|
|||
}
|
||||
|
||||
func handleCSS(sessionID uint64, baseURL string, css string) string {
|
||||
if cacheAssets {
|
||||
if CACHE_ASSESTS {
|
||||
sendAssetsForCacheFromCSS(sessionID, baseURL, css)
|
||||
return rewriter.RewriteCSS(sessionID, baseURL, css)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import (
|
|||
)
|
||||
|
||||
const JSON_SIZE_LIMIT int64 = 1e3 // 1Kb
|
||||
const BATCH_SIZE_LIMIT int64 = 1e6 // 1Mb
|
||||
|
||||
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
|
|
@ -90,7 +89,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
|
||||
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
producer.Produce(topicRaw, tokenData.ID, Encode(&SessionStart{
|
||||
producer.Produce(TOPIC_RAW, tokenData.ID, Encode(&SessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
|
|
@ -120,7 +119,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
|
||||
body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT)
|
||||
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
|
||||
//defer body.Close()
|
||||
var reader io.ReadCloser
|
||||
switch r.Header.Get("Content-Encoding") {
|
||||
|
|
@ -139,7 +138,7 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64) {
|
|||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
return
|
||||
}
|
||||
producer.Produce(topicRaw, sessionID, buf) // What if not able to send?
|
||||
producer.Produce(TOPIC_RAW, sessionID, buf) // What if not able to send?
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
|
|
@ -158,7 +157,7 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
|
|||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, BATCH_SIZE_LIMIT)
|
||||
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
|
||||
//defer body.Close()
|
||||
buf, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
|
|
@ -234,8 +233,8 @@ func pushMessagesSeparatelyHandler(w http.ResponseWriter, r *http.Request) {
|
|||
responseWithError(w, http.StatusForbidden, err)
|
||||
return
|
||||
}
|
||||
producer.Produce(topicRaw, sessionData.ID, rewritenBuf)
|
||||
//producer.Produce(topicAnalytics, sessionData.ID, WriteBatch(analyticsMessages))
|
||||
producer.Produce(TOPIC_RAW, sessionData.ID, rewritenBuf)
|
||||
//producer.Produce(TOPIC_ANALYTICS, sessionData.ID, WriteBatch(analyticsMessages))
|
||||
//duration := time.Now().Sub(startTime)
|
||||
//log.Printf("Sended batch within %v nsec; %v nsek/byte", duration.Nanoseconds(), duration.Nanoseconds()/int64(len(buf)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
|
|
|||
|
|
@ -31,20 +31,22 @@ var uaParser *uaparser.UAParser
|
|||
var geoIP *geoip.GeoIP
|
||||
var tokenizer *token.Tokenizer
|
||||
var s3 *storage.S3
|
||||
var topicRaw string
|
||||
var topicTrigger string
|
||||
var topicAnalytics string
|
||||
|
||||
var TOPIC_RAW string
|
||||
var TOPIC_TRIGGER string
|
||||
var TOPIC_ANALYTICS string
|
||||
// var kafkaTopicEvents string
|
||||
var cacheAssets bool
|
||||
var CACHE_ASSESTS bool
|
||||
var BEACON_SIZE_LIMIT int64
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
producer = queue.NewProducer()
|
||||
defer producer.Close(15000)
|
||||
topicRaw = env.String("TOPIC_RAW")
|
||||
topicTrigger = env.String("TOPIC_TRIGGER")
|
||||
topicAnalytics = env.String("TOPIC_ANALYTICS")
|
||||
TOPIC_RAW = env.String("TOPIC_RAW")
|
||||
TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
|
||||
TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
|
||||
rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
|
||||
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000 * 60 * 20)
|
||||
defer pgconn.Close()
|
||||
|
|
@ -53,7 +55,8 @@ func main() {
|
|||
uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE"))
|
||||
geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE"))
|
||||
flaker = flakeid.NewFlaker(env.WorkerID())
|
||||
cacheAssets = env.Bool("CACHE_ASSETS")
|
||||
CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
|
||||
BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))
|
||||
|
||||
HTTP_PORT := env.String("HTTP_PORT")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue