Removed global objects (moved service initialization into serviceBuilder)
This commit is contained in:
parent
b0bb5bd922
commit
66e190221d
9 changed files with 243 additions and 176 deletions
|
|
@ -5,32 +5,32 @@ import (
|
|||
"openreplay/backend/pkg/url/assets"
|
||||
)
|
||||
|
||||
func sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
func (e *Router) sendAssetForCache(sessionID uint64, baseURL string, relativeURL string) {
|
||||
if fullURL, cacheable := assets.GetFullCachableURL(baseURL, relativeURL); cacheable {
|
||||
producer.Produce(cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
|
||||
e.services.producer.Produce(e.cfg.TopicCache, sessionID, messages.Encode(&messages.AssetCache{
|
||||
URL: fullURL,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
|
||||
func (e *Router) sendAssetsForCacheFromCSS(sessionID uint64, baseURL string, css string) {
|
||||
for _, u := range assets.ExtractURLsFromCSS(css) { // TODO: in one shot with rewriting
|
||||
sendAssetForCache(sessionID, baseURL, u)
|
||||
e.sendAssetForCache(sessionID, baseURL, u)
|
||||
}
|
||||
}
|
||||
|
||||
func handleURL(sessionID uint64, baseURL string, url string) string {
|
||||
if cfg.CacheAssets {
|
||||
sendAssetForCache(sessionID, baseURL, url)
|
||||
return rewriter.RewriteURL(sessionID, baseURL, url)
|
||||
func (e *Router) handleURL(sessionID uint64, baseURL string, url string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetForCache(sessionID, baseURL, url)
|
||||
return e.services.rewriter.RewriteURL(sessionID, baseURL, url)
|
||||
}
|
||||
return assets.ResolveURL(baseURL, url)
|
||||
}
|
||||
|
||||
func handleCSS(sessionID uint64, baseURL string, css string) string {
|
||||
if cfg.CacheAssets {
|
||||
sendAssetsForCacheFromCSS(sessionID, baseURL, css)
|
||||
return rewriter.RewriteCSS(sessionID, baseURL, css)
|
||||
func (e *Router) handleCSS(sessionID uint64, baseURL string, css string) string {
|
||||
if e.cfg.CacheAssets {
|
||||
e.sendAssetsForCacheFromCSS(sessionID, baseURL, css)
|
||||
return e.services.rewriter.RewriteCSS(sessionID, baseURL, css)
|
||||
}
|
||||
return assets.ResolveCSS(baseURL, css)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,14 @@
|
|||
package main
|
||||
|
||||
import "openreplay/backend/pkg/env"
|
||||
import (
|
||||
"openreplay/backend/pkg/env"
|
||||
"time"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
HTTPHost string
|
||||
HTTPPort string
|
||||
HTTPTimeout time.Duration
|
||||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
TopicCache string
|
||||
|
|
@ -13,6 +18,7 @@ type config struct {
|
|||
AssetsOrigin string
|
||||
AWSRegion string
|
||||
S3BucketIOSImages string
|
||||
Postgres string
|
||||
TokenSecret string
|
||||
UAParserFile string
|
||||
MaxMinDBFile string
|
||||
|
|
@ -21,7 +27,9 @@ type config struct {
|
|||
|
||||
func NewConfig() *config {
|
||||
return &config{
|
||||
HTTPHost: "",
|
||||
HTTPPort: env.String("HTTP_PORT"),
|
||||
HTTPTimeout: time.Second * 60,
|
||||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
TopicCache: env.String("TOPIC_CACHE"),
|
||||
|
|
@ -31,6 +39,7 @@ func NewConfig() *config {
|
|||
AssetsOrigin: env.String("ASSETS_ORIGIN"),
|
||||
AWSRegion: env.String("AWS_REGION"),
|
||||
S3BucketIOSImages: env.String("S3_BUCKET_IOS_IMAGES"),
|
||||
Postgres: env.String("POSTGRES_STRING"),
|
||||
TokenSecret: env.String("TOKEN_SECRET"),
|
||||
UAParserFile: env.String("UAPARSER_FILE"),
|
||||
MaxMinDBFile: env.String("MAXMINDDB_FILE"),
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import (
|
|||
|
||||
const FILES_SIZE_LIMIT int64 = 1e7 // 10Mb
|
||||
|
||||
func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
func (e *Router) startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
type request struct {
|
||||
Token string `json:"token"`
|
||||
ProjectKey *string `json:"projectKey"`
|
||||
|
|
@ -36,7 +36,7 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
startTime := time.Now()
|
||||
req := &request{}
|
||||
body := http.MaxBytesReader(w, r.Body, cfg.JsonSizeLimit)
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
|
||||
defer body.Close()
|
||||
if err := json.NewDecoder(body).Decode(req); err != nil {
|
||||
responseWithError(w, http.StatusBadRequest, err)
|
||||
|
|
@ -48,7 +48,7 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
p, err := e.services.pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or is not active"))
|
||||
|
|
@ -58,7 +58,7 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
userUUID := getUUID(req.UserUUID)
|
||||
tokenData, err := tokenizer.Parse(req.Token)
|
||||
tokenData, err := e.services.tokenizer.Parse(req.Token)
|
||||
|
||||
if err != nil { // Starting the new one
|
||||
dice := byte(rand.Intn(100)) // [0, 100)
|
||||
|
|
@ -67,12 +67,12 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
ua := uaParser.ParseFromHTTPRequest(r)
|
||||
ua := e.services.uaParser.ParseFromHTTPRequest(r)
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
|
||||
sessionID, err := e.services.flaker.Compose(uint64(startTime.UnixMilli()))
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
|
|
@ -81,10 +81,10 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
|
||||
tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
|
||||
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
country := e.services.geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
|
||||
// The difference with web is mostly here:
|
||||
producer.Produce(cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{
|
||||
e.services.producer.Produce(e.cfg.TopicRawIOS, tokenData.ID, Encode(&IOSSessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
|
|
@ -99,36 +99,36 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
responseWithJSON(w, &response{
|
||||
Token: tokenizer.Compose(*tokenData),
|
||||
Token: e.services.tokenizer.Compose(*tokenData),
|
||||
UserUUID: userUUID,
|
||||
SessionID: strconv.FormatUint(tokenData.ID, 10),
|
||||
BeaconSizeLimit: cfg.BeaconSizeLimit,
|
||||
BeaconSizeLimit: e.cfg.BeaconSizeLimit,
|
||||
})
|
||||
}
|
||||
|
||||
func pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
func (e *Router) pushMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := e.services.tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
pushMessages(w, r, sessionData.ID, cfg.TopicRawIOS)
|
||||
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
|
||||
}
|
||||
|
||||
func pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
func (e *Router) pushLateMessagesHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
sessionData, err := e.services.tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil && err != token.EXPIRED {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
// Check timestamps here?
|
||||
pushMessages(w, r, sessionData.ID, cfg.TopicRawIOS)
|
||||
e.pushMessages(w, r, sessionData.ID, e.cfg.TopicRawIOS)
|
||||
}
|
||||
|
||||
func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
func (e *Router) imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("recieved imagerequest")
|
||||
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
sessionData, err := e.services.tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil { // Should accept expired token?
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
|
|
@ -164,7 +164,7 @@ func imagesUploadHandlerIOS(w http.ResponseWriter, r *http.Request) {
|
|||
key := prefix + fileHeader.Filename
|
||||
log.Printf("Uploading image... %v", key)
|
||||
go func() { //TODO: mime type from header
|
||||
if err := s3.Upload(file, key, "image/jpeg", false); err != nil {
|
||||
if err := e.services.s3.Upload(file, key, "image/jpeg", false); err != nil {
|
||||
log.Printf("Upload ios screen error. %v", err)
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -15,14 +15,14 @@ import (
|
|||
"openreplay/backend/pkg/token"
|
||||
)
|
||||
|
||||
func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
func (e *Router) startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
startTime := time.Now()
|
||||
|
||||
// Check request body
|
||||
if r.Body == nil {
|
||||
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, cfg.JsonSizeLimit)
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
// Parse request body
|
||||
|
|
@ -38,7 +38,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
p, err := pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
p, err := e.services.pgconn.GetProjectByKey(*req.ProjectKey)
|
||||
if err != nil {
|
||||
if postgres.IsNoRowsErr(err) {
|
||||
responseWithError(w, http.StatusNotFound, errors.New("Project doesn't exist or capture limit has been reached"))
|
||||
|
|
@ -49,7 +49,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
userUUID := getUUID(req.UserUUID)
|
||||
tokenData, err := tokenizer.Parse(req.Token)
|
||||
tokenData, err := e.services.tokenizer.Parse(req.Token)
|
||||
if err != nil || req.Reset { // Starting the new one
|
||||
dice := byte(rand.Intn(100)) // [0, 100)
|
||||
if dice >= p.SampleRate {
|
||||
|
|
@ -57,12 +57,12 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
ua := uaParser.ParseFromHTTPRequest(r)
|
||||
ua := e.services.uaParser.ParseFromHTTPRequest(r)
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
|
||||
sessionID, err := e.services.flaker.Compose(uint64(startTime.UnixNano() / 1e6))
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
|
|
@ -71,7 +71,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
|
||||
tokenData = &token.TokenData{ID: sessionID, ExpTime: expTime.UnixNano() / 1e6}
|
||||
|
||||
producer.Produce(cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
|
||||
e.services.producer.Produce(e.cfg.TopicRawWeb, tokenData.ID, Encode(&SessionStart{
|
||||
Timestamp: req.Timestamp,
|
||||
ProjectID: uint64(p.ProjectID),
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
|
|
@ -84,7 +84,7 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
UserBrowserVersion: ua.BrowserVersion,
|
||||
UserDevice: ua.Device,
|
||||
UserDeviceType: ua.DeviceType,
|
||||
UserCountry: geoIP.ExtractISOCodeFromHTTPRequest(r),
|
||||
UserCountry: e.services.geoIP.ExtractISOCodeFromHTTPRequest(r),
|
||||
UserDeviceMemorySize: req.DeviceMemory,
|
||||
UserDeviceHeapSize: req.JsHeapSizeLimit,
|
||||
UserID: req.UserID,
|
||||
|
|
@ -92,16 +92,16 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
responseWithJSON(w, &startSessionResponse{
|
||||
Token: tokenizer.Compose(*tokenData),
|
||||
Token: e.services.tokenizer.Compose(*tokenData),
|
||||
UserUUID: userUUID,
|
||||
SessionID: strconv.FormatUint(tokenData.ID, 10),
|
||||
BeaconSizeLimit: cfg.BeaconSizeLimit,
|
||||
BeaconSizeLimit: e.cfg.BeaconSizeLimit,
|
||||
})
|
||||
}
|
||||
|
||||
func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
// Check authorization
|
||||
sessionData, err := tokenizer.ParseFromHTTPRequest(r)
|
||||
sessionData, err := e.services.tokenizer.ParseFromHTTPRequest(r)
|
||||
if err != nil {
|
||||
responseWithError(w, http.StatusUnauthorized, err)
|
||||
return
|
||||
|
|
@ -111,7 +111,7 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
if r.Body == nil {
|
||||
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, cfg.BeaconSizeLimit)
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
var handledMessages bytes.Buffer
|
||||
|
|
@ -124,25 +124,25 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleURL(sessionData.ID, m.BaseURL, m.Value),
|
||||
Value: e.handleURL(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
} else if m.Name == "style" {
|
||||
msg = &SetNodeAttribute{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
Value: handleCSS(sessionData.ID, m.BaseURL, m.Value),
|
||||
Value: e.handleCSS(sessionData.ID, m.BaseURL, m.Value),
|
||||
}
|
||||
}
|
||||
case *SetCSSDataURLBased:
|
||||
msg = &SetCSSData{
|
||||
ID: m.ID,
|
||||
Data: handleCSS(sessionData.ID, m.BaseURL, m.Data),
|
||||
Data: e.handleCSS(sessionData.ID, m.BaseURL, m.Data),
|
||||
}
|
||||
case *CSSInsertRuleURLBased:
|
||||
msg = &CSSInsertRule{
|
||||
ID: m.ID,
|
||||
Index: m.Index,
|
||||
Rule: handleCSS(sessionData.ID, m.BaseURL, m.Rule),
|
||||
Rule: e.handleCSS(sessionData.ID, m.BaseURL, m.Rule),
|
||||
}
|
||||
}
|
||||
handledMessages.Write(msg.Encode())
|
||||
|
|
@ -153,7 +153,7 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Send processed messages to queue as array of bytes
|
||||
err = producer.Produce(cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
|
||||
err = e.services.producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, handledMessages.Bytes())
|
||||
if err != nil {
|
||||
log.Printf("can't send processed messages to queue: %s", err)
|
||||
}
|
||||
|
|
@ -161,12 +161,12 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
func (e *Router) notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
||||
// Check request body
|
||||
if r.Body == nil {
|
||||
responseWithError(w, http.StatusBadRequest, errors.New("request body is empty"))
|
||||
}
|
||||
body := http.MaxBytesReader(w, r.Body, cfg.JsonSizeLimit)
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.JsonSizeLimit)
|
||||
defer body.Close()
|
||||
|
||||
// Parse request body
|
||||
|
|
@ -181,13 +181,13 @@ func notStartedHandlerWeb(w http.ResponseWriter, r *http.Request) {
|
|||
responseWithError(w, http.StatusForbidden, errors.New("ProjectKey value required"))
|
||||
return
|
||||
}
|
||||
ua := uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
|
||||
ua := e.services.uaParser.ParseFromHTTPRequest(r) // TODO?: insert anyway
|
||||
if ua == nil {
|
||||
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
|
||||
return
|
||||
}
|
||||
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
err := pgconn.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
country := e.services.geoIP.ExtractISOCodeFromHTTPRequest(r)
|
||||
err := e.services.pgconn.InsertUnstartedSession(postgres.UnstartedSession{
|
||||
ProjectKey: *req.ProjectKey,
|
||||
TrackerVersion: req.TrackerVersion,
|
||||
DoNotTrack: req.DoNotTrack,
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import (
|
|||
gzip "github.com/klauspost/pgzip"
|
||||
)
|
||||
|
||||
func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
|
||||
body := http.MaxBytesReader(w, r.Body, cfg.BeaconSizeLimit)
|
||||
func (e *Router) pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topicName string) {
|
||||
body := http.MaxBytesReader(w, r.Body, e.cfg.BeaconSizeLimit)
|
||||
defer body.Close()
|
||||
var reader io.ReadCloser
|
||||
var err error
|
||||
|
|
@ -34,6 +34,6 @@ func pushMessages(w http.ResponseWriter, r *http.Request, sessionID uint64, topi
|
|||
responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging
|
||||
return
|
||||
}
|
||||
producer.Produce(topicName, sessionID, buf) // What if not able to send?
|
||||
e.services.producer.Produce(topicName, sessionID, buf) // What if not able to send?
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,151 +1,59 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"openreplay/backend/pkg/db/cache"
|
||||
"openreplay/backend/pkg/db/postgres"
|
||||
"openreplay/backend/pkg/pprof"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
|
||||
"openreplay/backend/pkg/db/cache"
|
||||
"openreplay/backend/pkg/env"
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/storage"
|
||||
"openreplay/backend/pkg/token"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
"openreplay/backend/services/http/geoip"
|
||||
"openreplay/backend/services/http/uaparser"
|
||||
|
||||
"openreplay/backend/pkg/pprof"
|
||||
)
|
||||
|
||||
// Global variables
|
||||
var cfg *config
|
||||
var rewriter *assets.Rewriter
|
||||
var producer types.Producer
|
||||
var pgconn *cache.PGCache
|
||||
var flaker *flakeid.Flaker
|
||||
var uaParser *uaparser.UAParser
|
||||
var geoIP *geoip.GeoIP
|
||||
var tokenizer *token.Tokenizer
|
||||
var s3 *storage.S3
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
pprof.StartProfilingServer()
|
||||
|
||||
// Configs
|
||||
cfg = NewConfig()
|
||||
// Load configuration
|
||||
cfg := NewConfig()
|
||||
|
||||
// Queue
|
||||
producer = queue.NewProducer()
|
||||
// Connect to queue
|
||||
producer := queue.NewProducer()
|
||||
defer producer.Close(15000)
|
||||
|
||||
// Database
|
||||
pgconn = cache.NewPGCache(postgres.NewConn(env.String("POSTGRES_STRING")), 1000*60*20)
|
||||
defer pgconn.Close()
|
||||
// Connect to database
|
||||
dbConn := cache.NewPGCache(postgres.NewConn(cfg.Postgres), 1000*60*20)
|
||||
defer dbConn.Close()
|
||||
|
||||
// Init modules
|
||||
rewriter = assets.NewRewriter(cfg.AssetsOrigin)
|
||||
s3 = storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages)
|
||||
tokenizer = token.NewTokenizer(cfg.TokenSecret)
|
||||
uaParser = uaparser.NewUAParser(cfg.UAParserFile)
|
||||
geoIP = geoip.NewGeoIP(cfg.MaxMinDBFile)
|
||||
flaker = flakeid.NewFlaker(cfg.WorkerID)
|
||||
// Build all services
|
||||
services := NewServiceBuilder(cfg, producer, dbConn)
|
||||
|
||||
// Server
|
||||
server := &http.Server{
|
||||
Addr: ":" + cfg.HTTPPort,
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// TODO: agree with specification
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "POST")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
|
||||
if r.Method == http.MethodOptions {
|
||||
w.Header().Set("Cache-Control", "max-age=86400")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
|
||||
|
||||
switch r.URL.Path {
|
||||
case "/":
|
||||
w.WriteHeader(http.StatusOK)
|
||||
case "/v1/web/not-started":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
notStartedHandlerWeb(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/web/start":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
startSessionHandlerWeb(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/web/i":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushMessagesHandlerWeb(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/start":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
startSessionHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/i":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushMessagesHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/late":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
pushLateMessagesHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "/v1/ios/images":
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
imagesUploadHandlerIOS(w, r)
|
||||
default:
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
}
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
}),
|
||||
// Init server's routes
|
||||
router, err := NewRouter(cfg, services)
|
||||
if err != nil {
|
||||
log.Fatalf("failed while creating engine: %s", err)
|
||||
}
|
||||
|
||||
http2.ConfigureServer(server, nil)
|
||||
// Init server
|
||||
server, err := NewServer(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout)
|
||||
if err != nil {
|
||||
log.Fatalf("failed while creating server: %s", err)
|
||||
}
|
||||
|
||||
// Run server
|
||||
go func() {
|
||||
if err := server.ListenAndServe(); err != nil {
|
||||
if err := server.Start(); err != nil {
|
||||
log.Printf("Server error: %v\n", err)
|
||||
log.Fatal("Server error")
|
||||
}
|
||||
}()
|
||||
log.Printf("Server successfully started on port %v\n", cfg.HTTPPort)
|
||||
|
||||
// Wait stop signal to shut down server gracefully
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigchan
|
||||
log.Printf("Shutting down the server\n")
|
||||
server.Shutdown(context.Background())
|
||||
server.Stop()
|
||||
}
|
||||
|
|
|
|||
68
backend/services/http/router.go
Normal file
68
backend/services/http/router.go
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Router struct {
|
||||
router *mux.Router
|
||||
cfg *config
|
||||
services *ServiceBuilder
|
||||
}
|
||||
|
||||
func NewRouter(cfg *config, services *ServiceBuilder) (*Router, error) {
|
||||
e := &Router{
|
||||
cfg: cfg,
|
||||
services: services,
|
||||
}
|
||||
e.init()
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func (e *Router) init() {
|
||||
e.router = mux.NewRouter()
|
||||
// Root path
|
||||
e.router.HandleFunc("/", e.root).Methods("POST")
|
||||
|
||||
// Web handlers
|
||||
e.router.HandleFunc("/v1/web/not-started", e.notStartedHandlerWeb).Methods("POST")
|
||||
e.router.HandleFunc("/v1/web/start", e.startSessionHandlerWeb).Methods("POST")
|
||||
e.router.HandleFunc("/v1/web/i", e.pushMessagesHandlerWeb).Methods("POST")
|
||||
|
||||
// iOS handlers
|
||||
e.router.HandleFunc("/v1/ios/start", e.startSessionHandlerIOS).Methods("POST")
|
||||
e.router.HandleFunc("/v1/ios/i", e.pushMessagesHandlerIOS).Methods("POST")
|
||||
e.router.HandleFunc("/v1/ios/late", e.pushLateMessagesHandlerIOS).Methods("POST")
|
||||
e.router.HandleFunc("/v1/ios/images", e.imagesUploadHandlerIOS).Methods("POST")
|
||||
|
||||
// CORS middleware
|
||||
e.router.Use(e.corsMiddleware)
|
||||
}
|
||||
|
||||
func (e *Router) root(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Prepare headers for preflight requests
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "POST")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
|
||||
if r.Method == http.MethodOptions {
|
||||
w.Header().Set("Cache-Control", "max-age=86400")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
log.Printf("Request: %v - %v ", r.Method, r.URL.Path)
|
||||
|
||||
// Serve request
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Router) GetHandler() http.Handler {
|
||||
return e.router
|
||||
}
|
||||
46
backend/services/http/server.go
Normal file
46
backend/services/http/server.go
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"golang.org/x/net/http2"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
func NewServer(handler http.Handler, host, port string, timeout time.Duration) (*Server, error) {
|
||||
switch {
|
||||
case port == "":
|
||||
return nil, errors.New("empty server port")
|
||||
case handler == nil:
|
||||
return nil, errors.New("empty handler")
|
||||
case timeout < 1:
|
||||
return nil, fmt.Errorf("invalid timeout %d", timeout)
|
||||
}
|
||||
server := &http.Server{
|
||||
Addr: fmt.Sprintf("%s:%s", host, port),
|
||||
Handler: handler,
|
||||
ReadTimeout: timeout,
|
||||
WriteTimeout: timeout,
|
||||
}
|
||||
if err := http2.ConfigureServer(server, nil); err != nil {
|
||||
log.Printf("can't configure http2 server: %s", err)
|
||||
}
|
||||
return &Server{
|
||||
server: server,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
return s.server.ListenAndServe()
|
||||
}
|
||||
|
||||
func (s *Server) Stop() {
|
||||
s.server.Shutdown(context.Background())
|
||||
}
|
||||
36
backend/services/http/service.go
Normal file
36
backend/services/http/service.go
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"openreplay/backend/pkg/db/cache"
|
||||
"openreplay/backend/pkg/flakeid"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/pkg/storage"
|
||||
"openreplay/backend/pkg/token"
|
||||
"openreplay/backend/pkg/url/assets"
|
||||
"openreplay/backend/services/http/geoip"
|
||||
"openreplay/backend/services/http/uaparser"
|
||||
)
|
||||
|
||||
type ServiceBuilder struct {
|
||||
pgconn *cache.PGCache
|
||||
producer types.Producer
|
||||
rewriter *assets.Rewriter
|
||||
flaker *flakeid.Flaker
|
||||
uaParser *uaparser.UAParser
|
||||
geoIP *geoip.GeoIP
|
||||
tokenizer *token.Tokenizer
|
||||
s3 *storage.S3
|
||||
}
|
||||
|
||||
func NewServiceBuilder(cfg *config, producer types.Producer, pgconn *cache.PGCache) *ServiceBuilder {
|
||||
return &ServiceBuilder{
|
||||
pgconn: pgconn,
|
||||
producer: producer,
|
||||
rewriter: assets.NewRewriter(cfg.AssetsOrigin),
|
||||
s3: storage.NewS3(cfg.AWSRegion, cfg.S3BucketIOSImages),
|
||||
tokenizer: token.NewTokenizer(cfg.TokenSecret),
|
||||
uaParser: uaparser.NewUAParser(cfg.UAParserFile),
|
||||
geoIP: geoip.NewGeoIP(cfg.MaxMinDBFile),
|
||||
flaker: flakeid.NewFlaker(cfg.WorkerID),
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue