Moved the rest of the code to separate dirs

This commit is contained in:
Alexander Zavorotynskiy 2022-05-02 15:28:51 +02:00
parent df722761e5
commit 69cabaecfe
13 changed files with 52 additions and 54 deletions

View file

@ -3,9 +3,9 @@ package main
import (
"log"
"openreplay/backend/internal/config"
"openreplay/backend/internal/http"
"openreplay/backend/internal/router"
"openreplay/backend/internal/server"
"openreplay/backend/internal/services"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/pprof"
@ -31,7 +31,7 @@ func main() {
defer dbConn.Close()
// Build all services
services := http.NewServiceBuilder(cfg, producer, dbConn)
services := services.New(cfg, producer, dbConn)
// Init server's routes
router, err := router.NewRouter(cfg, services)

View file

@ -1,4 +1,4 @@
package http
package ios
import (
"strings"

View file

@ -6,7 +6,7 @@ import (
"log"
"math/rand"
"net/http"
http2 "openreplay/backend/internal/http"
"openreplay/backend/internal/ios"
"openreplay/backend/internal/uuid"
"strconv"
"time"
@ -41,21 +41,21 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
if err := json.NewDecoder(body).Decode(req); err != nil {
http2.ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err)
return
}
if req.ProjectKey == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := e.services.Pgconn.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
http2.ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
} else {
http2.ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
@ -65,18 +65,18 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
if err != nil { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
http2.ResponseWithError(w, http.StatusInternalServerError, err)
ResponseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
@ -94,13 +94,13 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
UserUUID: userUUID,
UserOS: "IOS",
UserOSVersion: req.UserOSVersion,
UserDevice: http2.MapIOSDevice(req.UserDevice),
UserDeviceType: http2.GetIOSDeviceType(req.UserDevice),
UserDevice: ios.MapIOSDevice(req.UserDevice),
UserDeviceType: ios.GetIOSDeviceType(req.UserDevice),
UserCountry: country,
}))
}
http2.ResponseWithJSON(w, &response{
ResponseWithJSON(w, &response{
Token: e.services.Tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
@ -111,7 +111,7 @@ func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request)
func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
http2.ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
@ -120,7 +120,7 @@ func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request)
func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil && err != token.EXPIRED {
http2.ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
// Check timestamps here?
@ -132,7 +132,7 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request)
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil { // Should accept expired token?
http2.ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
@ -140,18 +140,18 @@ func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request)
defer r.Body.Close()
err = r.ParseMultipartForm(1e6) // ~1Mb
if err == http.ErrNotMultipart || err == http.ErrMissingBoundary {
http2.ResponseWithError(w, http.StatusUnsupportedMediaType, err)
ResponseWithError(w, http.StatusUnsupportedMediaType, err)
// } else if err == multipart.ErrMessageTooLarge // if non-files part exceeds 10 MB
} else if err != nil {
http2.ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
if r.MultipartForm == nil {
http2.ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
ResponseWithError(w, http.StatusInternalServerError, errors.New("Multipart not parsed"))
}
if len(r.MultipartForm.Value["projectKey"]) == 0 {
http2.ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
ResponseWithError(w, http.StatusBadRequest, errors.New("projectKey parameter missing")) // status for missing/wrong parameter?
return
}

View file

@ -7,7 +7,6 @@ import (
"log"
"math/rand"
"net/http"
http2 "openreplay/backend/internal/http"
"openreplay/backend/internal/uuid"
"strconv"
"time"
@ -22,30 +21,30 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
// Check request body
if r.Body == nil {
http2.ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
// Parse request body
req := &http2.StartSessionRequest{}
req := &StartSessionRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
http2.ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
p, err := e.services.Pgconn.GetProjectByKey(*req.ProjectKey)
if err != nil {
if postgres.IsNoRowsErr(err) {
http2.ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
ResponseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
} else {
http2.ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
}
return
}
@ -55,18 +54,18 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
if err != nil || req.Reset { // Starting the new one
dice := byte(rand.Intn(100)) // [0, 100)
if dice >= p.SampleRate {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
ResponseWithError(w, http.StatusForbidden, errors.New("cancel"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r)
if ua == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := e.services.Flaker.Compose(uint64(startTime.UnixNano() / 1e6))
if err != nil {
http2.ResponseWithError(w, http.StatusInternalServerError, err)
ResponseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
@ -93,7 +92,7 @@ func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request)
}))
}
http2.ResponseWithJSON(w, &http2.StartSessionResponse{
ResponseWithJSON(w, &StartSessionResponse{
Token: e.services.Tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),
@ -105,13 +104,13 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
// Check authorization
sessionData, err := e.services.Tokenizer.ParseFromHTTPRequest(r)
if err != nil {
http2.ResponseWithError(w, http.StatusUnauthorized, err)
ResponseWithError(w, http.StatusUnauthorized, err)
return
}
// Check request body
if r.Body == nil {
http2.ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
defer body.Close()
@ -150,7 +149,7 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
handledMessages.Write(msg.Encode())
})
if err != nil {
http2.ResponseWithError(w, http.StatusForbidden, err)
ResponseWithError(w, http.StatusForbidden, err)
return
}
@ -166,26 +165,26 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
// Check request body
if r.Body == nil {
http2.ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
ResponseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
}
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
defer body.Close()
// Parse request body
req := &http2.NotStartedRequest{}
req := &NotStartedRequest{}
if err := json.NewDecoder(body).Decode(req); err != nil {
http2.ResponseWithError(w, http.StatusBadRequest, err)
ResponseWithError(w, http.StatusBadRequest, err)
return
}
// Handler's logic
if req.ProjectKey == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
ResponseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
return
}
ua := e.services.UaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
if ua == nil {
http2.ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
ResponseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
country := e.services.GeoIP.ExtractISOCodeFromHTTPRequest(r)

View file

@ -6,7 +6,6 @@ import (
"io/ioutil"
"log"
"net/http"
http2 "openreplay/backend/internal/http"
)
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
@ -20,7 +19,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID
reader, err = gzip.NewReader(body)
if err != nil {
http2.ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: stage-dependent responce
return
}
log.Println("Gzip reader init", reader)
@ -31,7 +30,7 @@ func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID
log.Println("Reader after switch:", reader)
buf, err := ioutil.ReadAll(reader)
if err != nil {
http2.ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
ResponseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
return
}
e.services.Producer.Produce(topicName, sessionID, buf) // What if not able to send?

View file

@ -1,4 +1,4 @@
package http
package router
type StartSessionRequest struct {
Token string `json:"token"`

View file

@ -1,4 +1,4 @@
package http
package router
import (
"encoding/json"

View file

@ -5,16 +5,16 @@ import (
"log"
"net/http"
"openreplay/backend/internal/config"
http2 "openreplay/backend/internal/http"
http2 "openreplay/backend/internal/services"
)
type Router struct {
router *mux.Router
cfg *config.Config
services *http2.ServiceBuilder
services *http2.ServicesBuilder
}
func NewRouter(cfg *config.Config, services *http2.ServiceBuilder) (*Router, error) {
func NewRouter(cfg *config.Config, services *http2.ServicesBuilder) (*Router, error) {
e := &Router{
cfg: cfg,
services: services,

View file

@ -1,9 +1,9 @@
package http
package services
import (
"openreplay/backend/internal/config"
"openreplay/backend/internal/http/geoip"
"openreplay/backend/internal/http/uaparser"
"openreplay/backend/internal/geoip"
"openreplay/backend/internal/uaparser"
"openreplay/backend/pkg/db/cache"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/queue/types"
@ -12,7 +12,7 @@ import (
"openreplay/backend/pkg/url/assets"
)
type ServiceBuilder struct {
type ServicesBuilder struct {
Pgconn *cache.PGCache
Producer types.Producer
Rewriter *assets.Rewriter
@ -23,8 +23,8 @@ type ServiceBuilder struct {
S3 *storage.S3
}
func NewServiceBuilder(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServiceBuilder {
return &ServiceBuilder{
func New(cfg *config.Config, producer types.Producer, pgconn *cache.PGCache) *ServicesBuilder {
return &ServicesBuilder{
Pgconn: pgconn,
Producer: producer,
Rewriter: assets.NewRewriter(cfg.AssetsOrigin),