Save the last batch (#2748)
* feat(backend): fix to save the latest message tracker just after the token has been expired * feat(http): return 401 even after successfully saved batch for JustExpired case
This commit is contained in:
parent
fdc281a406
commit
b2cb874a2a
3 changed files with 38 additions and 9 deletions
|
|
@ -302,9 +302,14 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
|
||||||
if sessionData != nil {
|
if sessionData != nil {
|
||||||
r = r.WithContext(context.WithValue(r.Context(), "sessionID", fmt.Sprintf("%d", sessionData.ID)))
|
r = r.WithContext(context.WithValue(r.Context(), "sessionID", fmt.Sprintf("%d", sessionData.ID)))
|
||||||
}
|
}
|
||||||
|
tokenJustExpired := false
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.ResponseWithError(r.Context(), w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize)
|
if errors.Is(err, token.JUST_EXPIRED) {
|
||||||
return
|
tokenJustExpired = true
|
||||||
|
} else {
|
||||||
|
e.ResponseWithError(r.Context(), w, http.StatusUnauthorized, err, startTime, r.URL.Path, bodySize)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add sessionID and projectID to context
|
// Add sessionID and projectID to context
|
||||||
|
|
@ -314,13 +319,21 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
// Check request body
|
// Check request body
|
||||||
if r.Body == nil {
|
if r.Body == nil {
|
||||||
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, errors.New("request body is empty"), startTime, r.URL.Path, bodySize)
|
errCode := http.StatusBadRequest
|
||||||
|
if tokenJustExpired {
|
||||||
|
errCode = http.StatusUnauthorized
|
||||||
|
}
|
||||||
|
e.ResponseWithError(r.Context(), w, errCode, errors.New("request body is empty"), startTime, r.URL.Path, bodySize)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bodyBytes, err := e.readBody(w, r, e.getBeaconSize(sessionData.ID))
|
bodyBytes, err := e.readBody(w, r, e.getBeaconSize(sessionData.ID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
|
errCode := http.StatusRequestEntityTooLarge
|
||||||
|
if tokenJustExpired {
|
||||||
|
errCode = http.StatusUnauthorized
|
||||||
|
}
|
||||||
|
e.ResponseWithError(r.Context(), w, errCode, err, startTime, r.URL.Path, bodySize)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
bodySize = len(bodyBytes)
|
bodySize = len(bodyBytes)
|
||||||
|
|
@ -329,10 +342,18 @@ func (e *Router) pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request)
|
||||||
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bodyBytes)
|
err = e.services.Producer.Produce(e.cfg.TopicRawWeb, sessionData.ID, bodyBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(r.Context(), "can't send messages batch to queue: %s", err)
|
e.log.Error(r.Context(), "can't send messages batch to queue: %s", err)
|
||||||
e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, errors.New("can't save message, try again"), startTime, r.URL.Path, bodySize)
|
errCode := http.StatusInternalServerError
|
||||||
|
if tokenJustExpired {
|
||||||
|
errCode = http.StatusUnauthorized
|
||||||
|
}
|
||||||
|
e.ResponseWithError(r.Context(), w, errCode, errors.New("can't save message, try again"), startTime, r.URL.Path, bodySize)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tokenJustExpired {
|
||||||
|
e.ResponseWithError(r.Context(), w, http.StatusUnauthorized, errors.New("token expired"), startTime, r.URL.Path, bodySize)
|
||||||
|
return
|
||||||
|
}
|
||||||
e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
|
e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ const BEARER_SCHEMA = "Bearer "
|
||||||
func (tokenizer *Tokenizer) ParseFromHTTPRequest(r *http.Request) (*TokenData, error) {
|
func (tokenizer *Tokenizer) ParseFromHTTPRequest(r *http.Request) (*TokenData, error) {
|
||||||
header := r.Header.Get("Authorization")
|
header := r.Header.Get("Authorization")
|
||||||
if !strings.HasPrefix(header, BEARER_SCHEMA) {
|
if !strings.HasPrefix(header, BEARER_SCHEMA) {
|
||||||
return nil, errors.New("Missing token")
|
return nil, errors.New("missing token")
|
||||||
}
|
}
|
||||||
token := header[len(BEARER_SCHEMA):]
|
token := header[len(BEARER_SCHEMA):]
|
||||||
return tokenizer.Parse(token)
|
return tokenizer.Parse(token)
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,10 @@ import (
|
||||||
"github.com/btcsuite/btcutil/base58"
|
"github.com/btcsuite/btcutil/base58"
|
||||||
)
|
)
|
||||||
|
|
||||||
var EXPIRED = errors.New("token expired")
|
var (
|
||||||
|
EXPIRED = errors.New("token expired")
|
||||||
|
JUST_EXPIRED = errors.New("token just expired")
|
||||||
|
)
|
||||||
|
|
||||||
type Tokenizer struct {
|
type Tokenizer struct {
|
||||||
secret []byte
|
secret []byte
|
||||||
|
|
@ -64,8 +67,13 @@ func (tokenizer *Tokenizer) Parse(token string) (*TokenData, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
res := &TokenData{id, delay, expTime}
|
||||||
if expTime <= time.Now().UnixMilli() {
|
if expTime <= time.Now().UnixMilli() {
|
||||||
return &TokenData{id, delay, expTime}, EXPIRED
|
// If token is expired less than 30 seconds ago, we still consider it semi-valid
|
||||||
|
if expTime+30000 > time.Now().UnixMilli() {
|
||||||
|
return res, JUST_EXPIRED
|
||||||
|
}
|
||||||
|
return res, EXPIRED
|
||||||
}
|
}
|
||||||
return &TokenData{id, delay, expTime}, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue