From 10edeb6e2dd1f2bd2898323d372af7c65ee62336 Mon Sep 17 00:00:00 2001 From: Alexander Zavorotynskiy Date: Fri, 29 Apr 2022 16:53:28 +0200 Subject: [PATCH] Refactoring of http handlers --- backend/pkg/messages/batch.go | 39 +--------- .../pkg/messages/legacy-message-transform.go | 13 +--- backend/pkg/queue/messages.go | 3 +- backend/services/http/handlers-web.go | 78 ++++++++++--------- backend/services/http/main.go | 24 ++++-- backend/services/http/model.go | 30 +++++++ 6 files changed, 94 insertions(+), 93 deletions(-) create mode 100644 backend/services/http/model.go diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index 9241672a3..850f22de9 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -1,17 +1,12 @@ package messages import ( - "bytes" "io" "github.com/pkg/errors" ) -func ReadBatch(b []byte, callback func(Message)) error { - return ReadBatchReader(bytes.NewReader(b), callback) -} - -func ReadBatchReader(reader io.Reader, callback func(Message)) error { +func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error { var index uint64 var timestamp int64 for { @@ -21,7 +16,7 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error { } else if err != nil { return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index) } - msg = transformDepricated(msg) + msg = transformDeprecated(msg) isBatchMeta := false switch m := msg.(type) { @@ -48,37 +43,11 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error { } msg.Meta().Index = index msg.Meta().Timestamp = timestamp - callback(msg) + + messageHandler(msg) if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker index++ } } return errors.New("Error of the codeflow. 
(Should return on EOF)") } - -const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically -func WriteBatch(mList []Message) []byte { - batch := make([]byte, AVG_MESSAGE_SIZE*len(mList)) - p := 0 - for _, msg := range mList { - msgBytes := msg.Encode() - if len(batch) < p+len(msgBytes) { - newBatch := make([]byte, 2*len(batch)+len(msgBytes)) - copy(newBatch, batch) - batch = newBatch - } - copy(batch[p:], msgBytes) - p += len(msgBytes) - } - return batch[:p] -} - -func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) { - mList := make([]Message, 0, 10) // 10? - if err := ReadBatchReader(reader, func(m Message) { - mList = append(mList, rewrite(m)) - }); err != nil { - return nil, err - } - return WriteBatch(mList), nil -} diff --git a/backend/pkg/messages/legacy-message-transform.go b/backend/pkg/messages/legacy-message-transform.go index c22b73378..6774d95b1 100644 --- a/backend/pkg/messages/legacy-message-transform.go +++ b/backend/pkg/messages/legacy-message-transform.go @@ -1,6 +1,6 @@ package messages -func transformDepricated(msg Message) Message { +func transformDeprecated(msg Message) Message { switch m := msg.(type) { case *MouseClickDepricated: meta := m.Meta() @@ -11,17 +11,6 @@ func transformDepricated(msg Message) Message { HesitationTime: m.HesitationTime, Label: m.Label, } - // case *FetchDepricated: - // return &Fetch { - // Method: m.Method, - // URL: m.URL, - // Request: m.Request, - // Response: m.Response, - // Status: m.Status, - // Timestamp: m.Timestamp, - // Duration: m.Duration, - // // Headers: '' - // } default: return msg } diff --git a/backend/pkg/queue/messages.go b/backend/pkg/queue/messages.go index 0ab184ee6..9b9ff43a5 100644 --- a/backend/pkg/queue/messages.go +++ b/backend/pkg/queue/messages.go @@ -1,6 +1,7 @@ package queue import ( + "bytes" "log" "openreplay/backend/pkg/messages" @@ -9,7 +10,7 @@ import ( func NewMessageConsumer(group string, topics []string, handler 
types.DecodedMessageHandler, autoCommit bool) types.Consumer { return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) { - if err := messages.ReadBatch(value, func(msg messages.Message) { + if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) { handler(sessionID, msg, meta) }); err != nil { log.Printf("Decode error: %v\n", err) diff --git a/backend/services/http/handlers-web.go b/backend/services/http/handlers-web.go index e5cf57a60..0c2fa386c 100644 --- a/backend/services/http/handlers-web.go +++ b/backend/services/http/handlers-web.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "encoding/json" "errors" "log" @@ -15,37 +16,23 @@ import ( ) func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { - type request struct { - Token string `json:"token"` - UserUUID *string `json:"userUUID"` - RevID string `json:"revID"` - Timestamp uint64 `json:"timestamp"` - TrackerVersion string `json:"trackerVersion"` - IsSnippet bool `json:"isSnippet"` - DeviceMemory uint64 `json:"deviceMemory"` - JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` - ProjectKey *string `json:"projectKey"` - Reset bool `json:"reset"` - UserID string `json:"userID"` - } - type response struct { - Timestamp int64 `json:"timestamp"` - Delay int64 `json:"delay"` - Token string `json:"token"` - UserUUID string `json:"userUUID"` - SessionID string `json:"sessionID"` - BeaconSizeLimit int64 `json:"beaconSizeLimit"` - } - startTime := time.Now() - req := &request{} - body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error? 
+ + // Check request body + if r.Body == nil { + responseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + } + body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) defer body.Close() + + // Parse request body + req := &startSessionRequest{} if err := json.NewDecoder(body).Decode(req); err != nil { responseWithError(w, http.StatusBadRequest, err) return } + // Handler's logic if req.ProjectKey == nil { responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) return @@ -82,9 +69,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { } // TODO: if EXPIRED => send message for two sessions association expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} + tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6} - country := geoIP.ExtractISOCodeFromHTTPRequest(r) producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{ Timestamp: req.Timestamp, ProjectID: uint64(p.ProjectID), @@ -98,14 +84,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { UserBrowserVersion: ua.BrowserVersion, UserDevice: ua.Device, UserDeviceType: ua.DeviceType, - UserCountry: country, + UserCountry: geoIP.ExtractISOCodeFromHTTPRequest(r), UserDeviceMemorySize: req.DeviceMemory, UserDeviceHeapSize: req.JsHeapSizeLimit, UserID: req.UserID, })) } - responseWithJSON(w, &response{ + responseWithJSON(w, &startSessionResponse{ Token: tokenizer.Compose(*tokenData), UserUUID: userUUID, SessionID: strconv.FormatUint(tokenData.ID, 10), @@ -114,15 +100,24 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { } func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { + // Check authorization sessionData, err := tokenizer.ParseFromHTTPRequest(r) if err != nil { responseWithError(w, http.StatusUnauthorized, err) return } + + // Check request body + if r.Body 
== nil { + responseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) + } body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) defer body.Close() - rewritenBuf, err := RewriteBatch(body, func(msg Message) Message { + var handledMessages bytes.Buffer + + // Process each message in request data + err = ReadBatchReader(body, func(msg Message) { switch m := msg.(type) { case *SetNodeAttributeURLBased: if m.Name == "src" || m.Name == "href" { @@ -150,30 +145,38 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule), } } - - return msg + handledMessages.Write(msg.Encode()) }) if err != nil { responseWithError(w, http.StatusForbidden, err) return } - producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf) + + // Send processed messages to queue as array of bytes + err = producer.Produce(TOPIC_RAW_WEB, sessionData.ID, handledMessages.Bytes()) + if err != nil { + log.Printf("can't send processed messages to queue: %s", err) + } + w.WriteHeader(http.StatusOK) } func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { - type request struct { - ProjectKey *string `json:"projectKey"` - TrackerVersion string `json:"trackerVersion"` - DoNotTrack bool `json:"DoNotTrack"` + // Check request body + if r.Body == nil { + responseWithError(w, http.StatusBadRequest, errors.New("request body is empty")) } - req := &request{} body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) defer body.Close() + + // Parse request body + req := &notStartedRequest{} if err := json.NewDecoder(body).Decode(req); err != nil { responseWithError(w, http.StatusBadRequest, err) return } + + // Handler's logic if req.ProjectKey == nil { responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required")) return @@ -201,5 +204,6 @@ func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) { if err != nil { log.Printf("Unable to insert Unstarted Session: %v\n", err) } + 
w.WriteHeader(http.StatusOK) } diff --git a/backend/services/http/main.go b/backend/services/http/main.go index 1f3bc93b3..43dd6865e 100644 --- a/backend/services/http/main.go +++ b/backend/services/http/main.go @@ -4,6 +4,7 @@ import ( "context" "log" "net/http" + "openreplay/backend/pkg/db/postgres" "os" "os/signal" "syscall" @@ -11,7 +12,6 @@ import ( "golang.org/x/net/http2" "openreplay/backend/pkg/db/cache" - "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/env" "openreplay/backend/pkg/flakeid" "openreplay/backend/pkg/queue" @@ -47,26 +47,32 @@ func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) pprof.StartProfilingServer() + // Queue producer = queue.NewProducer() defer producer.Close(15000) + + // Database + pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) + defer pgconn.Close() + + // Envs TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB") TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS") TOPIC_CACHE = env.String("TOPIC_CACHE") TOPIC_TRIGGER = env.String("TOPIC_TRIGGER") - //TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS") + CACHE_ASSESTS = env.Bool("CACHE_ASSETS") + BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT")) + HTTP_PORT := env.String("HTTP_PORT") + + // Modules rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN")) - pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20) - defer pgconn.Close() s3 = storage.NewS3(env.String("AWS_REGION"), env.String("S3_BUCKET_IOS_IMAGES")) tokenizer = token.NewTokenizer(env.String("TOKEN_SECRET")) uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE")) geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE")) flaker = flakeid.NewFlaker(env.WorkerID()) - CACHE_ASSESTS = env.Bool("CACHE_ASSETS") - BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT")) - - HTTP_PORT := env.String("HTTP_PORT") + // Server server := &http.Server{ Addr: ":" + HTTP_PORT, Handler: http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) { @@ -140,6 +146,7 @@ func main() { } }), } + http2.ConfigureServer(server, nil) go func() { if err := server.ListenAndServe(); err != nil { @@ -148,6 +155,7 @@ func main() { } }() log.Printf("Server successfully started on port %v\n", HTTP_PORT) + sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) <-sigchan diff --git a/backend/services/http/model.go b/backend/services/http/model.go new file mode 100644 index 000000000..5a7dd28bc --- /dev/null +++ b/backend/services/http/model.go @@ -0,0 +1,30 @@ +package main + +type startSessionRequest struct { + Token string `json:"token"` + UserUUID *string `json:"userUUID"` + RevID string `json:"revID"` + Timestamp uint64 `json:"timestamp"` + TrackerVersion string `json:"trackerVersion"` + IsSnippet bool `json:"isSnippet"` + DeviceMemory uint64 `json:"deviceMemory"` + JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"` + ProjectKey *string `json:"projectKey"` + Reset bool `json:"reset"` + UserID string `json:"userID"` +} + +type startSessionResponse struct { + Timestamp int64 `json:"timestamp"` + Delay int64 `json:"delay"` + Token string `json:"token"` + UserUUID string `json:"userUUID"` + SessionID string `json:"sessionID"` + BeaconSizeLimit int64 `json:"beaconSizeLimit"` +} + +type notStartedRequest struct { + ProjectKey *string `json:"projectKey"` + TrackerVersion string `json:"trackerVersion"` + DoNotTrack bool `json:"DoNotTrack"` +}