refactor(chalice): removed top-values cache
refactor(DB): removed or_cache
This commit is contained in:
parent
edf6b2060e
commit
af8996b54a
13 changed files with 42 additions and 129 deletions
17
api/Pipfile
17
api/Pipfile
|
|
@ -4,25 +4,24 @@ verify_ssl = true
|
|||
name = "pypi"
|
||||
|
||||
[packages]
|
||||
urllib3 = "==2.3.0"
|
||||
urllib3 = "==2.4.0"
|
||||
requests = "==2.32.3"
|
||||
boto3 = "==1.37.21"
|
||||
boto3 = "==1.38.16"
|
||||
pyjwt = "==2.10.1"
|
||||
psycopg2-binary = "==2.9.10"
|
||||
psycopg = {extras = ["pool", "binary"], version = "==3.2.6"}
|
||||
clickhouse-connect = "==0.8.15"
|
||||
elasticsearch = "==8.17.2"
|
||||
psycopg = {extras = ["binary", "pool"], version = "==3.2.9"}
|
||||
clickhouse-connect = "==0.8.17"
|
||||
elasticsearch = "==9.0.1"
|
||||
jira = "==3.8.0"
|
||||
cachetools = "==5.5.2"
|
||||
fastapi = "==0.115.12"
|
||||
uvicorn = {extras = ["standard"], version = "==0.34.0"}
|
||||
uvicorn = {extras = ["standard"], version = "==0.34.2"}
|
||||
python-decouple = "==3.8"
|
||||
pydantic = {extras = ["email"], version = "==2.10.6"}
|
||||
pydantic = {extras = ["email"], version = "==2.11.4"}
|
||||
apscheduler = "==3.11.0"
|
||||
redis = "==5.2.1"
|
||||
redis = "==6.1.0"
|
||||
|
||||
[dev-packages]
|
||||
|
||||
[requires]
|
||||
python_version = "3.12"
|
||||
python_full_version = "3.12.8"
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ from chalicelib.core import countries, events, metadata
|
|||
from chalicelib.utils import helper
|
||||
from chalicelib.utils import pg_client
|
||||
from chalicelib.utils.event_filter_definition import Event
|
||||
from chalicelib.utils.or_cache import CachedResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
TABLE = "public.autocomplete"
|
||||
|
|
@ -377,7 +376,6 @@ def is_top_supported(event_type):
|
|||
return TYPE_TO_COLUMN.get(event_type, False)
|
||||
|
||||
|
||||
@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
|
||||
def get_top_values(project_id, event_type, event_key=None):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
if schemas.FilterType.has_value(event_type):
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ from chalicelib.core import countries, events, metadata
|
|||
from chalicelib.utils import ch_client
|
||||
from chalicelib.utils import helper, exp_ch_helper
|
||||
from chalicelib.utils.event_filter_definition import Event
|
||||
from chalicelib.utils.or_cache import CachedResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
TABLE = "experimental.autocomplete"
|
||||
|
|
@ -260,8 +259,9 @@ def __search_metadata(project_id, value, key=None, source=None):
|
|||
with ch_client.ClickHouseClient() as cur:
|
||||
query = cur.format(query=f"""SELECT DISTINCT ON(key, value) key, value, 'METADATA' AS TYPE
|
||||
FROM({" UNION ALL ".join(sub_from)}) AS all_metas
|
||||
LIMIT 5;""", parameters={"project_id": project_id, "value": helper.string_to_sql_like(value),
|
||||
"svalue": helper.string_to_sql_like("^" + value)})
|
||||
LIMIT 5;""",
|
||||
parameters={"project_id": project_id, "value": helper.string_to_sql_like(value),
|
||||
"svalue": helper.string_to_sql_like("^" + value)})
|
||||
results = cur.execute(query)
|
||||
return helper.list_to_camel_case(results)
|
||||
|
||||
|
|
@ -298,7 +298,6 @@ def is_top_supported(event_type):
|
|||
return TYPE_TO_COLUMN.get(event_type, False)
|
||||
|
||||
|
||||
@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
|
||||
def get_top_values(project_id, event_type, event_key=None):
|
||||
with ch_client.ClickHouseClient() as cur:
|
||||
if schemas.FilterType.has_value(event_type):
|
||||
|
|
|
|||
|
|
@ -1 +0,0 @@
|
|||
from .or_cache import CachedResponse
|
||||
|
|
@ -1,83 +0,0 @@
|
|||
import functools
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
from chalicelib.utils import pg_client
|
||||
import time
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CachedResponse:
|
||||
def __init__(self, table, ttl):
|
||||
self.table = table
|
||||
self.ttl = ttl
|
||||
|
||||
def __call__(self, func):
|
||||
self.param_names = {i: param for i, param in enumerate(inspect.signature(func).parameters)}
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
values = dict()
|
||||
for i, param in self.param_names.items():
|
||||
if i < len(args):
|
||||
values[param] = args[i]
|
||||
elif param in kwargs:
|
||||
values[param] = kwargs[param]
|
||||
else:
|
||||
values[param] = None
|
||||
result = self.__get(values)
|
||||
if result is None or result["expired"] \
|
||||
or result["result"] is None or len(result["result"]) == 0:
|
||||
now = time.time()
|
||||
result = func(*args, **kwargs)
|
||||
now = time.time() - now
|
||||
if result is not None and len(result) > 0:
|
||||
self.__add(values, result, now)
|
||||
result[0]["cached"] = False
|
||||
else:
|
||||
logger.info(f"using cached response for "
|
||||
f"{func.__name__}({','.join([f'{key}={val}' for key, val in enumerate(values)])})")
|
||||
result = result["result"]
|
||||
result[0]["cached"] = True
|
||||
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
|
||||
def __get(self, values):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
sub_constraints = []
|
||||
for key, value in values.items():
|
||||
if value is not None:
|
||||
sub_constraints.append(f"{key}=%({key})s")
|
||||
else:
|
||||
sub_constraints.append(f"{key} IS NULL")
|
||||
query = f"""SELECT result,
|
||||
(%(ttl)s>0
|
||||
AND EXTRACT(EPOCH FROM (timezone('utc'::text, now()) - created_at - INTERVAL %(interval)s)) > 0) AS expired
|
||||
FROM {self.table}
|
||||
WHERE {" AND ".join(sub_constraints)}"""
|
||||
query = cur.mogrify(query, {**values, 'ttl': self.ttl, 'interval': f'{self.ttl} seconds'})
|
||||
logger.debug("------")
|
||||
logger.debug(query)
|
||||
logger.debug("------")
|
||||
cur.execute(query)
|
||||
result = cur.fetchone()
|
||||
return result
|
||||
|
||||
def __add(self, values, result, execution_time):
|
||||
with pg_client.PostgresClient() as cur:
|
||||
query = f"""INSERT INTO {self.table} ({",".join(values.keys())},result,execution_time)
|
||||
VALUES ({",".join([f"%({param})s" for param in values.keys()])},%(result)s,%(execution_time)s)
|
||||
ON CONFLICT ({",".join(values.keys())}) DO UPDATE SET result=%(result)s,
|
||||
execution_time=%(execution_time)s,
|
||||
created_at=timezone('utc'::text, now());"""
|
||||
query = cur.mogrify(query, {**values,
|
||||
"result": json.dumps(jsonable_encoder(result)),
|
||||
"execution_time": execution_time})
|
||||
logger.debug("------")
|
||||
logger.debug(query)
|
||||
logger.debug("------")
|
||||
cur.execute(query)
|
||||
|
|
@ -1,9 +1,9 @@
|
|||
urllib3==2.4.0
|
||||
requests==2.32.3
|
||||
boto3==1.38.10
|
||||
boto3==1.38.16
|
||||
pyjwt==2.10.1
|
||||
psycopg2-binary==2.9.10
|
||||
psycopg[pool,binary]==3.2.7
|
||||
psycopg[pool,binary]==3.2.9
|
||||
clickhouse-connect==0.8.17
|
||||
elasticsearch==9.0.1
|
||||
jira==3.8.0
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
urllib3==2.4.0
|
||||
requests==2.32.3
|
||||
boto3==1.38.10
|
||||
boto3==1.38.16
|
||||
pyjwt==2.10.1
|
||||
psycopg2-binary==2.9.10
|
||||
psycopg[pool,binary]==3.2.7
|
||||
psycopg[pool,binary]==3.2.9
|
||||
clickhouse-connect==0.8.17
|
||||
elasticsearch==9.0.1
|
||||
jira==3.8.0
|
||||
|
|
@ -15,4 +15,4 @@ python-decouple==3.8
|
|||
pydantic[email]==2.11.4
|
||||
apscheduler==3.11.0
|
||||
|
||||
redis==6.0.0
|
||||
redis==6.1.0
|
||||
|
|
@ -19,7 +19,7 @@ $fn_def$, :'next_version')
|
|||
|
||||
--
|
||||
|
||||
|
||||
DROP SCHEMA IF EXISTS or_cache CASCADE;
|
||||
|
||||
COMMIT;
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ BEGIN;
|
|||
CREATE SCHEMA IF NOT EXISTS events_common;
|
||||
CREATE SCHEMA IF NOT EXISTS events;
|
||||
CREATE SCHEMA IF NOT EXISTS events_ios;
|
||||
CREATE SCHEMA IF NOT EXISTS or_cache;
|
||||
CREATE EXTENSION IF NOT EXISTS pg_trgm;
|
||||
CREATE EXTENSION IF NOT EXISTS pgcrypto;
|
||||
|
||||
|
|
@ -1264,17 +1263,6 @@ CREATE TABLE public.projects_conditions
|
|||
filters jsonb NOT NULL DEFAULT '[]'::jsonb
|
||||
);
|
||||
|
||||
CREATE TABLE or_cache.autocomplete_top_values
|
||||
(
|
||||
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
|
||||
event_type text NOT NULL,
|
||||
event_key text NULL,
|
||||
result jsonb NULL,
|
||||
execution_time integer NULL,
|
||||
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)
|
||||
);
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS spots;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS spots.spots
|
||||
|
|
|
|||
|
|
@ -17,6 +17,18 @@ $$ LANGUAGE sql IMMUTABLE;
|
|||
$fn_def$, :'next_version')
|
||||
\gexec
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS or_cache;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
|
||||
(
|
||||
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
|
||||
event_type text NOT NULL,
|
||||
event_key text NULL,
|
||||
result jsonb NULL,
|
||||
execution_time integer NULL,
|
||||
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)
|
||||
);
|
||||
|
||||
COMMIT;
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ $fn_def$, :'next_version')
|
|||
|
||||
--
|
||||
|
||||
|
||||
DROP SCHEMA IF EXISTS or_cache CASCADE;
|
||||
|
||||
COMMIT;
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ BEGIN;
|
|||
CREATE SCHEMA IF NOT EXISTS events_common;
|
||||
CREATE SCHEMA IF NOT EXISTS events;
|
||||
CREATE SCHEMA IF NOT EXISTS events_ios;
|
||||
CREATE SCHEMA IF NOT EXISTS or_cache;
|
||||
CREATE EXTENSION IF NOT EXISTS pg_trgm;
|
||||
CREATE EXTENSION IF NOT EXISTS pgcrypto;
|
||||
|
||||
|
|
@ -1148,16 +1147,6 @@ CREATE TABLE public.projects_conditions
|
|||
filters jsonb NOT NULL DEFAULT '[]'::jsonb
|
||||
);
|
||||
|
||||
CREATE TABLE or_cache.autocomplete_top_values
|
||||
(
|
||||
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
|
||||
event_type text NOT NULL,
|
||||
event_key text NULL,
|
||||
result jsonb NULL,
|
||||
execution_time integer NULL,
|
||||
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)
|
||||
);
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS spots;
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,18 @@ $$ LANGUAGE sql IMMUTABLE;
|
|||
$fn_def$, :'next_version')
|
||||
\gexec
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS or_cache;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
|
||||
(
|
||||
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
|
||||
event_type text NOT NULL,
|
||||
event_key text NULL,
|
||||
result jsonb NULL,
|
||||
execution_time integer NULL,
|
||||
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
|
||||
UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)
|
||||
);
|
||||
|
||||
COMMIT;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue