Refactoring of http handlers

This commit is contained in:
Alexander Zavorotynskiy 2022-04-29 16:53:28 +02:00
parent dc69131499
commit 10edeb6e2d
6 changed files with 94 additions and 93 deletions

View file

@ -1,17 +1,12 @@
package messages
import (
"bytes"
"io"
"github.com/pkg/errors"
)
func ReadBatch(b []byte, callback func(Message)) error {
return ReadBatchReader(bytes.NewReader(b), callback)
}
func ReadBatchReader(reader io.Reader, callback func(Message)) error {
func ReadBatchReader(reader io.Reader, messageHandler func(Message)) error {
var index uint64
var timestamp int64
for {
@ -21,7 +16,7 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
} else if err != nil {
return errors.Wrapf(err, "Batch Message decoding error on message with index %v", index)
}
msg = transformDepricated(msg)
msg = transformDeprecated(msg)
isBatchMeta := false
switch m := msg.(type) {
@ -48,37 +43,11 @@ func ReadBatchReader(reader io.Reader, callback func(Message)) error {
}
msg.Meta().Index = index
msg.Meta().Timestamp = timestamp
callback(msg)
messageHandler(msg)
if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
index++
}
}
return errors.New("Error of the codeflow. (Should return on EOF)")
}
const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically
func WriteBatch(mList []Message) []byte {
batch := make([]byte, AVG_MESSAGE_SIZE*len(mList))
p := 0
for _, msg := range mList {
msgBytes := msg.Encode()
if len(batch) < p+len(msgBytes) {
newBatch := make([]byte, 2*len(batch)+len(msgBytes))
copy(newBatch, batch)
batch = newBatch
}
copy(batch[p:], msgBytes)
p += len(msgBytes)
}
return batch[:p]
}
func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) {
mList := make([]Message, 0, 10) // 10?
if err := ReadBatchReader(reader, func(m Message) {
mList = append(mList, rewrite(m))
}); err != nil {
return nil, err
}
return WriteBatch(mList), nil
}

View file

