feat(backend): fixed issue in sessions cache layer
This commit is contained in:
parent
676041d90b
commit
a01a66afeb
4 changed files with 41 additions and 96 deletions
|
|
@ -100,6 +100,33 @@ func IncreaseTotalRequests(method, table string) {
|
|||
dbTotalRequests.WithLabelValues(method, table).Inc()
|
||||
}
|
||||
|
||||
var cacheRedisRequests = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "cache",
|
||||
Name: "redis_requests_total",
|
||||
Help: "A counter showing the total number of all Redis requests.",
|
||||
},
|
||||
[]string{"method", "table"},
|
||||
)
|
||||
|
||||
func IncreaseRedisRequests(method, table string) {
|
||||
cacheRedisRequests.WithLabelValues(method, table).Inc()
|
||||
}
|
||||
|
||||
var cacheRedisRequestDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: "cache",
|
||||
Name: "redis_request_duration_seconds",
|
||||
Help: "A histogram displaying the duration of each Redis request in seconds.",
|
||||
Buckets: common.DefaultDurationBuckets,
|
||||
},
|
||||
[]string{"method", "table"},
|
||||
)
|
||||
|
||||
func RecordRedisRequestDuration(durMillis float64, method, table string) {
|
||||
cacheRedisRequestDuration.WithLabelValues(method, table).Observe(durMillis / 1000.0)
|
||||
}
|
||||
|
||||
func List() []prometheus.Collector {
|
||||
return []prometheus.Collector{
|
||||
dbBatchElements,
|
||||
|
|
@ -109,5 +136,7 @@ func List() []prometheus.Collector {
|
|||
dbBulkInsertDuration,
|
||||
dbRequestDuration,
|
||||
dbTotalRequests,
|
||||
cacheRedisRequests,
|
||||
cacheRedisRequestDuration,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,8 +3,9 @@ package sessions
|
|||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"openreplay/backend/pkg/cache"
|
||||
"time"
|
||||
|
||||
"openreplay/backend/pkg/cache"
|
||||
)
|
||||
|
||||
type Cache interface {
|
||||
|
|
@ -33,6 +34,7 @@ func (i *inMemoryCacheImpl) Get(sessionID uint64) (*Session, error) {
|
|||
}
|
||||
session, err := i.redis.Get(sessionID)
|
||||
if err == nil {
|
||||
i.sessions.Set(sessionID, session)
|
||||
return session, nil
|
||||
}
|
||||
if !errors.Is(err, ErrDisabledCache) && err.Error() != "redis: nil" {
|
||||
|
|
@ -43,7 +45,7 @@ func (i *inMemoryCacheImpl) Get(sessionID uint64) (*Session, error) {
|
|||
|
||||
func NewInMemoryCache(redisCache Cache) Cache {
|
||||
return &inMemoryCacheImpl{
|
||||
sessions: cache.New(time.Minute*3, time.Minute*5),
|
||||
sessions: cache.New(time.Minute*3, time.Minute*10),
|
||||
redis: redisCache,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -179,56 +179,12 @@ func (s *sessionsImpl) UpdateUserID(sessionID uint64, userID string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionsImpl) _updateUserID(sessionID uint64, userID string) error {
|
||||
if err := s.storage.InsertUserID(sessionID, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
if session, err := s.cache.Get(sessionID); err != nil {
|
||||
session.UserID = &userID
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
session, err := s.getFromDB(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get session from postgres: %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAnonymousID usage: in db handler
|
||||
func (s *sessionsImpl) UpdateAnonymousID(sessionID uint64, userAnonymousID string) error {
|
||||
s.updates.AddUserID(sessionID, userAnonymousID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionsImpl) _updateAnonymousID(sessionID uint64, userAnonymousID string) error {
|
||||
if err := s.storage.InsertUserAnonymousID(sessionID, userAnonymousID); err != nil {
|
||||
return err
|
||||
}
|
||||
if session, err := s.cache.Get(sessionID); err != nil {
|
||||
session.UserAnonymousID = &userAnonymousID
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
session, err := s.getFromDB(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get session from postgres: %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateReferrer usage: in db handler on each page event
|
||||
func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error {
|
||||
if referrer == "" {
|
||||
|
|
@ -239,30 +195,6 @@ func (s *sessionsImpl) UpdateReferrer(sessionID uint64, referrer string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionsImpl) _updateReferrer(sessionID uint64, referrer string) error {
|
||||
baseReferrer := url.DiscardURLQuery(referrer)
|
||||
if err := s.storage.InsertReferrer(sessionID, referrer, baseReferrer); err != nil {
|
||||
return err
|
||||
}
|
||||
if session, err := s.cache.Get(sessionID); err != nil {
|
||||
session.Referrer = &referrer
|
||||
session.ReferrerBase = &baseReferrer
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
session, err := s.getFromDB(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get session from postgres: %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateMetadata usage: in db handler on each metadata event
|
||||
func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error {
|
||||
session, err := s.Get(sessionID)
|
||||
|
|
@ -283,31 +215,6 @@ func (s *sessionsImpl) UpdateMetadata(sessionID uint64, key, value string) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionsImpl) _updateMetadata(sessionID uint64, key, value string) error {
|
||||
session, err := s.Get(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
project, err := s.projects.GetProject(session.ProjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyNo := project.GetMetadataNo(key)
|
||||
if keyNo == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.storage.InsertMetadata(sessionID, keyNo, value); err != nil {
|
||||
return err
|
||||
}
|
||||
session.SetMetadata(keyNo, value)
|
||||
if err := s.cache.Set(session); err != nil {
|
||||
log.Printf("Failed to cache session: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sessionsImpl) UpdateEventsStats(sessionID uint64, events, pages int) error {
|
||||
s.updates.AddEvents(sessionID, events, pages)
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"openreplay/backend/pkg/db/redis"
|
||||
"openreplay/backend/pkg/metrics/database"
|
||||
)
|
||||
|
||||
type cacheImpl struct {
|
||||
|
|
@ -23,13 +24,16 @@ func (c *cacheImpl) Set(session *Session) error {
|
|||
if session.SessionID == 0 {
|
||||
return errors.New("session id is 0")
|
||||
}
|
||||
start := time.Now()
|
||||
sessionBytes, err := json.Marshal(session)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = c.db.Redis.Set(fmt.Sprintf("session:id:%d", session.SessionID), sessionBytes, time.Minute*30).Result(); err != nil {
|
||||
if _, err = c.db.Redis.Set(fmt.Sprintf("session:id:%d", session.SessionID), sessionBytes, time.Minute*60).Result(); err != nil {
|
||||
return err
|
||||
}
|
||||
database.RecordRedisRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "set", "session")
|
||||
database.IncreaseRedisRequests("set", "sessions")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -40,6 +44,7 @@ func (c *cacheImpl) Get(sessionID uint64) (*Session, error) {
|
|||
if sessionID == 0 {
|
||||
return nil, errors.New("session id is 0")
|
||||
}
|
||||
start := time.Now()
|
||||
result, err := c.db.Redis.Get(fmt.Sprintf("session:id:%d", sessionID)).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -48,6 +53,8 @@ func (c *cacheImpl) Get(sessionID uint64) (*Session, error) {
|
|||
if err = json.Unmarshal([]byte(result), session); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
database.RecordRedisRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "get", "session")
|
||||
database.IncreaseRedisRequests("get", "sessions")
|
||||
return session, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue