* refactor(chalice): upgraded dependencies

* refactor(chalice): upgraded dependencies
feat(chalice): support heatmaps

* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* refactor(chalice): upgraded dependencies

* refactor(chalice): upgraded dependencies
feat(chalice): support heatmaps

* fix(chalice): fixed Math-operators validation
refactor(chalice): search for sessions that have events for heatmaps

* refactor(chalice): search for sessions that have at least 1 location event for heatmaps

* refactor(chalice): upgraded dependencies
refactor(crons): upgraded dependencies
refactor(alerts): upgraded dependencies

* feat(chalice): get top 10 values for autocomplete CH

* refactor(chalice): cleaned code
refactor(chalice): upgraded dependencies
refactor(alerts): upgraded dependencies
refactor(crons): upgraded dependencies

* feat(chalice): autocomplete return top 10 with stats

* fix(chalice): fixed autocomplete top 10 meta-filters

* feat(chalice): cache autocomplete-top-10 responses
feat(DB): support Spot login

* fix(chalice): fixed Spot new refresh token
This commit is contained in:
Kraiem Taha Yassine 2024-08-07 13:42:15 +02:00 committed by GitHub
parent 8bf640d753
commit 6f0794b196
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 245 additions and 78 deletions

View file

@ -114,23 +114,23 @@ class JWTAuth(HTTPBearer):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid authorization code for refresh.")
async def __process_spot_refresh_call(self, request: Request) -> schemas.CurrentContext:
if "refreshToken" not in request.cookies:
logger.warning("Missing sopt-refreshToken cookie.")
if "spotRefreshToken" not in request.cookies:
logger.warning("Missing soptRefreshToken cookie.")
jwt_payload = None
else:
jwt_payload = authorizers.jwt_refresh_authorizer(scheme="Bearer", token=request.cookies["refreshToken"])
jwt_payload = authorizers.jwt_refresh_authorizer(scheme="Bearer", token=request.cookies["spotRefreshToken"])
if jwt_payload is None or jwt_payload.get("jti") is None:
logger.warning("Null spot-refreshToken's payload, or null JTI.")
logger.warning("Null spotRefreshToken's payload, or null JTI.")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid spot-refresh-token or expired refresh-token.")
detail="Invalid spotRefreshToken or expired refresh-token.")
auth_exists = spot.refresh_auth_exists(user_id=jwt_payload.get("userId", -1),
jwt_jti=jwt_payload["jti"])
if not auth_exists:
logger.warning("spot-refreshToken's user not found.")
logger.warning("spotRefreshToken's user not found.")
logger.warning(jwt_payload)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid spot-refresh-token or expired refresh-token.")
detail="Invalid spotRefreshToken or expired refresh-token.")
credentials: HTTPAuthorizationCredentials = await super(JWTAuth, self).__call__(request)
if credentials:

View file

@ -4,6 +4,7 @@ from chalicelib.core import countries, events, metadata
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from chalicelib.utils.event_filter_definition import Event
from chalicelib.utils.or_cache import CachedResponse
logger = logging.getLogger(__name__)
TABLE = "public.autocomplete"
@ -375,6 +376,7 @@ def is_top_supported(event_type):
return TYPE_TO_COLUMN.get(event_type, False)
@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
def get_top_values(project_id, event_type, event_key=None):
with pg_client.PostgresClient() as cur:
if schemas.FilterType.has_value(event_type):

View file

