Moved assets cache logic
This commit is contained in:
parent
69cabaecfe
commit
5ec46ad753
8 changed files with 126 additions and 96 deletions
83
backend/internal/assetscache/assets.go
Normal file
83
backend/internal/assetscache/assets.go
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
package assetscache
|
||||
|
||||
import (
|
||||
"openreplay/backend/internal/config"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
type AssetsCache struct {
|
||||
cfg *config.Config
|
||||
rewriter *assets.Rewriter
|
||||
producer types.Producer
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, rewriter *assets.Rewriter, producer types.Producer) *AssetsCache {
|
||||
return &AssetsCache{
|
||||
cfg: cfg,
|
||||
rewriter: rewriter,
|
||||
producer: producer,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *AssetsCache) ParseAssets(sessID uint64, msg messages.Message) messages.Message {
|
||||
switch m := msg.(type) {
|
||||
case *messages.SetNodeAttributeURLBased:
|
||||
if m.Name == "src" || m.Name == "href" {
|
||||
return &messages.SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: e.handleURL(sessID, m.BaseURL, m.Value),
|
||||
}
|
||||
} else if m.Name == "style" {
|
||||
return &messages.SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: e.handleCSS(sessID, m.BaseURL, m.Value),
|
||||
}
|
||||
}
|
||||
case *messages.SetCSSDataURLBased:
|
||||
return &messages.SetCSSData{
|
||||
ID: m.ID,
|
||||
Data: e.handleCSS(sessID, m.BaseURL, m.Data),
|
||||
}
|
||||
case *messages.CSSInsertRuleURLBased:
|
||||
return &messages.CSSInsertRule{
|
||||
ID: m.ID,
|
||||
Index: m.Index,
|
||||
Rule: e.handleCSS(sessID, m.BaseURL, m.Rule),
|
||||
}
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (e *AssetsCache) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
|
||||
e.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
|
||||
URL: fullURL,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *AssetsCache) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
|
||||
for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
|
||||
e.sendAssetForCache(sessionID, baseURL, u)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *AssetsCache) handleURL(sessionID uint64, baseURL string, url string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetForCache(sessionID, baseURL, url)
|
||||
return e.rewriter.RewriteURL(sessionID, baseURL, url)
|
||||
}
|
||||
return assets.ResolveURL(baseURL, url)
|
||||
}
|
||||
|
||||
func (e *AssetsCache) handleCSS(sessionID uint64, baseURL string, css string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
|
||||
return e.rewriter.RewriteCSS(sessionID, baseURL, css)
|
||||
}
|
||||
return assets.ResolveCSS(baseURL, css)
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@ type Config struct {
|
|||
CacheAssets bool
|
||||
BeaconSizeLimit int64
|
||||
JsonSizeLimit int64
|
||||
FileSizeLimit int64
|
||||
AssetsOrigin string
|
||||
AWSRegion string
|
||||
S3BucketIOSImages string
|
||||
|
|
@ -36,6 +37,7 @@ func New() *Config {
|
|||
CacheAssets: env.Bool("CACHE_ASSETS"),
|
||||
BeaconSizeLimit: int64(env.Uint64("BEACON_SIZE_LIMIT")),
|
||||
JsonSizeLimit: 1e3, // 1Kb
|
||||
FileSizeLimit: 1e7, // 10Mb
|
||||
AssetsOrigin: env.String("ASSETS_ORIGIN"),
|
||||
AWSRegion: env.String("AWS_REGION"),
|
||||
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
|
||||
|
|
|
|||
|
|
@ -1,36 +0,0 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
func (e *Router) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
|
||||
e.services.Producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
|
||||
URL: fullURL,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Router) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
|
||||
for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
|
||||
e.sendAssetForCache(sessionID, baseURL, u)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Router) handleURL(sessionID uint64, baseURL string, url string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetForCache(sessionID, baseURL, url)
|
||||
return e.services.Rewriter.RewriteURL(sessionID, baseURL, url)
|
||||
}
|
||||
return assets.ResolveURL(baseURL, url)
|
||||
}
|
||||
|
||||
func (e *Router) handleCSS(sessionID uint64, baseURL string, css string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
|
||||
return e.services.Rewriter.RewriteCSS(sessionID, baseURL, css)
|
||||
}
|
||||
return assets.ResolveCSS(baseURL, css)
|
||||
}
|
||||
|
|
@ -16,30 +16,13 @@ import (
|
|||
"openreplay/backend/pkg/token"
|
||||
)
|
||||
|
||||
const FILES_SIZE_LIMIT int64 = 1e7 // 10Mb
|
||||
|
||||
func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
Token string `json:"token"`
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
RevID string `json:"revID"`
|
||||
UserUUID *string `json:"userUUID"`
|
||||
UserOSVersion string `json:"userOSVersion"`
|
||||
UserDevice string `json:"userDevice"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
}
|
||||
type response struct {
|
||||
Token string `json:"token"`
|
||||
ImagesHashList []string `json:"imagesHashList"`
|
||||
UserUUID string `json:"userUUID"`
|
||||
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
|
||||
SessionID string `json:"sessionID"`
|
||||
}
|
||||
startTime := time.Now()
|
||||
req := &request{}
|
||||
req := &StartIOSSessionRequest{}
|
||||
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
ResponseWithError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
|
|
@ -50,7 +33,7 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
p, err := e.services.Pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
|
||||
|
|
@ -100,7 +83,7 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
|
|||
}))
|
||||
}
|
||||
|
||||
ResponseWithJSON(w, &response{
|
||||
ResponseWithJSON(w, &StartIOSSessionResponse{
|
||||
Token: e.services.Tokenizer.Compose(*tokenData),
|
||||
UserUUID: userUUID,
|
||||
SessionID: strconv.FormatUint(tokenData.ID, 10),
|
||||
|
|
@ -136,8 +119,9 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
r.Body = http.MaxBytesReader(w, r.Body, FILES_SIZE_LIMIT)
|
||||
r.Body = http.MaxBytesReader(w, r.Body, e.cfg.FileSizeLimit)
|
||||
defer r.Body.Close()
|
||||
|
||||
err = r.ParseMultipartForm(1e6) // ~1Mb
|
||||
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
|
||||
ResponseWithError(w, http.StatusUnsupportedMediaType, err)
|
||||
|
|
@ -166,7 +150,7 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request)
|
|||
key := prefix + fileHeader.Filename
|
||||
log.Printf("Uploading image... %v", key)
|
||||
go func() { //TODO: mime type from header
|
||||
if err := e.services.S3.Upload(file, key, "image/jpeg", false); err != nil {
|
||||
if err := e.services.Storage.Upload(file, key, "image/jpeg", false); err != nil {
|
||||
log.Printf("Upload ios screen error. %v", err)
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
p, err := e.services.Pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
p, err := e.services.Database.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
|
||||
|
|
@ -119,33 +119,7 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
|
|||
|
||||
// Process each message in request data
|
||||
err = ReadBatchReader(body, func(msg Message) {
|
||||
switch m := msg.(type) {
|
||||
case *SetNodeAttributeURLBased:
|
||||
if m.Name == "src" || m.Name == "href" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: e.handleURL(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
} else if m.Name == "style" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: e.handleCSS(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
}
|
||||
case *SetCSSDataURLBased:
|
||||
msg = &SetCSSData{
|
||||
ID: m.ID,
|
||||
Data: e.handleCSS(sessionData.ID, m.BaseURL, m.Data),
|
||||
}
|
||||
case *CSSInsertRuleURLBased:
|
||||
msg = &CSSInsertRule{
|
||||
ID: m.ID,
|
||||
Index: m.Index,
|
||||
Rule: e.handleCSS(sessionData.ID, m.BaseURL, m.Rule),
|
||||
}
|
||||
}
|
||||
msg = e.services.Assets.ParseAssets(sessionData.ID, msg)
|
||||
handledMessages.Write(msg.Encode())
|
||||
})
|
||||
if err != nil {
|
||||
|
|
@ -188,7 +162,7 @@ func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
err := e.services.Pgconn.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
err := e.services.Database.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
ProjectKey: *req.ProjectKey,
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
DoNotTrack: req.DoNotTrack,
|
||||
|
|
|
|||
|
|
@ -11,15 +11,17 @@ import (
|
|||
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
var reader io.ReadCloser
|
||||
var err error
|
||||
|
||||
switch r.Header.Get("Content-Encoding") {
|
||||
case "gzip":
|
||||
log.Println("Gzip", reader)
|
||||
|
||||
reader, err = gzip.NewReader(body)
|
||||
if err != nil {
|
||||
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
|
||||
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent response
|
||||
return
|
||||
}
|
||||
log.Println("Gzip reader init", reader)
|
||||
|
|
|
|||
|
|
@ -28,3 +28,22 @@ type NotStartedRequest struct {
|
|||
TrackerVersion string `json:"trackerVersion"`
|
||||
DoNotTrack bool `json:"DoNotTrack"`
|
||||
}
|
||||
|
||||
type StartIOSSessionRequest struct {
|
||||
Token string `json:"token"`
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
TrackerVersion string `json:"trackerVersion"`
|
||||
RevID string `json:"revID"`
|
||||
UserUUID *string `json:"userUUID"`
|
||||
UserOSVersion string `json:"userOSVersion"`
|
||||
UserDevice string `json:"userDevice"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
type StartIOSSessionResponse struct {
|
||||
Token string `json:"token"`
|
||||
ImagesHashList []string `json:"imagesHashList"`
|
||||
UserUUID string `json:"userUUID"`
|
||||
BeaconSizeLimit int64 `json:"beaconSizeLimit"`
|
||||
SessionID string `json:"sessionID"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package services
|
||||
|
||||
import (
|
||||
"openreplay/backend/internal/assetscache"
|
||||
"openreplay/backend/internal/config"
|
||||
"openreplay/backend/internal/geoip"
|
||||
"openreplay/backend/internal/uaparser"
|
||||
|
|
@ -13,22 +14,23 @@ import (
|
|||
)
|
||||
|
||||
type ServicesBuilder struct {
|
||||
Pgconn *cache.PGCache
|
||||
Database *cache.PGCache
|
||||
Producer types.Producer
|
||||
Rewriter *assets.Rewriter
|
||||
Assets *assetscache.AssetsCache
|
||||
Flaker *flakeid.Flaker
|
||||
UaParser *uaparser.UAParser
|
||||
GeoIP *geoip.GeoIP
|
||||
Tokenizer *token.Tokenizer
|
||||
S3 *storage.S3
|
||||
Storage *storage.S3
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
|
||||
rewriter := assets.NewRewriter(cfg.AssetsOrigin)
|
||||
return &ServicesBuilder{
|
||||
Pgconn: pgconn,
|
||||
Database: pgconn,
|
||||
Producer: producer,
|
||||
Rewriter: assets.NewRewriter(cfg.AssetsOrigin),
|
||||
S3: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
|
||||
Assets: assetscache.New(cfg, rewriter, producer),
|
||||
Storage: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
|
||||
Tokenizer: token.NewTokenizer(cfg.TokenSecret),
|
||||
UaParser: uaparser.NewUAParser(cfg.UAParserFile),
|
||||
GeoIP: geoip.NewGeoIP(cfg.MaxMinDBFile),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue