fix(backend-http): read from request without copying bytes to array first

ShiKhu 2022-04-23 13:54:56 +02:00
parent d392f4ca89
commit 263dd79d7f
2 changed files with 25 additions and 32 deletions
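In short, the beacon handler stops buffering the whole request body into a byte slice before decoding it and instead hands the size-limited body (an io.Reader) straight to the batch decoder. A minimal sketch of that pattern follows; decodeStream, handlePush and the newline-delimited record format are illustrative stand-ins, not this repository's API, and the limit constant is assumed for the sketch.

package main

import (
	"bufio"
	"io"
	"log"
	"net/http"
)

const BEACON_SIZE_LIMIT = 1 << 20 // assumed limit for the sketch; the real constant lives elsewhere in the service

// decodeStream consumes newline-delimited records one at a time from r,
// so the request body never has to be copied into a single []byte first.
func decodeStream(r io.Reader, callback func(record []byte)) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		callback(scanner.Bytes())
	}
	return scanner.Err()
}

func handlePush(w http.ResponseWriter, r *http.Request) {
	body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
	defer body.Close()

	// Before: buf, err := ioutil.ReadAll(body); decode(buf)
	// After:  the limited reader is consumed directly while decoding.
	if err := decodeStream(body, func(record []byte) {
		log.Printf("received %d-byte record", len(record))
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/push", handlePush)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

The actual change wires the reader returned by http.MaxBytesReader into the new ReadBatchReader/RewriteBatch functions shown in the diff below.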

View file

@@ -1,14 +1,17 @@
 package messages

 import (
-	"io"
 	"bytes"
+	"io"

 	"github.com/pkg/errors"
 )

 func ReadBatch(b []byte, callback func(Message)) error {
-	reader := bytes.NewReader(b)
+	return ReadBatchReader(bytes.NewReader(b), callback)
+}
+
+func ReadBatchReader(reader io.Reader, callback func(Message)) error {
 	var index uint64
 	var timestamp int64
 	for {
@@ -21,12 +24,12 @@ func ReadBatch(b []byte, callback func(Message)) error {
 		msg = transformDepricated(msg)
 		isBatchMeta := false
-		switch m := msg.(type){
-		case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
+		switch m := msg.(type) {
+		case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it)
 			if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though
 				return errors.New("Batch Meta found at the end of the batch")
 			}
-			index = m.PageNo << 32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
+			index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha)
 			timestamp = m.Timestamp
 			isBatchMeta = true
 			// continue readLoop
@@ -34,7 +37,7 @@ func ReadBatch(b []byte, callback func(Message)) error {
 			if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though
 				return errors.New("Batch Meta found at the end of the batch")
 			}
-			index = m.FirstIndex
+			index = m.FirstIndex
 			timestamp = int64(m.Timestamp)
 			isBatchMeta = true
 			// continue readLoop
@@ -46,23 +49,23 @@ func ReadBatch(b []byte, callback func(Message)) error {
 		msg.Meta().Index = index
 		msg.Meta().Timestamp = timestamp
 		callback(msg)
-		if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
+		if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker
 			index++
 		}
 	}
 	return errors.New("Error of the codeflow. (Should return on EOF)")
 }
-const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically
+const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically
 func WriteBatch(mList []Message) []byte {
-	batch := make([]byte, AVG_MESSAGE_SIZE * len(mList))
+	batch := make([]byte, AVG_MESSAGE_SIZE*len(mList))
 	p := 0
 	for _, msg := range mList {
 		msgBytes := msg.Encode()
-		if len(batch) < p + len(msgBytes) {
-			newBatch := make([]byte, 2*len(batch) + len(msgBytes))
-			copy(newBatch, batch)
-			batch = newBatch
+		if len(batch) < p+len(msgBytes) {
+			newBatch := make([]byte, 2*len(batch)+len(msgBytes))
+			copy(newBatch, batch)
+			batch = newBatch
 		}
 		copy(batch[p:], msgBytes)
 		p += len(msgBytes)
@@ -70,12 +73,12 @@ func WriteBatch(mList []Message) []byte {
 	return batch[:p]
 }

-func RewriteBatch(b []byte, rewrite func(Message) Message) ([]byte, error) {
-	mList := make([]Message, 0, len(b)/AVG_MESSAGE_SIZE)
-	if err := ReadBatch(b, func(m Message) {
+func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) {
+	mList := make([]Message, 0, 10) // 10?
+	if err := ReadBatchReader(reader, func(m Message) {
 		mList = append(mList, rewrite(m))
 	}); err != nil {
 		return nil, err
 	}
 	return WriteBatch(mList), nil
 }
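With ReadBatchReader and the new RewriteBatch signature in place, any io.Reader can be fed to the package without materializing the batch as a []byte. A rough usage sketch, assuming it sits alongside the package's own definitions shown above; relayBatch and countMessages are illustrative helpers, not part of the repository.

// Illustrative callers; only ReadBatchReader, RewriteBatch and the Message
// interface come from the messages package itself.
func relayBatch(src io.Reader) ([]byte, error) {
	// Re-encode the incoming stream, giving the callback a chance to rewrite
	// each message (URL resolution, timestamp patching, ...).
	return RewriteBatch(src, func(m Message) Message {
		return m
	})
}

func countMessages(src io.Reader) (int, error) {
	n := 0
	err := ReadBatchReader(src, func(m Message) { n++ })
	return n, err
}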

View file

@@ -3,7 +3,6 @@ package main
 import (
 	"encoding/json"
 	"errors"
-	"io/ioutil"
 	"log"
 	"math/rand"
 	"net/http"
@@ -76,14 +75,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
 			responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
 			return
 		}
-		sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
+		sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
 		if err != nil {
 			responseWithError(w, http.StatusInternalServerError, err)
 			return
 		}
 		// TODO: if EXPIRED => send message for two sessions association
 		expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
-		tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
+		tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
 		country := geoIP.ExtractISOCodeFromHTTPRequest(r)
 		producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
@@ -108,8 +107,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
 	//delayDuration := time.Now().Sub(startTime)
 	responseWithJSON(w, &response{
-		//Timestamp: startTime.UnixMilli(),
-		//Delay: delayDuration.Milliseconds(),
+		//Timestamp: startTime.UnixNano() / 1e6,
+		//Delay: delayDuration.Nanoseconds() / 1e6,
 		Token: tokenizer.Compose(*tokenData),
 		UserUUID: userUUID,
 		SessionID: strconv.FormatUint(tokenData.ID, 10),
@@ -125,17 +124,8 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
 	}
 	body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT)
 	defer body.Close()
-	buf, err := ioutil.ReadAll(body)
-	if err != nil {
-		responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
-		return
-	}
-	//log.Printf("Sending batch...")
-	//startTime := time.Now()
-	// analyticsMessages := make([]Message, 0, 200)
-	rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message {
+	rewritenBuf, err := RewriteBatch(body, func(msg Message) Message {
 		switch m := msg.(type) {
 		case *SetNodeAttributeURLBased:
 			if m.Name == "src" || m.Name == "href" {