@ -1,28 +1,11 @@
from decouple import config
from chalicelib.core import authorizers, users
from chalicelib.utils import helper
from chalicelib.utils import pg_client
AUDIENCE = "spot:OpenReplay"
def change_spot_jwt_iat_jti(user_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify(f"""UPDATE public.users
SET spot_jwt_iat = timezone('utc'::text, now()-INTERVAL '10s'),
spot_jwt_refresh_jti = 0,
spot_jwt_refresh_iat = timezone('utc'::text, now()-INTERVAL '10s')
WHERE user_id = %(user_id)s
RETURNING EXTRACT (epoch FROM spot_jwt_iat)::BIGINT AS spot_jwt_iat,
spot_jwt_refresh_jti,
EXTRACT (epoch FROM spot_jwt_refresh_iat)::BIGINT AS spot_jwt_refresh_iat;""",
{"user_id": user_id})
cur.execute(query)
row = cur.fetchone()
return row.get("spot_jwt_iat"), row.get("spot_jwt_refresh_jti"), row.get("spot_jwt_refresh_iat")
def refresh_spot_jwt_iat_jti(user_id):
with pg_client.PostgresClient() as cur:
query = cur.mogrify(f"""UPDATE public.users
@ -38,40 +21,6 @@ def refresh_spot_jwt_iat_jti(user_id):
return row.get("spot_jwt_iat"), row.get("spot_jwt_refresh_jti"), row.get("spot_jwt_refresh_iat")
def authenticate(email, password) -> dict | None:
with pg_client.PostgresClient() as cur:
query = cur.mogrify(
f"""SELECT
users.user_id,
1 AS tenant_id,
users.name,
users.email
FROM public.users INNER JOIN public.basic_authentication USING(user_id)
WHERE users.email = %(email)s
AND basic_authentication.password = crypt(%(password)s, basic_authentication.password)
AND basic_authentication.user_id = (SELECT su.user_id FROM public.users AS su WHERE su.email=%(email)s AND su.deleted_at IS NULL LIMIT 1)
LIMIT 1;""",
{"email": email, "password": password})
cur.execute(query)
r = cur.fetchone()
if r is not None:
r = helper.dict_to_camel_case(r)
spot_jwt_iat, spot_jwt_r_jti, spot_jwt_r_iat = change_spot_jwt_iat_jti(user_id=r['userId'])
return {
"jwt": authorizers.generate_jwt(user_id=r['userId'], tenant_id=r['tenantId'], iat=spot_jwt_iat,
aud=AUDIENCE),
"refreshToken": authorizers.generate_jwt_refresh(user_id=r['userId'], tenant_id=r['tenantId'],
iat=spot_jwt_r_iat, aud=AUDIENCE,
jwt_jti=spot_jwt_r_jti),
"refreshTokenMaxAge": config("JWT_REFRESH_EXPIRATION", cast=int),
"email": email,
**r
}
return None
def logout(user_id: int):
users.logout(user_id=user_id)
@ -83,7 +32,7 @@ def refresh(user_id: int, tenant_id: int = -1) -> dict:
aud=AUDIENCE),
"refreshToken": authorizers.generate_jwt_refresh(user_id=user_id, tenant_id=tenant_id, iat=spot_jwt_r_iat,
aud=AUDIENCE, jwt_jti=spot_jwt_r_jti),
"refreshTokenMaxAge": config("JWT_REFRESH_EXPIRATION", cast=int) - (spot_jwt_iat - spot_jwt_r_iat)
"refreshTokenMaxAge": config("JWT_SPOT_REFRESH_EXPIRATION", cast=int) - (spot_jwt_iat - spot_jwt_r_iat)
}

View file

@ -653,7 +653,7 @@ def authenticate(email, password, for_change_password=False, include_spot=False)
iat=j_r.spot_jwt_refresh_iat,
aud=spot.AUDIENCE,
jwt_jti=j_r.spot_jwt_refresh_jti),
"spotRefreshTokenMaxAge": config("JWT_REFRESH_EXPIRATION", cast=int)
"spotRefreshTokenMaxAge": config("JWT_SPOT_REFRESH_EXPIRATION", cast=int)
}
return response
return None

View file

@ -0,0 +1 @@
from .or_cache import CachedResponse

View file

@ -0,0 +1,83 @@
import functools
import inspect
import json
import logging
from chalicelib.utils import pg_client
import time
from fastapi.encoders import jsonable_encoder
logger = logging.getLogger(__name__)
class CachedResponse:
def __init__(self, table, ttl):
self.table = table
self.ttl = ttl
def __call__(self, func):
self.param_names = {i: param for i, param in enumerate(inspect.signature(func).parameters)}
@functools.wraps(func)
def wrapper(*args, **kwargs):
values = dict()
for i, param in self.param_names.items():
if i < len(args):
values[param] = args[i]
elif param in kwargs:
values[param] = kwargs[param]
else:
values[param] = None
result = self.__get(values)
if result is None or result["expired"] \
or result["result"] is None or len(result["result"]) == 0:
now = time.time()
result = func(*args, **kwargs)
now = time.time() - now
if result is not None and len(result) > 0:
self.__add(values, result, now)
result[0]["cached"] = False
else:
logger.info(f"using cached response for "
f"{func.__name__}({','.join([f'{key}={val}' for key, val in enumerate(values)])})")
result = result["result"]
result[0]["cached"] = True
return result
return wrapper
def __get(self, values):
with pg_client.PostgresClient() as cur:
sub_constraints = []
for key, value in values.items():
if value is not None:
sub_constraints.append(f"{key}=%({key})s")
else:
sub_constraints.append(f"{key} IS NULL")
query = f"""SELECT result,
(%(ttl)s>0
AND EXTRACT(EPOCH FROM (timezone('utc'::text, now()) - created_at - INTERVAL %(interval)s)) > 0) AS expired
FROM {self.table}
WHERE {" AND ".join(sub_constraints)}"""
query = cur.mogrify(query, {**values, 'ttl': self.ttl, 'interval': f'{self.ttl} seconds'})
logger.debug("------")
logger.debug(query)
logger.debug("------")
cur.execute(query)
result = cur.fetchone()
return result
def __add(self, values, result, execution_time):
with pg_client.PostgresClient() as cur:
query = f"""INSERT INTO {self.table} ({",".join(values.keys())},result,execution_time)
VALUES ({",".join([f"%({param})s" for param in values.keys()])},%(result)s,%(execution_time)s)
ON CONFLICT ({",".join(values.keys())}) DO UPDATE SET result=%(result)s,
execution_time=%(execution_time)s,
created_at=timezone('utc'::text, now());"""
query = cur.mogrify(query, {**values,
"result": json.dumps(jsonable_encoder(result)),
"execution_time": execution_time})
logger.debug("------")
logger.debug(query)
logger.debug("------")
cur.execute(query)

View file

@ -30,6 +30,8 @@ JWT_EXPIRATION=86400
JWT_ISSUER=openreplay-oss
JWT_REFRESH_EXPIRATION=604800
JWT_REFRESH_SECRET="SET A RANDOM STRING HERE"
JWT_SPOT_REFRESH_EXPIRATION=604800
JWT_SPOT_REFRESH_SECRET="SET A RANDOM STRING HERE"
jwt_secret="SET A RANDOM STRING HERE"
pg_dbname=postgres
pg_host=postgresql.db.svc.cluster.local

View file

@ -31,6 +31,8 @@ JWT_EXPIRATION=6000
JWT_ISSUER=openReplay-dev
JWT_REFRESH_EXPIRATION=604800
JWT_REFRESH_SECRET=SECRET2
JWT_SPOT_REFRESH_EXPIRATION=604800
JWT_SPOT_REFRESH_SECRET=SECRET3
jwt_secret=SECRET
LOCAL_DEV=true
LOGLEVEL=INFO

View file

@ -29,9 +29,9 @@ def events_search(projectId: int, q: Optional[str] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if type and (not q or len(q) == 0) \
and (autocomplete.is_top_supported(type)):
# TODO: check if type is a valid value for autocomplete
return autocomplete.get_top_values(project_id=projectId, event_type=type, event_key=key)
elif (not q or len(q) == 0):
# return autocomplete.get_top_values(project_id=projectId, event_type=type, event_key=key)
return autocomplete.get_top_values(projectId, type, event_key=key)
elif not q or len(q) == 0:
return {"data": []}
if live:

View file

@ -1,11 +1,8 @@
from fastapi import Body, Depends
from fastapi import HTTPException, status
from fastapi import Depends
from starlette.responses import JSONResponse, Response
import schemas
from chalicelib.core import spot, webhook
from chalicelib.utils import captcha
from chalicelib.utils import helper
from or_dependencies import OR_context
from routers.base import get_routers

1
ee/api/.gitignore vendored
View file

@ -274,3 +274,4 @@ Pipfile.lock
/NOTES.md
/chalicelib/core/db_request_handler.py
/routers/subs/spot.py
/chalicelib/utils/or_cache/

View file

@ -121,23 +121,23 @@ class JWTAuth(HTTPBearer):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid authorization code for refresh.")
async def __process_spot_refresh_call(self, request: Request) -> schemas.CurrentContext:
if "refreshToken" not in request.cookies:
logger.warning("Missing sopt-refreshToken cookie.")
if "spotRefreshToken" not in request.cookies:
logger.warning("Missing soptRefreshToken cookie.")
jwt_payload = None
else:
jwt_payload = authorizers.jwt_refresh_authorizer(scheme="Bearer", token=request.cookies["refreshToken"])
jwt_payload = authorizers.jwt_refresh_authorizer(scheme="Bearer", token=request.cookies["spotRefreshToken"])
if jwt_payload is None or jwt_payload.get("jti") is None:
logger.warning("Null spot-refreshToken's payload, or null JTI.")
logger.warning("Null spotRefreshToken's payload, or null JTI.")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid spot-refresh-token or expired refresh-token.")
detail="Invalid spotRefreshToken or expired refresh-token.")
auth_exists = spot.refresh_auth_exists(user_id=jwt_payload.get("userId", -1),
jwt_jti=jwt_payload["jti"])
if not auth_exists:
logger.warning("spot-refreshToken's user not found.")
logger.warning("spotRefreshToken's user not found.")
logger.warning(jwt_payload)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid spot-refresh-token or expired refresh-token.")
detail="Invalid spotRefreshToken or expired refresh-token.")
credentials: HTTPAuthorizationCredentials = await super(JWTAuth, self).__call__(request)
if credentials:

View file

@ -3,6 +3,7 @@ from chalicelib.core import countries, events, metadata
from chalicelib.utils import ch_client
from chalicelib.utils import helper, exp_ch_helper
from chalicelib.utils.event_filter_definition import Event
from chalicelib.utils.or_cache import CachedResponse
TABLE = "experimental.autocomplete"
@ -294,6 +295,7 @@ def is_top_supported(event_type):
return TYPE_TO_COLUMN.get(event_type, False)
@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
def get_top_values(project_id, event_type, event_key=None):
with ch_client.ClickHouseClient() as cur:
if schemas.FilterType.has_value(event_type):

View file

@ -765,7 +765,7 @@ def authenticate(email, password, for_change_password=False, include_spot=False)
iat=j_r.spot_jwt_refresh_iat,
aud=spot.AUDIENCE,
jwt_jti=j_r.spot_jwt_refresh_jti),
"spotRefreshTokenMaxAge": config("JWT_REFRESH_EXPIRATION", cast=int),
"spotRefreshTokenMaxAge": config("JWT_SPOT_REFRESH_EXPIRATION", cast=int),
}
return response
if config("enforce_SSO", cast=bool, default=False) and helper.is_saml2_available():
@ -905,7 +905,7 @@ def authenticate_sso(email: str, internal_id: str, include_spot: bool = False):
iat=j_r.spot_jwt_refresh_iat,
aud=spot.AUDIENCE,
jwt_jti=j_r.spot_jwt_refresh_jti),
"spotRefreshTokenMaxAge": config("JWT_REFRESH_EXPIRATION", cast=int)
"spotRefreshTokenMaxAge": config("JWT_SPOT_REFRESH_EXPIRATION", cast=int)
}
return response
logger.warning(f"SSO user not found with email: {email} and internal_id: {internal_id}")

View file

@ -94,4 +94,5 @@ rm -rf ./orpy.py
rm -rf ./chalicelib/core/usability_testing/
rm -rf ./chalicelib/core/db_request_handler.py
rm -rf ./chalicelib/core/db_request_handler.py
rm -rf ./routers/subs/spot.py
rm -rf ./routers/subs/spot.py
rm -rf ./chalicelib/utils/or_cache

View file

@ -48,6 +48,8 @@ JWT_EXPIRATION=86400
JWT_ISSUER=openreplay-oss
JWT_REFRESH_EXPIRATION=604800
JWT_REFRESH_SECRET="SET A RANDOM STRING HERE"
JWT_SPOT_REFRESH_EXPIRATION=604800
JWT_SPOT_REFRESH_SECRET="SET A RANDOM STRING HERE"
jwt_secret="SET A RANDOM STRING HERE"
KAFKA_SERVERS=kafka.db.svc.cluster.local:9092
KAFKA_USE_SSL=false

View file

@ -45,8 +45,10 @@ js_cache_bucket=
jwt_algorithm=HS512
JWT_EXPIRATION=6000
JWT_ISSUER=openReplay-dev
JWT_REFRESH_EXPIRATION=60
JWT_REFRESH_EXPIRATION=604800
JWT_REFRESH_SECRET=SECRET2
JWT_SPOT_REFRESH_EXPIRATION=604800
JWT_SPOT_REFRESH_SECRET=SECRET3
jwt_secret=SECRET
KAFKA_SERVERS=127.0.0.1:9092
KAFKA_USE_SSL=false

View file

@ -37,6 +37,18 @@ ALTER TABLE IF EXISTS public.users
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_jti integer NULL DEFAULT NULL,
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_iat timestamp without time zone NULL DEFAULT NULL;
CREATE SCHEMA IF NOT EXISTS or_cache;
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
);
COMMIT;
\elif :is_next

View file

@ -1305,4 +1305,17 @@ CREATE TABLE public.projects_conditions
filters jsonb NOT NULL DEFAULT '[]'::jsonb
);
CREATE TABLE or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
-- TODO: use `UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)`
-- when PG upgrade is validated by devops team
);
COMMIT;

View file

@ -0,0 +1,34 @@
\set previous_version 'v1.20.0-ee'
\set next_version 'v1.19.0-ee'
SELECT openreplay_version() AS current_version,
openreplay_version() = :'previous_version' AS valid_previous,
openreplay_version() = :'next_version' AS is_next
\gset
\if :valid_previous
\echo valid previous DB version :'previous_version', starting DB downgrade to :'next_version'
BEGIN;
SELECT format($fn_def$
CREATE OR REPLACE FUNCTION openreplay_version()
RETURNS text AS
$$
SELECT '%1$s'
$$ LANGUAGE sql IMMUTABLE;
$fn_def$, :'next_version')
\gexec
--
ALTER TABLE IF EXISTS public.users
DROP COLUMN IF EXISTS spot_jwt_iat,
DROP COLUMN IF EXISTS spot_jwt_refresh_jti,
DROP COLUMN IF EXISTS spot_jwt_refresh_iat;
DROP SCHEMA or_cache CASCADE;
COMMIT;
\elif :is_next
\echo new version detected :'next_version', nothing to do
\else
\warn skipping DB downgrade of :'next_version', expected previous version :'previous_version', found :'current_version'
\endif

View file

@ -25,3 +25,5 @@ sourcemaps_reader="http://sourcemapreader-openreplay:9000/sourcemaps/%s/sourcema
version_number="${COMMON_VERSION}"
CLUSTER_URL=""
POD_NAMESPACE=""
JWT_REFRESH_SECRET=${COMMON_JWT_REFRESH_SECRET}
JWT_SPOT_REFRESH_SECRET=${COMMON_JWT_REFRESH_SECRET}

View file

@ -1,6 +1,7 @@
COMMON_PROTOCOL="https"
COMMON_DOMAIN_NAME="change_me_domain"
COMMON_JWT_SECRET="change_me_jwt"
COMMON_JWT_REFRESH_SECRET="change_me_jwt_refresh"
COMMON_S3_KEY="change_me_s3_key"
COMMON_S3_SECRET="change_me_s3_secret"
COMMON_PG_PASSWORD="change_me_pg_password"

View file

@ -27,6 +27,18 @@ ALTER TABLE IF EXISTS public.users
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_jti integer NULL DEFAULT NULL,
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_iat timestamp without time zone NULL DEFAULT NULL;
CREATE SCHEMA IF NOT EXISTS or_cache;
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
);
COMMIT;
\elif :is_next

View file

@ -17,6 +17,7 @@ BEGIN;
CREATE SCHEMA IF NOT EXISTS events_common;
CREATE SCHEMA IF NOT EXISTS events;
CREATE SCHEMA IF NOT EXISTS events_ios;
CREATE SCHEMA IF NOT EXISTS or_cache;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
@ -1190,4 +1191,18 @@ CREATE TABLE public.projects_conditions
filters jsonb NOT NULL DEFAULT '[]'::jsonb
);
CREATE TABLE or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
-- TODO: use `UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)`
-- when PG upgrade is validated by devops team
);
COMMIT;

View file

@ -0,0 +1,34 @@
\set previous_version 'v1.20.0'
\set next_version 'v1.19.0'
SELECT openreplay_version() AS current_version,
openreplay_version() = :'previous_version' AS valid_previous,
openreplay_version() = :'next_version' AS is_next
\gset
\if :valid_previous
\echo valid previous DB version :'previous_version', starting DB downgrade to :'next_version'
BEGIN;
SELECT format($fn_def$
CREATE OR REPLACE FUNCTION openreplay_version()
RETURNS text AS
$$
SELECT '%1$s'
$$ LANGUAGE sql IMMUTABLE;
$fn_def$, :'next_version')
\gexec
--
ALTER TABLE IF EXISTS public.users
DROP COLUMN IF EXISTS spot_jwt_iat,
DROP COLUMN IF EXISTS spot_jwt_refresh_jti,
DROP COLUMN IF EXISTS spot_jwt_refresh_iat;
DROP SCHEMA or_cache CASCADE;
COMMIT;
\elif :is_next
\echo new version detected :'next_version', nothing to do
\else
\warn skipping DB downgrade of :'next_version', expected previous version :'previous_version', found :'current_version'
\endif