@ -1,6 +1,6 @@
package messages
func transformDepricated(msg Message) Message {
func transformDeprecated(msg Message) Message {
switch m := msg.(type) {
case *MouseClickDepricated:
meta := m.Meta()
@ -11,17 +11,6 @@ func transformDepricated(msg Message) Message {
HesitationTime: m.HesitationTime,
Label: m.Label,
}
// case *FetchDepricated:
// return &Fetch {
// Method: m.Method,
// URL: m.URL,
// Request: m.Request,
// Response: m.Response,
// Status: m.Status,
// Timestamp: m.Timestamp,
// Duration: m.Duration,
// // Headers: ''
// }
default:
return msg
}

View file

@ -1,6 +1,7 @@
package queue
import (
"bytes"
"log"
"openreplay/backend/pkg/messages"
@ -9,7 +10,7 @@ import (
func NewMessageConsumer(group string, topics []string, handler types.DecodedMessageHandler, autoCommit bool) types.Consumer {
return NewConsumer(group, topics, func(sessionID uint64, value []byte, meta *types.Meta) {
if err := messages.ReadBatch(value, func(msg messages.Message) {
if err := messages.ReadBatchReader(bytes.NewReader(value), func(msg messages.Message) {
handler(sessionID, msg, meta)
}); err != nil {
log.Printf("Decode error: %v\n", err)

View file

@ -1,6 +1,7 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"log"
@ -15,37 +16,23 @@ import (
)
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
type request struct {
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset bool `json:"reset"`
UserID string `json:"userID"`
}
type response struct {
Timestamp int64 `json:"timestamp"`
Delay int64 `json:"delay"`
Token string `json:"token"`
UserUUID string `json:"userUUID"`
SessionID string `json:"sessionID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
}
startTime := time.Now()
req := &request{}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT) // what if Body == nil?? // use r.ContentLength to return specific error?
// Check request body
if r.Body == nil {
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
defer body.Close()
// Parse request body
req := &startSessionRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
responseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
@ -82,9 +69,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
Timestamp: req.Timestamp,
ProjectID: uint64(p.ProjectID),
@ -98,14 +84,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
UserBrowserVersion: ua.BrowserVersion,
UserDevice: ua.Device,
UserDeviceType: ua.DeviceType,
UserCountry: country,
UserCountry: geoIP.ExtractISOCodeFromHTTPRequest(r),
UserDeviceMemorySize: req.DeviceMemory,
UserDeviceHeapSize: req.JsHeapSizeLimit,
UserID: req.UserID,
}))
}
responseWithJSON(w, &response{
responseWithJSON(w, &startSessionResponse{
Token: tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
@ -114,15 +100,24 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
}
func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
// Check authorization
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
if err != nil {
responseWithError(w, http.StatusUnauthorized, err)
return
}
// Check request body
if r.Body == nil {
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
defer body.Close()
rewritenBuf, err := RewriteBatch(body, func(msg Message) Message {
var handledMessages bytes.Buffer
// Process each message in request data
err = ReadBatchReader(body, func(msg Message) {
switch m := msg.(type) {
case *SetNodeAttributeURLBased:
if m.Name == "src" || m.Name == "href" {
@ -150,30 +145,38 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule),
}
}
return msg
handledMessages.Write(msg.Encode())
})
if err != nil {
responseWithError(w, http.StatusForbidden, err)
return
}
producer.Produce(TOPIC_RAW_WEB, sessionData.ID, rewritenBuf)
// Send processed messages to queue as array of bytes
err = producer.Produce(TOPIC_RAW_WEB, sessionData.ID, handledMessages.Bytes())
if err != nil {
log.Printf("can't send processed messages to queue: %s", err)
}
w.WriteHeader(http.StatusOK)
}
func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
type request struct {
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
// Check request body
if r.Body == nil {
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
req := &request{}
body := http.MaxBytesReader(w, r.Body, JSON_SIZE_LIMIT)
defer body.Close()
// Parse request body
req := &notStartedRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
responseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
@ -201,5 +204,6 @@ func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Printf("Unable to insert Unstarted Session: %v\n", err)
}
w.WriteHeader(http.StatusOK)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"log"
"net/http"
"openreplay/backend/pkg/db/postgres"
"os"
"os/signal"
"syscall"
@ -11,7 +12,6 @@ import (
"golang.org/x/net/http2"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue"
@ -47,26 +47,32 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
pprof.StartProfilingServer()
// Queue
producer = queue.NewProducer()
defer producer.Close(15000)
// Database
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pgconn.Close()
// Envs
TOPIC_RAW_WEB = env.String("TOPIC_RAW_WEB")
TOPIC_RAW_IOS = env.String("TOPIC_RAW_IOS")
TOPIC_CACHE = env.String("TOPIC_CACHE")
TOPIC_TRIGGER = env.String("TOPIC_TRIGGER")
//TOPIC_ANALYTICS = env.String("TOPIC_ANALYTICS")
CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))
HTTP_PORT := env.String("HTTP_PORT")
// Modules
rewriter = assets.NewRewriter(env.String("ASSETS_ORIGIN"))
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
defer pgconn.Close()
s3 = storage.NewS3(env.String("AWS_REGION"), env.String("S3_BUCKET_IOS_IMAGES"))
tokenizer = token.NewTokenizer(env.String("TOKEN_SECRET"))
uaParser = uaparser.NewUAParser(env.String("UAPARSER_FILE"))
geoIP = geoip.NewGeoIP(env.String("MAXMINDDB_FILE"))
flaker = flakeid.NewFlaker(env.WorkerID())
CACHE_ASSESTS = env.Bool("CACHE_ASSETS")
BEACON_SIZE_LIMIT = int64(env.Uint64("BEACON_SIZE_LIMIT"))
HTTP_PORT := env.String("HTTP_PORT")
// Server
server := &http.Server{
Addr: ":" + HTTP_PORT,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -140,6 +146,7 @@ func main() {
}
}),
}
http2.ConfigureServer(server, nil)
go func() {
if err := server.ListenAndServe(); err != nil {
@ -148,6 +155,7 @@ func main() {
}
}()
log.Printf("Server successfully started on port %v\n", HTTP_PORT)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
<-sigchan

View file

@ -0,0 +1,30 @@
package main
type startSessionRequest struct {
Token string `json:"token"`
UserUUID *string `json:"userUUID"`
RevID string `json:"revID"`
Timestamp uint64 `json:"timestamp"`
TrackerVersion string `json:"trackerVersion"`
IsSnippet bool `json:"isSnippet"`
DeviceMemory uint64 `json:"deviceMemory"`
JsHeapSizeLimit uint64 `json:"jsHeapSizeLimit"`
ProjectKey *string `json:"projectKey"`
Reset bool `json:"reset"`
UserID string `json:"userID"`
}
type startSessionResponse struct {
Timestamp int64 `json:"timestamp"`
Delay int64 `json:"delay"`
Token string `json:"token"`
UserUUID string `json:"userUUID"`
SessionID string `json:"sessionID"`
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
}
type notStartedRequest struct {
ProjectKey *string `json:"projectKey"`
TrackerVersion string `json:"trackerVersion"`
DoNotTrack bool `json:"DoNotTrack"`
}