feat(assist-api): add the Go implementation of the assist API

Alexander 2025-04-14 15:47:56 +02:00
parent df10fa706b
commit 96f58b94d5
19 changed files with 1517 additions and 0 deletions

View file

@@ -0,0 +1,53 @@
package main

import (
	"context"

	assistConfig "openreplay/backend/internal/config/assist"
	"openreplay/backend/pkg/assist"
	"openreplay/backend/pkg/db/postgres/pool"
	"openreplay/backend/pkg/db/redis"
	"openreplay/backend/pkg/logger"
	"openreplay/backend/pkg/metrics"
	databaseMetrics "openreplay/backend/pkg/metrics/database"
	"openreplay/backend/pkg/metrics/web"
	"openreplay/backend/pkg/server"
	"openreplay/backend/pkg/server/api"
)

func main() {
	ctx := context.Background()
	log := logger.New()
	cfg := assistConfig.New(log)

	// Observability
	webMetrics := web.New("assist")
	dbMetric := databaseMetrics.New("assist")
	metrics.New(log, append(webMetrics.List(), dbMetric.List()...))

	pgConn, err := pool.New(dbMetric, cfg.Postgres.String())
	if err != nil {
		log.Fatal(ctx, "can't init postgres connection: %s", err)
	}
	defer pgConn.Close()

	redisClient, err := redis.New(&cfg.Redis)
	if err != nil {
		log.Fatal(ctx, "can't init redis connection: %s", err)
	}
	defer redisClient.Close()

	prefix := api.NoPrefix
	builder, err := assist.NewServiceBuilder(log, cfg, webMetrics, dbMetric, pgConn, redisClient, prefix)
	if err != nil {
		log.Fatal(ctx, "can't init services: %s", err)
	}

	router, err := api.NewRouter(&cfg.HTTP, log)
	if err != nil {
		log.Fatal(ctx, "failed while creating router: %s", err)
	}
	router.AddHandlers(prefix, builder.AssistAPI)
	router.AddMiddlewares(builder.Auth.Middleware, builder.RateLimiter.Middleware, builder.AuditTrail.Middleware)

	server.Run(ctx, log, &cfg.HTTP, router)
}
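
For illustration, a minimal smoke test against a locally running instance might look like the sketch below. The /v1/ping route comes from the handlers file later in this commit; the host and port are assumptions (9000 is the chart's default service.ports.http in values.yaml).

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Assumption: the service listens on localhost:9000. The /v1/ping
	// route is registered by the assist API handlers and replies 200
	// with an empty body.
	resp, err := http.Get("http://localhost:9000/v1/ping")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect: 200 OK
}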

View file

@@ -0,0 +1,201 @@
package api

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/gorilla/mux"

	assistAPI "openreplay/backend/internal/config/assist"
	"openreplay/backend/pkg/assist/service"
	"openreplay/backend/pkg/logger"
	"openreplay/backend/pkg/server/api"
)

type handlersImpl struct {
	cfg           *assistAPI.Config
	log           logger.Logger
	responser     *api.Responser
	jsonSizeLimit int64
	assist        service.Assist
}

func NewHandlers(log logger.Logger, cfg *assistAPI.Config, responser *api.Responser, assist service.Assist) (api.Handlers, error) {
	return &handlersImpl{
		cfg:           cfg,
		log:           log,
		responser:     responser,
		jsonSizeLimit: cfg.JsonSizeLimit,
		assist:        assist,
	}, nil
}

func (e *handlersImpl) GetAll() []*api.Description {
	keyPrefix := ""
	if e.cfg.AssistKey != "" {
		keyPrefix = fmt.Sprintf("/%s", e.cfg.AssistKey)
	}
	return []*api.Description{
		{keyPrefix + "/sockets-list/{projectKey}/autocomplete", e.autocomplete, "GET"},        // event search with live=true
		{keyPrefix + "/sockets-list/{projectKey}/{sessionId}", e.socketsListByProject, "GET"}, // is_live for getReplay call
		{keyPrefix + "/sockets-live/{projectKey}", e.socketsLiveByProject, "GET"},             // handler /{projectId}/assist/sessions for co-browser
		{keyPrefix + "/sockets-live/{projectKey}/{sessionId}", e.socketsLiveBySession, "GET"}, // for get_live_session (with data) and for session_exists
		{"/v1/ping", e.ping, "GET"},
	}
}

func (e *handlersImpl) ping(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

func getProjectKey(r *http.Request) (string, error) {
	vars := mux.Vars(r)
	key := vars["projectKey"]
	if key == "" {
		return "", fmt.Errorf("empty project key")
	}
	return key, nil
}

func getSessionID(r *http.Request) (string, error) {
	vars := mux.Vars(r)
	key := vars["sessionId"]
	if key == "" {
		return "", fmt.Errorf("empty session ID")
	}
	return key, nil
}

func getQuery(r *http.Request) (*service.Query, error) {
	params := r.URL.Query()
	q := &service.Query{
		Key:   params.Get("key"),
		Value: params.Get("q"),
	}
	if q.Key == "" || q.Value == "" {
		return nil, fmt.Errorf("empty key or value")
	}
	return q, nil
}

func (e *handlersImpl) autocomplete(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	projectKey, err := getProjectKey(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	query, err := getQuery(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	resp, err := e.assist.Autocomplete(projectKey, query)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.responser.ResponseWithJSON(e.log, r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}

func (e *handlersImpl) socketsListByProject(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	projectKey, err := getProjectKey(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	sessionID, err := getSessionID(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := api.ReadBody(e.log, w, r, e.jsonSizeLimit)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &service.Request{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	resp, err := e.assist.IsLive(projectKey, sessionID, req)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.responser.ResponseWithJSON(e.log, r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}

func (e *handlersImpl) socketsLiveByProject(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	projectKey, err := getProjectKey(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := api.ReadBody(e.log, w, r, e.jsonSizeLimit)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &service.Request{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	resp, err := e.assist.GetAll(projectKey, req)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.responser.ResponseWithJSON(e.log, r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}

func (e *handlersImpl) socketsLiveBySession(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	bodySize := 0
	projectKey, err := getProjectKey(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	sessionID, err := getSessionID(r)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodyBytes, err := api.ReadBody(e.log, w, r, e.jsonSizeLimit)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
		return
	}
	bodySize = len(bodyBytes)
	req := &service.Request{}
	if err := json.Unmarshal(bodyBytes, req); err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
		return
	}
	resp, err := e.assist.GetByID(projectKey, sessionID, req)
	if err != nil {
		e.responser.ResponseWithError(e.log, r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
		return
	}
	e.responser.ResponseWithJSON(e.log, r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}
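
Note that the sockets-list and sockets-live routes are registered as GET yet read a JSON body (service.Request). A hedged sketch of a matching client call, with field names taken from the json tags in the service package later in this diff; the host, port, and project key are placeholders:

package main

import (
	"net/http"
	"strings"
)

func main() {
	// Body fields mirror the json tags on service.Request / service.Filter.
	payload := `{
		"filters": [{"type": "userBrowser", "value": ["Chrome"], "operator": "is"}],
		"order": "desc",
		"limit": 10,
		"page": 1
	}`
	// GET with a request body, matching how the route is registered.
	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:9000/sockets-live/PROJECT_KEY", strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
}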

View file

@@ -0,0 +1 @@
package api

View file

@@ -0,0 +1,52 @@
package assist

import (
	"time"

	"openreplay/backend/internal/config/assist"
	assistAPI "openreplay/backend/pkg/assist/api"
	"openreplay/backend/pkg/assist/service"
	"openreplay/backend/pkg/db/postgres/pool"
	"openreplay/backend/pkg/db/redis"
	"openreplay/backend/pkg/logger"
	"openreplay/backend/pkg/metrics/database"
	"openreplay/backend/pkg/metrics/web"
	"openreplay/backend/pkg/projects"
	"openreplay/backend/pkg/server/api"
	"openreplay/backend/pkg/server/auth"
	"openreplay/backend/pkg/server/limiter"
	"openreplay/backend/pkg/server/tracer"
	"openreplay/backend/pkg/sessionmanager"
)

type ServicesBuilder struct {
	Auth        auth.Auth
	RateLimiter *limiter.UserRateLimiter
	AuditTrail  tracer.Tracer
	AssistAPI   api.Handlers
}

func NewServiceBuilder(log logger.Logger, cfg *assist.Config, webMetrics web.Web, dbMetrics database.Database, pgconn pool.Pool, redis *redis.Client, prefix string) (*ServicesBuilder, error) {
	projectsManager := projects.New(log, pgconn, redis, dbMetrics)
	sessManager, err := sessionmanager.New(log, redis.Redis)
	if err != nil {
		return nil, err
	}
	sessManager.Start()
	assist := service.NewAssist(log, pgconn, projectsManager, sessManager)
	auditrail, err := tracer.NewTracer(log, pgconn, dbMetrics)
	if err != nil {
		return nil, err
	}
	responser := api.NewResponser(webMetrics)
	handlers, err := assistAPI.NewHandlers(log, cfg, responser, assist)
	if err != nil {
		return nil, err
	}
	return &ServicesBuilder{
		Auth:        auth.NewAuth(log, cfg.JWTSecret, cfg.JWTSpotSecret, pgconn, nil, prefix),
		RateLimiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute),
		AuditTrail:  auditrail,
		AssistAPI:   handlers,
	}, nil
}

View file

@@ -0,0 +1,134 @@
package service

import (
	"fmt"
	"strconv"

	"openreplay/backend/pkg/db/postgres/pool"
	"openreplay/backend/pkg/logger"
	"openreplay/backend/pkg/projects"
	"openreplay/backend/pkg/sessionmanager"
)

type Query struct {
	Key   string
	Value string
}

type Filter struct {
	Type     string   `json:"type"`
	Value    []string `json:"value"`
	Operator string   `json:"operator"` // is|contains
	Source   string   `json:"source"`   // for metadata only
}

type Request struct {
	Filters []Filter `json:"filters"`
	Order   string   `json:"order"` // sort.order [asc|desc]
	Limit   int      `json:"limit"` // pagination.limit
	Page    int      `json:"page"`  // pagination.page
}

type assistImpl struct {
	log      logger.Logger
	pgconn   pool.Pool
	projects projects.Projects
	sessions sessionmanager.SessionManager
}

type Assist interface {
	Autocomplete(projectKey string, query *Query) (interface{}, error)
	IsLive(projectKey, sessionID string, filters *Request) (bool, error)
	GetAll(projectKey string, filters *Request) (interface{}, error)
	GetByID(projectKey, sessionID string, filters *Request) (interface{}, error)
}

func NewAssist(log logger.Logger, pgconn pool.Pool, projects projects.Projects, sessions sessionmanager.SessionManager) Assist {
	return &assistImpl{
		log:      log,
		pgconn:   pgconn,
		projects: projects,
		sessions: sessions,
	}
}

func (a *assistImpl) Autocomplete(projectKey string, query *Query) (interface{}, error) {
	switch {
	case projectKey == "":
		return nil, fmt.Errorf("project key is required")
	case query == nil:
		return nil, fmt.Errorf("query is required")
	case query.Key == "":
		return nil, fmt.Errorf("query key is required")
	case query.Value == "":
		return nil, fmt.Errorf("query value is required")
	}
	project, err := a.projects.GetProjectByKey(projectKey)
	if err != nil {
		return nil, fmt.Errorf("failed to get project by key: %s", err)
	}
	return a.sessions.Autocomplete(strconv.Itoa(int(project.ProjectID)), sessionmanager.FilterType(query.Key), query.Value)
}

func (a *assistImpl) IsLive(projectKey, sessionID string, filters *Request) (bool, error) {
	switch {
	case projectKey == "":
		return false, fmt.Errorf("project key is required")
	case sessionID == "":
		return false, fmt.Errorf("session ID is required")
	case filters == nil:
		return false, fmt.Errorf("filters are required")
	}
	project, err := a.projects.GetProjectByKey(projectKey)
	if err != nil {
		return false, fmt.Errorf("failed to get project by key: %s", err)
	}
	sess, err := a.sessions.GetByID(strconv.Itoa(int(project.ProjectID)), sessionID)
	if err != nil {
		return false, fmt.Errorf("failed to get session by ID: %s", err)
	}
	return sess != nil, nil
}

func (a *assistImpl) GetAll(projectKey string, request *Request) (interface{}, error) {
	switch {
	case projectKey == "":
		return nil, fmt.Errorf("project key is required")
	case request == nil:
		return nil, fmt.Errorf("filters are required")
	}
	project, err := a.projects.GetProjectByKey(projectKey)
	if err != nil {
		return nil, fmt.Errorf("failed to get project by key: %s", err)
	}
	order := sessionmanager.Asc
	if request.Order == "desc" {
		order = sessionmanager.Desc
	}
	filters := make([]*sessionmanager.Filter, 0, len(request.Filters))
	for _, f := range request.Filters {
		filters = append(filters, &sessionmanager.Filter{
			Type:     sessionmanager.FilterType(f.Type),
			Value:    f.Value,
			Operator: f.Operator == "is",
			Source:   f.Source,
		})
	}
	return a.sessions.GetAll(strconv.Itoa(int(project.ProjectID)), filters, order, request.Page, request.Limit)
}

func (a *assistImpl) GetByID(projectKey, sessionID string, filters *Request) (interface{}, error) {
	switch {
	case projectKey == "":
		return nil, fmt.Errorf("project key is required")
	case sessionID == "":
		return nil, fmt.Errorf("session ID is required")
	case filters == nil:
		return nil, fmt.Errorf("filters are required")
	}
	project, err := a.projects.GetProjectByKey(projectKey)
	if err != nil {
		return nil, fmt.Errorf("failed to get project by key: %s", err)
	}
	return a.sessions.GetByID(strconv.Itoa(int(project.ProjectID)), sessionID)
}
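
One detail worth flagging in GetAll above: the request's string operator collapses to a boolean, so only the exact string "is" produces an equality match, and any other value (including typos or casing variants) silently falls back to a contains match. A standalone sketch of that mapping:

package main

import "fmt"

func main() {
	// Mirrors Operator: f.Operator == "is" in assistImpl.GetAll.
	for _, op := range []string{"is", "contains", "Is", "equals"} {
		fmt.Printf("%-10q -> equality match: %v\n", op, op == "is")
	}
}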

View file

@@ -0,0 +1,526 @@
package sessionmanager

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis"

	"openreplay/backend/pkg/logger"
)

const (
	TickInterval            = 10 * time.Second
	NodeKeyPattern          = "assist:nodes:*"
	ActiveSessionPrefix     = "assist:online_sessions:"
	RecentlyUpdatedSessions = "assist:updated_sessions"
	BatchSize               = 1000
)

type SessionData struct {
	Timestamp    uint64             `json:"timestamp"`
	ProjectID    string             `json:"projectID"`
	SessionID    string             `json:"sessionID"`
	UserID       *string            `json:"userID"`
	UserUUID     *string            `json:"userUUID"`
	UserOS       *string            `json:"userOs"`
	UserBrowser  *string            `json:"userBrowser"`
	UserDevice   *string            `json:"userDevice"`
	UserPlatform *string            `json:"userDeviceType"` // is
	UserCountry  *string            `json:"userCountry"`    // is
	UserState    *string            `json:"userState"`      // is
	UserCity     *string            `json:"userCity"`       // is
	Metadata     *map[string]string `json:"metadata"`       // contains
	Raw          string
}

type SessionManager interface {
	Start()
	Stop()
	GetByID(projectID, sessionID string) (*SessionData, error)
	GetAll(projectID string, filters []*Filter, sort SortOrder, page, limit int) ([]*SessionData, error)
	Autocomplete(projectID string, key FilterType, value string) ([]string, error)
}

type sessionManagerImpl struct {
	ctx      context.Context
	log      logger.Logger
	client   *redis.Client
	ticker   *time.Ticker
	wg       *sync.WaitGroup
	stopChan chan struct{}
	mutex    *sync.RWMutex
	cache    map[string]*SessionData
	sorted   []*SessionData
}

func New(log logger.Logger, redis *redis.Client) (SessionManager, error) {
	switch {
	case log == nil:
		return nil, fmt.Errorf("logger is required")
	case redis == nil:
		return nil, fmt.Errorf("redis client is required")
	}
	sm := &sessionManagerImpl{
		ctx:      context.Background(),
		log:      log,
		client:   redis,
		ticker:   time.NewTicker(TickInterval),
		wg:       &sync.WaitGroup{},
		stopChan: make(chan struct{}),
		mutex:    &sync.RWMutex{},
		cache:    make(map[string]*SessionData),
		sorted:   make([]*SessionData, 0),
	}
	return sm, nil
}

func (sm *sessionManagerImpl) Start() {
	sm.log.Debug(sm.ctx, "Starting session manager...")
	go func() {
		sm.loadSessions()
		for {
			select {
			case <-sm.ticker.C:
				sm.updateSessions()
			case <-sm.stopChan:
				sm.log.Debug(sm.ctx, "Stopping session manager...")
				return
			}
		}
	}()
}

func (sm *sessionManagerImpl) Stop() {
	close(sm.stopChan)
	sm.ticker.Stop()
	sm.wg.Wait()
	if err := sm.client.Close(); err != nil {
		sm.log.Debug(sm.ctx, "Error closing Redis connection: %v", err)
	}
	sm.log.Debug(sm.ctx, "Session manager stopped")
}

func (sm *sessionManagerImpl) getNodeIDs() ([]string, error) {
	var nodeIDs = make([]string, 0, 16) // Let's assume we have at most 16 nodes
	var cursor uint64 = 0
	for {
		keys, nextCursor, err := sm.client.Scan(cursor, NodeKeyPattern, 100).Result()
		if err != nil {
			return nil, fmt.Errorf("scan failed: %v", err)
		}
		for _, key := range keys {
			nodeIDs = append(nodeIDs, key)
		}
		cursor = nextCursor
		if cursor == 0 {
			break
		}
	}
	return nodeIDs, nil
}

func (sm *sessionManagerImpl) getAllNodeSessions(nodeIDs []string) map[string]struct{} {
	allSessionIDs := make(map[string]struct{})
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, nodeID := range nodeIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			sessionListJSON, err := sm.client.Get(id).Result()
			if err != nil {
				if errors.Is(err, redis.Nil) {
					return
				}
				sm.log.Debug(sm.ctx, "Error getting session list for node %s: %v", id, err)
				return
			}
			var sessionList []string
			if err = json.Unmarshal([]byte(sessionListJSON), &sessionList); err != nil {
				sm.log.Debug(sm.ctx, "Error unmarshalling session list for node %s: %v", id, err)
				return
			}
			mu.Lock()
			for _, sessionID := range sessionList {
				allSessionIDs[sessionID] = struct{}{}
			}
			mu.Unlock()
		}(nodeID)
	}
	wg.Wait()
	return allSessionIDs
}

func (sm *sessionManagerImpl) getOnlineSessionIDs() (map[string]struct{}, error) {
	nodeIDs, err := sm.getNodeIDs()
	if err != nil {
		sm.log.Debug(sm.ctx, "Error getting node IDs: %v", err)
		return nil, err
	}
	sm.log.Debug(sm.ctx, "Found %d nodes", len(nodeIDs))
	allSessionIDs := sm.getAllNodeSessions(nodeIDs)
	sm.log.Debug(sm.ctx, "Collected %d unique session IDs", len(allSessionIDs))
	return allSessionIDs, nil
}

func (sm *sessionManagerImpl) getSessionData(sessionIDs []string) map[string]*SessionData {
	sessionData := make(map[string]*SessionData, len(sessionIDs))
	for i := 0; i < len(sessionIDs); i += BatchSize {
		end := i + BatchSize
		if end > len(sessionIDs) {
			end = len(sessionIDs)
		}
		batch := sessionIDs[i:end]
		keys := make([]string, len(batch))
		for j, id := range batch {
			keys[j] = ActiveSessionPrefix + id
		}
		results, err := sm.client.MGet(keys...).Result()
		if err != nil {
			sm.log.Debug(sm.ctx, "Error in MGET operation: %v", err)
			continue // TODO: Handle the error
		}
		for j, result := range results {
			if result == nil {
				continue
			}
			strVal, ok := result.(string)
			if !ok {
				sm.log.Debug(sm.ctx, "Unexpected type for session data: %T", result)
				continue
			}
			var data SessionData
			if err := json.Unmarshal([]byte(strVal), &data); err != nil {
				sm.log.Debug(sm.ctx, "Error unmarshalling session data: %v", err)
				continue
			}
			data.Raw = strVal
			sessionData[batch[j]] = &data
		}
		sm.log.Debug(sm.ctx, "Collected %d sessions", len(results))
	}
	sm.wg.Wait()
	return sessionData
}

func (sm *sessionManagerImpl) updateCache(sessionsToAdd map[string]*SessionData, sessionsToRemove []string) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	if sessionsToRemove != nil {
		for _, sessID := range sessionsToRemove {
			delete(sm.cache, sessID)
		}
	}
	if sessionsToAdd == nil {
		return
	}
	for sessID, session := range sessionsToAdd {
		sm.cache[sessID] = session
	}
	sessionList := make([]*SessionData, 0, len(sm.cache))
	for _, s := range sm.cache {
		sessionList = append(sessionList, s)
	}
	sort.Slice(sessionList, func(i, j int) bool {
		return sessionList[i].Timestamp < sessionList[j].Timestamp
	})
	sm.sorted = sessionList
}

func (sm *sessionManagerImpl) loadSessions() {
	startTime := time.Now()
	sm.log.Debug(sm.ctx, "Starting session processing cycle")
	sessIDs, err := sm.getOnlineSessionIDs()
	if err != nil {
		sm.log.Debug(sm.ctx, "Error getting online session IDs: %v", err)
		return
	}
	if len(sessIDs) == 0 {
		sm.log.Debug(sm.ctx, "No sessions found for nodes")
		return
	}
	allSessionIDsList := make([]string, 0, len(sessIDs))
	for sessionID := range sessIDs {
		allSessionIDsList = append(allSessionIDsList, sessionID)
	}
	sessionMap := sm.getSessionData(allSessionIDsList)
	sm.updateCache(sessionMap, nil)
	duration := time.Since(startTime)
	sm.log.Debug(sm.ctx, "Session processing cycle completed in %v. Processed %d sessions", duration, len(sm.cache))
}

func (sm *sessionManagerImpl) getAllRecentlyUpdatedSessions() (map[string]struct{}, error) {
	var (
		cursor   uint64
		allIDs   = make(map[string]struct{})
		batchIDs []string
		err      error
	)
	for {
		batchIDs, cursor, err = sm.client.SScan(RecentlyUpdatedSessions, cursor, "*", 1000).Result()
		if err != nil {
			sm.log.Debug(sm.ctx, "Error scanning updated session IDs: %v", err)
			return nil, err
		}
		for _, id := range batchIDs {
			allIDs[id] = struct{}{}
		}
		if cursor == 0 {
			break
		}
	}
	if len(allIDs) == 0 {
		sm.log.Debug(sm.ctx, "No updated session IDs found")
		return nil, nil
	}
	var sessionIDsSlice []interface{}
	for id := range allIDs {
		sessionIDsSlice = append(sessionIDsSlice, id)
	}
	removed := sm.client.SRem(RecentlyUpdatedSessions, sessionIDsSlice...).Val()
	sm.log.Debug(sm.ctx, "Fetched and removed %d session IDs from updated_session_set", removed)
	return allIDs, nil
}

func (sm *sessionManagerImpl) updateSessions() {
	startTime := time.Now()
	sm.log.Debug(sm.ctx, "Starting session processing cycle")
	sessIDs, err := sm.getOnlineSessionIDs()
	if err != nil {
		sm.log.Debug(sm.ctx, "Error getting online session IDs: %v", err)
		return
	}
	updatedSessIDs, err := sm.getAllRecentlyUpdatedSessions()
	if err != nil {
		sm.log.Debug(sm.ctx, "Error getting recently updated sessions: %v", err)
		return
	}
	if updatedSessIDs == nil {
		// getAllRecentlyUpdatedSessions returns a nil map when the set is
		// empty; allocate one so newly seen sessions can be added below.
		updatedSessIDs = make(map[string]struct{})
	}
	sm.mutex.RLock()
	toAdd := make([]string, 0, len(updatedSessIDs))
	for sessID := range sessIDs {
		if _, exists := sm.cache[sessID]; !exists {
			updatedSessIDs[sessID] = struct{}{} // Add to updated sessions if not in cache
		}
	}
	for sessID := range updatedSessIDs {
		toAdd = append(toAdd, sessID)
	}
	toRemove := make([]string, 0, len(sessIDs)/16)
	for sessID := range sm.cache {
		if _, exists := sessIDs[sessID]; !exists {
			toRemove = append(toRemove, sessID)
		}
	}
	sm.mutex.RUnlock()
	// Load full session data from Redis
	newCache := sm.getSessionData(toAdd)
	sm.updateCache(newCache, toRemove)
	duration := time.Since(startTime)
	sm.log.Debug(sm.ctx, "Session processing cycle completed in %v. Processed %d sessions", duration, len(sm.cache))
}

func (sm *sessionManagerImpl) GetByID(projectID, sessionID string) (*SessionData, error) {
	if sessionID == "" {
		return nil, fmt.Errorf("session ID is required")
	}
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	sessionData, exists := sm.cache[sessionID]
	if !exists {
		return nil, fmt.Errorf("session not found")
	}
	if sessionData.ProjectID != projectID {
		return nil, fmt.Errorf("session does not belong to the project")
	}
	return sessionData, nil
}

func (sm *sessionManagerImpl) GetAll(projectID string, filters []*Filter, sort SortOrder, page, limit int) ([]*SessionData, error) {
	if page < 1 || limit < 1 {
		page, limit = 1, 10 // Set default values
	}
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	filtered := make([]*SessionData, 0, limit)
	for _, session := range sm.sorted {
		if session.ProjectID != projectID {
			continue
		}
		if matchesFilters(session, filters) {
			filtered = append(filtered, session)
		}
	}
	start := (page - 1) * limit
	end := start + limit
	if start > len(filtered) {
		return []*SessionData{}, nil
	}
	if end > len(filtered) {
		end = len(filtered)
	}
	return filtered[start:end], nil
}

// matchesFilters reports whether the session satisfies at least one of the
// given filters (OR semantics): the first matching filter value wins.
func matchesFilters(session *SessionData, filters []*Filter) bool {
	for _, filter := range filters {
		var value string
		switch filter.Type {
		case UserID:
			if session.UserID != nil {
				value = *session.UserID
			}
		case UserAnonymousID:
			if session.UserUUID != nil {
				value = *session.UserUUID
			}
		case UserOS:
			if session.UserOS != nil {
				value = *session.UserOS
			}
		case UserBrowser:
			if session.UserBrowser != nil {
				value = *session.UserBrowser
			}
		case UserDevice:
			if session.UserDevice != nil {
				value = *session.UserDevice
			}
		case UserPlatform:
			if session.UserPlatform != nil {
				value = *session.UserPlatform
			}
		case UserCountry:
			if session.UserCountry != nil {
				value = *session.UserCountry
			}
		case UserState:
			if session.UserState != nil {
				value = *session.UserState
			}
		case UserCity:
			if session.UserCity != nil {
				value = *session.UserCity
			}
		case Metadata:
			if session.Metadata != nil {
				value = (*session.Metadata)[filter.Source]
			}
		default:
			return false // Unknown filter type
		}
		for _, filterValue := range filter.Value {
			if filter.Operator == Is && value == filterValue {
				return true
			} else if filter.Operator == Contains && strings.Contains(strings.ToLower(value), strings.ToLower(filterValue)) {
				return true
			}
		}
	}
	return false
}

// Autocomplete returns the unique values of the given field, across the
// project's live sessions, that contain the query string (case-insensitive).
func (sm *sessionManagerImpl) Autocomplete(projectID string, key FilterType, value string) ([]string, error) {
	matches := make(map[string]struct{}) // To ensure uniqueness
	lowerValue := strings.ToLower(value)
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	for _, session := range sm.sorted {
		if session.ProjectID != projectID {
			continue
		}
		var fieldValue string
		switch key {
		case UserID:
			if session.UserID != nil {
				fieldValue = *session.UserID
			}
		case UserAnonymousID:
			if session.UserUUID != nil {
				fieldValue = *session.UserUUID
			}
		case UserOS:
			if session.UserOS != nil {
				fieldValue = *session.UserOS
			}
		case UserBrowser:
			if session.UserBrowser != nil {
				fieldValue = *session.UserBrowser
			}
		case UserDevice:
			if session.UserDevice != nil {
				fieldValue = *session.UserDevice
			}
		case UserState:
			if session.UserState != nil {
				fieldValue = *session.UserState
			}
		case UserCity:
			if session.UserCity != nil {
				fieldValue = *session.UserCity
			}
		case Metadata:
			if session.Metadata != nil {
				if v, ok := (*session.Metadata)[string(key)]; ok {
					fieldValue = v
				}
			}
		default:
			return nil, fmt.Errorf("unknown filter type: %s", key)
		}
		if fieldValue != "" && strings.Contains(strings.ToLower(fieldValue), lowerValue) {
			matches[fieldValue] = struct{}{}
		}
	}
	results := make([]string, 0, len(matches))
	for k := range matches {
		results = append(results, k)
	}
	return results, nil
}
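
GetAll's pagination is a plain window over the filtered slice: a 1-based page and limit define a half-open range that is clamped to the slice length. The same arithmetic in isolation:

package main

import "fmt"

// window reproduces the paging arithmetic used by GetAll: invalid
// inputs fall back to page 1 / limit 10, and the range is clamped.
func window(total, page, limit int) (start, end int) {
	if page < 1 || limit < 1 {
		page, limit = 1, 10
	}
	start = (page - 1) * limit
	if start > total {
		return 0, 0
	}
	end = start + limit
	if end > total {
		end = total
	}
	return start, end
}

func main() {
	fmt.Println(window(15, 2, 10)) // 10 15 -> items 10..14
	fmt.Println(window(15, 3, 10)) // start 20 > 15 -> empty page
}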

View file

@@ -0,0 +1,37 @@
package sessionmanager

type SortOrder bool

const (
	Asc  SortOrder = true
	Desc SortOrder = false
)

type FilterType string

const (
	UserID          FilterType = "userId"
	UserAnonymousID FilterType = "userAnonymousId"
	UserOS          FilterType = "userOs"
	UserBrowser     FilterType = "userBrowser"
	UserDevice      FilterType = "userDevice"
	UserPlatform    FilterType = "platform"
	UserCountry     FilterType = "userCountry"
	UserState       FilterType = "userState"
	UserCity        FilterType = "userCity"
	Metadata        FilterType = "metadata"
)

type FilterOperator bool

const (
	Is       FilterOperator = true
	Contains FilterOperator = false
)

type Filter struct {
	Type     FilterType
	Value    []string
	Operator FilterOperator
	Source   string // for metadata only
}
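
For illustration, a metadata filter built with these types; Source selects the metadata key, and Contains is the boolean operator defined above:

package sessionmanager

// Illustrative only: matches sessions whose "plan" metadata value
// contains "beta", case-insensitively (see matchesFilters).
var exampleFilter = &Filter{
	Type:     Metadata,
	Value:    []string{"beta"},
	Operator: Contains,
	Source:   "plan", // metadata key; only consulted when Type == Metadata
}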

View file

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View file

@@ -0,0 +1,24 @@
apiVersion: v2
name: assist-api
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v1.22.0"

View file

@@ -0,0 +1,22 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
  {{- range .paths }}
  http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
  {{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
  export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "assist-api.fullname" . }})
  export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
  echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
     NOTE: It may take a few minutes for the LoadBalancer IP to be available.
           You can watch its status by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "assist-api.fullname" . }}'
  export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "assist-api.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
  echo http://$SERVICE_IP:{{ .Values.service.ports.http }}
{{- else if contains "ClusterIP" .Values.service.type }}
  export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "assist-api.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
  export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
  echo "Visit http://127.0.0.1:8080 to use your application"
  kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View file

@@ -0,0 +1,65 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "assist-api.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "assist-api.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "assist-api.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "assist-api.labels" -}}
helm.sh/chart: {{ include "assist-api.chart" . }}
{{ include "assist-api.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- if .Values.global.appLabels }}
{{- .Values.global.appLabels | toYaml | nindent 0}}
{{- end}}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "assist-api.selectorLabels" -}}
app.kubernetes.io/name: {{ include "assist-api.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "assist-api.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "assist-api.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View file

@@ -0,0 +1,101 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "assist-api.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
spec:
  {{- if not .Values.autoscaling.enabled }}
  replicas: {{ .Values.replicaCount }}
  {{- end }}
  selector:
    matchLabels:
      {{- include "assist-api.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      {{- with .Values.podAnnotations }}
      annotations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      labels:
        {{- include "assist-api.selectorLabels" . | nindent 8 }}
    spec:
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      serviceAccountName: {{ include "assist-api.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.podSecurityContext | nindent 8 }}
      shareProcessNamespace: true
      containers:
        - name: {{ .Chart.Name }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          {{- if .Values.global.enterpriseEditionLicense }}
          image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}-ee"
          {{- else }}
          image: "{{ tpl .Values.image.repository . }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          {{- end }}
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          {{- if .Values.healthCheck }}
          {{- .Values.healthCheck | toYaml | nindent 10 }}
          {{- end }}
          env:
            - name: LICENSE_KEY
              value: '{{ .Values.global.enterpriseEditionLicense }}'
            - name: KAFKA_SERVERS
              value: '{{ .Values.global.kafka.kafkaHost }}:{{ .Values.global.kafka.kafkaPort }}'
            - name: KAFKA_USE_SSL
              value: '{{ .Values.global.kafka.kafkaUseSsl }}'
            - name: pg_password
              {{- if .Values.global.postgresql.existingSecret }}
              valueFrom:
                secretKeyRef:
                  name: {{ .Values.global.postgresql.existingSecret }}
                  key: postgresql-postgres-password
              {{- else }}
              value: '{{ .Values.global.postgresql.postgresqlPassword }}'
              {{- end }}
            - name: POSTGRES_STRING
              value: 'postgres://{{ .Values.global.postgresql.postgresqlUser }}:$(pg_password)@{{ .Values.global.postgresql.postgresqlHost }}:{{ .Values.global.postgresql.postgresqlPort }}/{{ .Values.global.postgresql.postgresqlDatabase }}'
            {{- include "openreplay.env.redis_string" .Values.global.redis | nindent 12 }}
            {{- range $key, $val := .Values.global.env }}
            - name: {{ $key }}
              value: '{{ $val }}'
            {{- end }}
            {{- range $key, $val := .Values.env }}
            - name: {{ $key }}
              value: '{{ $val }}'
            {{- end }}
          ports:
            {{- range $key, $val := .Values.service.ports }}
            - name: {{ $key }}
              containerPort: {{ $val }}
              protocol: TCP
            {{- end }}
          volumeMounts:
            {{- include "openreplay.volume.redis_ca_certificate.mount" .Values.global.redis | nindent 12 }}
            {{- with .Values.persistence.mounts }}
            {{- toYaml . | nindent 12 }}
            {{- end }}
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
      volumes:
        {{- with .Values.persistence.volumes }}
        {{- toYaml . | nindent 8 }}
        {{- end }}
        {{- include "openreplay.volume.redis_ca_certificate" .Values.global.redis | nindent 8 }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}

View file

@@ -0,0 +1,33 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "assist-api.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "assist-api.fullname" . }}
  minReplicas: {{ .Values.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.autoscaling.maxReplicas }}
  metrics:
    {{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
    {{- end }}
    {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
    {{- end }}
{{- end }}

View file

@@ -0,0 +1,62 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "assist-api.fullname" . -}}
{{- $svcPort := .Values.service.ports.http -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
  {{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
  {{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className }}
  {{- end }}
{{- end }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1beta1
{{- else -}}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
  name: {{ $fullName }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
  {{- with .Values.ingress.annotations }}
  annotations:
    {{- toYaml . | nindent 4 }}
  {{- end }}
spec:
  {{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
  ingressClassName: {{ .Values.ingress.className }}
  {{- end }}
  {{- if .Values.ingress.tls }}
  tls:
    {{- range .Values.ingress.tls }}
    - hosts:
        {{- range .hosts }}
        - {{ . | quote }}
        {{- end }}
      secretName: {{ .secretName }}
    {{- end }}
  {{- end }}
  rules:
    {{- range .Values.ingress.hosts }}
    - host: {{ .host | quote }}
      http:
        paths:
          {{- range .paths }}
          - path: {{ .path }}
            {{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
            pathType: {{ .pathType }}
            {{- end }}
            backend:
              {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
              service:
                name: {{ $fullName }}
                port:
                  number: {{ $svcPort }}
              {{- else }}
              serviceName: {{ $fullName }}
              servicePort: {{ $svcPort }}
              {{- end }}
          {{- end }}
    {{- end }}
{{- end }}

View file

@@ -0,0 +1,18 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ include "assist-api.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
spec:
  type: {{ .Values.service.type }}
  ports:
    {{- range $key, $val := .Values.service.ports }}
    - port: {{ $val }}
      targetPort: {{ $key }}
      protocol: TCP
      name: {{ $key }}
    {{- end }}
  selector:
    {{- include "assist-api.selectorLabels" . | nindent 4 }}

View file

@@ -0,0 +1,18 @@
{{- if and (.Capabilities.APIVersions.Has "monitoring.coreos.com/v1") (.Values.serviceMonitor.enabled) }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: {{ include "assist-api.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
    {{- if .Values.serviceMonitor.additionalLabels }}
    {{- toYaml .Values.serviceMonitor.additionalLabels | nindent 4 }}
    {{- end }}
spec:
  endpoints:
    {{- .Values.serviceMonitor.scrapeConfigs | toYaml | nindent 4 }}
  selector:
    matchLabels:
      {{- include "assist-api.selectorLabels" . | nindent 6 }}
{{- end }}

View file

@@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "assist-api.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
  {{- with .Values.serviceAccount.annotations }}
  annotations:
    {{- toYaml . | nindent 4 }}
  {{- end }}
{{- end }}

View file

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: "{{ include "assist-api.fullname" . }}-test-connection"
  labels:
    {{- include "assist-api.labels" . | nindent 4 }}
  annotations:
    "helm.sh/hook": test
spec:
  containers:
    - name: wget
      image: busybox
      command: ['wget']
      args: ['{{ include "assist-api.fullname" . }}:{{ .Values.service.ports.http }}']
  restartPolicy: Never

View file

@@ -0,0 +1,119 @@
# Default values for openreplay.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

replicaCount: 1

image:
  repository: "{{ .Values.global.openReplayContainerRegistry }}/assist-api"
  pullPolicy: IfNotPresent
  # Overrides the image tag whose default is the chart appVersion.
  tag: ""

imagePullSecrets: []
nameOverride: "assist-api"
fullnameOverride: "assist-api-openreplay"

serviceAccount:
  # Specifies whether a service account should be created
  create: true
  # Annotations to add to the service account
  annotations: {}
  # The name of the service account to use.
  # If not set and create is true, a name is generated using the fullname template
  name: ""

podAnnotations: {}

securityContext:
  runAsUser: 1001
  runAsGroup: 1001

podSecurityContext:
  runAsUser: 1001
  runAsGroup: 1001
  fsGroup: 1001
  fsGroupChangePolicy: "OnRootMismatch"

# podSecurityContext: {}
#   fsGroup: 2000

# securityContext: {}
#   capabilities:
#     drop:
#       - ALL
#   readOnlyRootFilesystem: true
#   runAsNonRoot: true
#   runAsUser: 1000

service:
  type: ClusterIP
  ports:
    http: 9000
    metrics: 8888

serviceMonitor:
  enabled: true
  additionalLabels:
    release: observability
  scrapeConfigs:
    - port: metrics
      honorLabels: true
      interval: 15s
      path: /metrics
      scheme: http
      scrapeTimeout: 10s

ingress:
  enabled: false
  className: ""
  annotations: {}
    # kubernetes.io/ingress.class: nginx
    # kubernetes.io/tls-acme: "true"
  hosts:
    - host: chart-example.local
      paths:
        - path: /
          pathType: ImplementationSpecific
  tls: []
  #  - secretName: chart-example-tls
  #    hosts:
  #      - chart-example.local

resources: {}
  # We usually recommend not to specify default resources and to leave this as a conscious
  # choice for the user. This also increases chances charts run on environments with little
  # resources, such as Minikube. If you do want to specify resources, uncomment the following
  # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
  # limits:
  #   cpu: 100m
  #   memory: 128Mi
  # requests:
  #   cpu: 100m
  #   memory: 128Mi

autoscaling:
  enabled: false
  minReplicas: 1
  maxReplicas: 5
  targetCPUUtilizationPercentage: 80
  # targetMemoryUtilizationPercentage: 80

env: {}

nodeSelector: {}

tolerations: []

affinity: {}

persistence: {}
# # Spec of spec.template.spec.containers[*].volumeMounts
#   mounts:
#     - name: kafka-ssl
#       mountPath: /opt/kafka/ssl
# # Spec of spec.template.spec.volumes
#   volumes:
#     - name: kafka-ssl
#       secret:
#         secretName: kafka-ssl