refactor(chalice): initial product analytics
parent 3d448320bf
commit dd469d2349
16 changed files with 355 additions and 277 deletions
@@ -16,7 +16,7 @@ from chalicelib.utils import helper
 from chalicelib.utils import pg_client, ch_client
 from crons import core_crons, core_dynamic_crons
 from routers import core, core_dynamic
-from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_anaytics
+from routers.subs import insights, metrics, v1_api, health, usability_tests, spot, product_analytics

 loglevel = config("LOGLEVEL", default=logging.WARNING)
 print(f">Loglevel set to: {loglevel}")
@@ -129,6 +129,6 @@ app.include_router(spot.public_app)
 app.include_router(spot.app)
 app.include_router(spot.app_apikey)

-app.include_router(product_anaytics.public_app)
-app.include_router(product_anaytics.app)
-app.include_router(product_anaytics.app_apikey)
+app.include_router(product_analytics.public_app, prefix="/pa")
+app.include_router(product_analytics.app, prefix="/pa")
+app.include_router(product_analytics.app_apikey, prefix="/pa")
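Note: a minimal sketch of what the new `prefix="/pa"` mount means for the exposed paths, assuming a FastAPI app wired as above; the router and endpoint below are illustrative stand-ins, not the real product_analytics routes.

```python
from fastapi import APIRouter, FastAPI
from fastapi.testclient import TestClient

router = APIRouter()


@router.get("/{projectId}/events/names")
def get_all_events(projectId: int):
    # Stand-in handler; the real endpoints live in routers/subs/product_analytics.py
    return {"data": []}


app = FastAPI()
app.include_router(router, prefix="/pa")

client = TestClient(app)
assert client.get("/pa/1/events/names").status_code == 200  # now served under /pa
assert client.get("/1/events/names").status_code == 404     # no longer at the root
```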
@@ -1,14 +0,0 @@
-from chalicelib.utils.ch_client import ClickHouseClient
-
-
-def search_events(project_id: int, data: dict):
-    with ClickHouseClient() as ch_client:
-        r = ch_client.format(
-            """SELECT *
-               FROM taha.events
-               WHERE project_id=%(project_id)s
-               ORDER BY created_at;""",
-            params={"project_id": project_id})
-        x = ch_client.execute(r)
-
-        return x
api/chalicelib/core/product_analytics/__init__.py  (new file, 0 lines)
api/chalicelib/core/product_analytics/events.py  (new file, 28 lines)
@@ -0,0 +1,28 @@
+from chalicelib.utils import helper
+from chalicelib.utils.ch_client import ClickHouseClient
+
+
+def get_events(project_id: int):
+    with ClickHouseClient() as ch_client:
+        r = ch_client.format(
+            """SELECT event_name, display_name
+               FROM product_analytics.all_events
+               WHERE project_id=%(project_id)s
+               ORDER BY display_name;""",
+            parameters={"project_id": project_id})
+        x = ch_client.execute(r)
+
+        return helper.list_to_camel_case(x)
+
+
+def search_events(project_id: int, data: dict):
+    with ClickHouseClient() as ch_client:
+        r = ch_client.format(
+            """SELECT *
+               FROM product_analytics.events
+               WHERE project_id=%(project_id)s
+               ORDER BY created_at;""",
+            parameters={"project_id": project_id})
+        x = ch_client.execute(r)
+
+        return helper.list_to_camel_case(x)
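Note: a sketch of the row shape get_events() is expected to return, assuming ClickHouse yields snake_case columns and helper.list_to_camel_case camel-cases the keys; the sample rows and the to_camel helper are illustrative stand-ins, not project code.

```python
rows = [
    {"event_name": "page_view", "display_name": "Page view"},
    {"event_name": "click", "display_name": "Click"},
]


def to_camel(key: str) -> str:
    # Stand-in for what helper.list_to_camel_case does to each key
    head, *rest = key.split("_")
    return head + "".join(part.title() for part in rest)


print([{to_camel(k): v for k, v in row.items()} for row in rows])
# [{'eventName': 'page_view', 'displayName': 'Page view'}, {'eventName': 'click', 'displayName': 'Click'}]
```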
api/chalicelib/core/product_analytics/properties.py  (new file, 19 lines)
@@ -0,0 +1,19 @@
+from chalicelib.utils import helper
+from chalicelib.utils.ch_client import ClickHouseClient
+
+
+def get_properties(project_id: int, event_name):
+    with ClickHouseClient() as ch_client:
+        r = ch_client.format(
+            """SELECT all_properties.property_name,
+                      all_properties.display_name
+               FROM product_analytics.event_properties
+                        INNER JOIN product_analytics.all_properties USING (property_name)
+               WHERE event_properties.project_id=%(project_id)s
+                 AND all_properties.project_id=%(project_id)s
+                 AND event_properties.event_name=%(event_name)s
+               ORDER BY created_at;""",
+            parameters={"project_id": project_id,"event_name": event_name})
+        properties = ch_client.execute(r)
+
+        return helper.list_to_camel_case(properties)
@@ -671,24 +671,36 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
 events_conditions.append({"type": event_where[-1]})
 if not is_any:
     if schemas.ClickEventExtraOperator.has_value(event.operator):
-        event_where.append(json_condition(
-            "main",
-            "$properties",
-            "selector", op, event.value, e_k)
+        # event_where.append(json_condition(
+        #     "main",
+        #     "$properties",
+        #     "selector", op, event.value, e_k)
+        # )
+        event_where.append(
+            sh.multi_conditions(f"main.`$properties`.selector {op} %({e_k})s",
+                                event.value, value_key=e_k)
         )
         events_conditions[-1]["condition"] = event_where[-1]
     else:
         if is_not:
-            event_where.append(json_condition(
-                "sub", "$properties", _column, op, event.value, e_k
-            ))
+            # event_where.append(json_condition(
+            #     "sub", "$properties", _column, op, event.value, e_k
+            # ))
+            event_where.append(
+                sh.multi_conditions(f"sub.`$properties`.{_column} {op} %({e_k})s",
+                                    event.value, value_key=e_k)
+            )
             events_conditions_not.append(
                 {
                     "type": f"sub.`$event_name`='{exp_ch_helper.get_event_type(event_type, platform=platform)}'"})
             events_conditions_not[-1]["condition"] = event_where[-1]
         else:
+            # event_where.append(
+            #     json_condition("main", "$properties", _column, op, event.value, e_k)
+            # )
            event_where.append(
-                json_condition("main", "$properties", _column, op, event.value, e_k)
+                sh.multi_conditions(f"main.`$properties`.{_column} {op} %({e_k})s",
+                                    event.value, value_key=e_k)
            )
            events_conditions[-1]["condition"] = event_where[-1]
 else:
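Aside: a hypothetical stand-in for what a multi-value helper like sh.multi_conditions typically expands to (the condition repeated once per value and OR-joined); illustration only, not the project's shared helper.

```python
def multi_conditions(condition: str, values, value_key: str = "value") -> str:
    # Hypothetical re-implementation for illustration: bind each value to an
    # indexed placeholder and OR-join the copies.
    parts = [condition.replace(f"%({value_key})s", f"%({value_key}_{i})s")
             for i in range(len(values))]
    return "(" + " OR ".join(parts) + ")"


print(multi_conditions("main.`$properties`.selector = %(e_0)s", ["#btn", ".nav"], value_key="e_0"))
# (main.`$properties`.selector = %(e_0_0)s OR main.`$properties`.selector = %(e_0_1)s)
```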
@@ -870,12 +882,15 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
 events_conditions[-1]["condition"] = []
 if not is_any and event.value not in [None, "*", ""]:
     event_where.append(
-        sh.multi_conditions(f"(toString(main1.`$properties`.message) {op} %({e_k})s OR toString(main1.`$properties`.name) {op} %({e_k})s)",
-                            event.value, value_key=e_k))
+        sh.multi_conditions(
+            f"(toString(main1.`$properties`.message) {op} %({e_k})s OR toString(main1.`$properties`.name) {op} %({e_k})s)",
+            event.value, value_key=e_k))
     events_conditions[-1]["condition"].append(event_where[-1])
     events_extra_join += f" AND {event_where[-1]}"
 if len(event.source) > 0 and event.source[0] not in [None, "*", ""]:
-    event_where.append(sh.multi_conditions(f"toString(main1.`$properties`.source) = %({s_k})s", event.source, value_key=s_k))
+    event_where.append(
+        sh.multi_conditions(f"toString(main1.`$properties`.source) = %({s_k})s", event.source,
+                            value_key=s_k))
     events_conditions[-1]["condition"].append(event_where[-1])
     events_extra_join += f" AND {event_where[-1]}"

@@ -1193,6 +1208,28 @@ def search_query_parts_ch(data: schemas.SessionsSearchPayloadSchema, error_statu
         events_conditions[-1]["condition"] = " AND ".join(events_conditions[-1]["condition"])
     else:
         continue
+    if event.properties is not None and len(event.properties.filters) > 0:
+        event_fiters = []
+        for l, property in enumerate(event.properties.filters):
+            a_k = f"{e_k}_att_{l}"
+            full_args = {**full_args,
+                         **sh.multi_values(property.value, value_key=a_k)}
+            op = sh.get_sql_operator(property.operator)
+            condition = f"main.properties.{property.name} {op} %({a_k})s"
+            if property.is_predefined:
+                condition = f"main.{property.name} {op} %({a_k})s"
+            event_where.append(
+                sh.multi_conditions(condition, property.value, value_key=a_k)
+            )
+            event_fiters.append(event_where[-1])
+        if len(event_fiters) > 0:
+            events_conditions[-1]["condition"] += " AND ("
+            for l, e_f in enumerate(event_fiters):
+                if l > 0:
+                    events_conditions[-1]["condition"] += event.properties.operators[l - 1] + e_f
+                else:
+                    events_conditions[-1]["condition"] += e_f
+            events_conditions[-1]["condition"] += ")"
     if event_index == 0 or or_events:
         event_where += ss_constraints
         if is_not:
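Note: a self-contained sketch of the condition string the new `event.properties` block assembles, using stub filters and a stub condition builder; the column paths and loop shape mirror the hunk above, but the helpers and data are illustrative.

```python
# Stub standing in for event.properties (operators/filters mirror the new schema fields).
properties = {
    "operators": ["and"],
    "filters": [
        {"name": "$browser", "is_predefined": True, "value": ["Chrome"]},
        {"name": "plan", "is_predefined": False, "value": ["pro", "team"]},
    ],
}

e_k = "e_0"
condition_str = "main.`$event_name` = %(e_0)s"  # pretend condition built earlier for this event


def multi_conditions(cond, values, value_key):
    # Hypothetical stand-in for sh.multi_conditions: OR over the bound values.
    return "(" + " OR ".join(cond.replace(f"%({value_key})s", f"%({value_key}_{i})s")
                             for i in range(len(values))) + ")"


event_filters = []
for l, prop in enumerate(properties["filters"]):
    a_k = f"{e_k}_att_{l}"
    op = "="  # stand-in for sh.get_sql_operator(property.operator)
    condition = f"main.properties.{prop['name']} {op} %({a_k})s"
    if prop["is_predefined"]:
        condition = f"main.{prop['name']} {op} %({a_k})s"
    event_filters.append(multi_conditions(condition, prop["value"], value_key=a_k))

if event_filters:
    condition_str += " AND ("
    for l, e_f in enumerate(event_filters):
        condition_str += (properties["operators"][l - 1] + e_f) if l > 0 else e_f
    condition_str += ")"

print(condition_str)
```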
@@ -175,11 +175,11 @@ def search_sessions(data: schemas.SessionsSearchPayloadSchema, project: schemas.
                                ORDER BY sort_key {data.order}
                                LIMIT %(sessions_limit)s OFFSET %(sessions_limit_s)s) AS sorted_sessions;""",
                              parameters=full_args)
-        logging.debug("--------------------")
-        logging.debug(main_query)
-        logging.debug("--------------------")
         try:
+            logging.debug("--------------------")
             sessions_list = cur.execute(main_query)
+            logging.debug("--------------------")
         except Exception as err:
             logging.warning("--------- SESSIONS-CH SEARCH QUERY EXCEPTION -----------")
             logging.warning(main_query)
@@ -11,9 +11,3 @@ if smtp.has_smtp():
     logger.info("valid SMTP configuration found")
 else:
     logger.info("no SMTP configuration found or SMTP validation failed")
-
-if config("EXP_CH_DRIVER", cast=bool, default=True):
-    logging.info(">>> Using new CH driver")
-    from . import ch_client_exp as ch_client
-else:
-    from . import ch_client
@@ -1,73 +1,185 @@
 import logging
+import threading
+import time
+from functools import wraps
+from queue import Queue, Empty

-import clickhouse_driver
+import clickhouse_connect
+from clickhouse_connect.driver.query import QueryContext
 from decouple import config

 logger = logging.getLogger(__name__)

+_CH_CONFIG = {"host": config("ch_host"),
+              "user": config("ch_user", default="default"),
+              "password": config("ch_password", default=""),
+              "port": config("ch_port_http", cast=int),
+              "client_name": config("APP_NAME", default="PY")}
+CH_CONFIG = dict(_CH_CONFIG)
+
 settings = {}
 if config('ch_timeout', cast=int, default=-1) > 0:
-    logger.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
+    logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
     settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}

 if config('ch_receive_timeout', cast=int, default=-1) > 0:
-    logger.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
+    logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
     settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}

+extra_args = {}
+if config("CH_COMPRESSION", cast=bool, default=True):
+    extra_args["compression"] = "lz4"
+
+
+def transform_result(self, original_function):
+    @wraps(original_function)
+    def wrapper(*args, **kwargs):
+        if kwargs.get("parameters"):
+            if config("LOCAL_DEV", cast=bool, default=False):
+                logger.debug(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters")))
+            else:
+                logger.debug(
+                    str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
+        elif len(args) > 0:
+            if config("LOCAL_DEV", cast=bool, default=False):
+                logger.debug(args[0])
+            else:
+                logger.debug(str.encode(args[0]))
+        result = original_function(*args, **kwargs)
+        if isinstance(result, clickhouse_connect.driver.query.QueryResult):
+            column_names = result.column_names
+            result = result.result_rows
+            result = [dict(zip(column_names, row)) for row in result]
+
+        return result
+
+    return wrapper
+
+
+class ClickHouseConnectionPool:
+    def __init__(self, min_size, max_size):
+        self.min_size = min_size
+        self.max_size = max_size
+        self.pool = Queue()
+        self.lock = threading.Lock()
+        self.total_connections = 0
+
+        # Initialize the pool with min_size connections
+        for _ in range(self.min_size):
+            client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                   database=config("ch_database", default="default"),
+                                                   settings=settings,
+                                                   **extra_args)
+            self.pool.put(client)
+            self.total_connections += 1
+
+    def get_connection(self):
+        try:
+            # Try to get a connection without blocking
+            client = self.pool.get_nowait()
+            return client
+        except Empty:
+            with self.lock:
+                if self.total_connections < self.max_size:
+                    client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                           database=config("ch_database", default="default"),
+                                                           settings=settings,
+                                                           **extra_args)
+                    self.total_connections += 1
+                    return client
+            # If max_size reached, wait until a connection is available
+            client = self.pool.get()
+            return client
+
+    def release_connection(self, client):
+        self.pool.put(client)
+
+    def close_all(self):
+        with self.lock:
+            while not self.pool.empty():
+                client = self.pool.get()
+                client.close()
+            self.total_connections = 0
+
+
+CH_pool: ClickHouseConnectionPool = None
+
+RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
+RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
+RETRY = 0
+
+
+def make_pool():
+    if not config('CH_POOL', cast=bool, default=True):
+        return
+    global CH_pool
+    global RETRY
+    if CH_pool is not None:
+        try:
+            CH_pool.close_all()
+        except Exception as error:
+            logger.error("Error while closing all connexions to CH", exc_info=error)
+    try:
+        CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
+                                           max_size=config("CH_MAXCONN", cast=int, default=8))
+        if CH_pool is not None:
+            logger.info("Connection pool created successfully for CH")
+    except ConnectionError as error:
+        logger.error("Error while connecting to CH", exc_info=error)
+        if RETRY < RETRY_MAX:
+            RETRY += 1
+            logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
+            time.sleep(RETRY_INTERVAL)
+            make_pool()
+        else:
+            raise error
+

 class ClickHouseClient:
     __client = None

     def __init__(self, database=None):
-        extra_args = {}
-        if config("CH_COMPRESSION", cast=bool, default=True):
-            extra_args["compression"] = "lz4"
-        self.__client = clickhouse_driver.Client(host=config("ch_host"),
-                                                 database=database if database else config("ch_database",
-                                                                                           default="default"),
-                                                 user=config("ch_user", default="default"),
-                                                 password=config("ch_password", default=""),
-                                                 port=config("ch_port", cast=int),
-                                                 settings=settings,
-                                                 **extra_args) \
-            if self.__client is None else self.__client
+        if self.__client is None:
+            if database is not None or not config('CH_POOL', cast=bool, default=True):
+                self.__client = clickhouse_connect.get_client(**CH_CONFIG,
+                                                              database=database if database else config("ch_database",
+                                                                                                        default="default"),
+                                                              settings=settings,
+                                                              **extra_args)
+
+            else:
+                self.__client = CH_pool.get_connection()
+
+        self.__client.execute = transform_result(self, self.__client.query)
+        self.__client.format = self.format

     def __enter__(self):
-        return self
-
-    def execute(self, query, parameters=None, **args):
-        try:
-            results = self.__client.execute(query=query, params=parameters, with_column_types=True, **args)
-            keys = tuple(x for x, y in results[1])
-            return [dict(zip(keys, i)) for i in results[0]]
-        except Exception as err:
-            logger.error("--------- CH EXCEPTION -----------", exc_info=err)
-            logger.error("--------- CH QUERY EXCEPTION -----------")
-            logger.error(self.format(query=query, parameters=parameters)
-                         .replace('\n', '\\n')
-                         .replace(' ', ' ')
-                         .replace(' ', ' '))
-            logger.error("--------------------")
-            raise err
-
-    def insert(self, query, params=None, **args):
-        return self.__client.execute(query=query, params=params, **args)
-
-    def client(self):
         return self.__client

-    def format(self, query, parameters):
-        if parameters is None:
-            return query
-        return self.__client.substitute_params(query, parameters, self.__client.connection.context)
+    def format(self, query, parameters=None):
+        if parameters:
+            ctx = QueryContext(query=query, parameters=parameters)
+            return ctx.final_query
+        return query

     def __exit__(self, *args):
-        pass
+        if config('CH_POOL', cast=bool, default=True):
+            CH_pool.release_connection(self.__client)
+        else:
+            self.__client.close()


 async def init():
-    logger.info(f">CH_POOL:not defined")
+    logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
+    if config('CH_POOL', cast=bool, default=True):
+        make_pool()


 async def terminate():
-    pass
+    global CH_pool
+    if CH_pool is not None:
+        try:
+            CH_pool.close_all()
+            logger.info("Closed all connexions to CH")
+        except Exception as error:
+            logger.error("Error while closing all connexions to CH", exc_info=error)
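Usage stays the same after the rewrite: callers take a client from the context manager and get dict rows back, which is exactly how the new chalicelib/core/product_analytics modules in this commit use it. The sketch below assumes the module's init() has been awaited (so the pool exists) and that a reachable ClickHouse plus the ch_host/ch_port_http/... env vars are configured; the query itself is illustrative.

```python
from chalicelib.utils.ch_client import ClickHouseClient

with ClickHouseClient() as ch:
    # format() substitutes the %(name)s parameters, execute() returns a list of dicts
    query = ch.format("SELECT event_name FROM product_analytics.all_events "
                      "WHERE project_id=%(project_id)s",
                      parameters={"project_id": 1})
    rows = ch.execute(query)

print(rows)  # e.g. [{'event_name': 'click'}, ...]
```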
@@ -1,178 +0,0 @@
-import logging
-import threading
-import time
-from functools import wraps
-from queue import Queue, Empty
-
-import clickhouse_connect
-from clickhouse_connect.driver.query import QueryContext
-from decouple import config
-
-logger = logging.getLogger(__name__)
-
-_CH_CONFIG = {"host": config("ch_host"),
-              "user": config("ch_user", default="default"),
-              "password": config("ch_password", default=""),
-              "port": config("ch_port_http", cast=int),
-              "client_name": config("APP_NAME", default="PY")}
-CH_CONFIG = dict(_CH_CONFIG)
-
-settings = {}
-if config('ch_timeout', cast=int, default=-1) > 0:
-    logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
-    settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
-
-if config('ch_receive_timeout', cast=int, default=-1) > 0:
-    logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
-    settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
-
-extra_args = {}
-if config("CH_COMPRESSION", cast=bool, default=True):
-    extra_args["compression"] = "lz4"
-
-
-def transform_result(self, original_function):
-    @wraps(original_function)
-    def wrapper(*args, **kwargs):
-        if kwargs.get("parameters"):
-            logger.debug(str.encode(self.format(query=kwargs.get("query", ""), parameters=kwargs.get("parameters"))))
-        elif len(args) > 0:
-            logger.debug(str.encode(args[0]))
-        result = original_function(*args, **kwargs)
-        if isinstance(result, clickhouse_connect.driver.query.QueryResult):
-            column_names = result.column_names
-            result = result.result_rows
-            result = [dict(zip(column_names, row)) for row in result]
-
-        return result
-
-    return wrapper
-
-
-class ClickHouseConnectionPool:
-    def __init__(self, min_size, max_size):
-        self.min_size = min_size
-        self.max_size = max_size
-        self.pool = Queue()
-        self.lock = threading.Lock()
-        self.total_connections = 0
-
-        # Initialize the pool with min_size connections
-        for _ in range(self.min_size):
-            client = clickhouse_connect.get_client(**CH_CONFIG,
-                                                   database=config("ch_database", default="default"),
-                                                   settings=settings,
-                                                   **extra_args)
-            self.pool.put(client)
-            self.total_connections += 1
-
-    def get_connection(self):
-        try:
-            # Try to get a connection without blocking
-            client = self.pool.get_nowait()
-            return client
-        except Empty:
-            with self.lock:
-                if self.total_connections < self.max_size:
-                    client = clickhouse_connect.get_client(**CH_CONFIG,
-                                                           database=config("ch_database", default="default"),
-                                                           settings=settings,
-                                                           **extra_args)
-                    self.total_connections += 1
-                    return client
-            # If max_size reached, wait until a connection is available
-            client = self.pool.get()
-            return client
-
-    def release_connection(self, client):
-        self.pool.put(client)
-
-    def close_all(self):
-        with self.lock:
-            while not self.pool.empty():
-                client = self.pool.get()
-                client.close()
-            self.total_connections = 0
-
-
-CH_pool: ClickHouseConnectionPool = None
-
-RETRY_MAX = config("CH_RETRY_MAX", cast=int, default=50)
-RETRY_INTERVAL = config("CH_RETRY_INTERVAL", cast=int, default=2)
-RETRY = 0
-
-
-def make_pool():
-    if not config('CH_POOL', cast=bool, default=True):
-        return
-    global CH_pool
-    global RETRY
-    if CH_pool is not None:
-        try:
-            CH_pool.close_all()
-        except Exception as error:
-            logger.error("Error while closing all connexions to CH", exc_info=error)
-    try:
-        CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
-                                           max_size=config("CH_MAXCONN", cast=int, default=8))
-        if CH_pool is not None:
-            logger.info("Connection pool created successfully for CH")
-    except ConnectionError as error:
-        logger.error("Error while connecting to CH", exc_info=error)
-        if RETRY < RETRY_MAX:
-            RETRY += 1
-            logger.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
-            time.sleep(RETRY_INTERVAL)
-            make_pool()
-        else:
-            raise error
-
-
-class ClickHouseClient:
-    __client = None
-
-    def __init__(self, database=None):
-        if self.__client is None:
-            if database is not None or not config('CH_POOL', cast=bool, default=True):
-                self.__client = clickhouse_connect.get_client(**CH_CONFIG,
-                                                              database=database if database else config("ch_database",
-                                                                                                        default="default"),
-                                                              settings=settings,
-                                                              **extra_args)
-
-            else:
-                self.__client = CH_pool.get_connection()
-
-        self.__client.execute = transform_result(self, self.__client.query)
-        self.__client.format = self.format
-
-    def __enter__(self):
-        return self.__client
-
-    def format(self, query, parameters=None):
-        if parameters:
-            ctx = QueryContext(query=query, parameters=parameters)
-            return ctx.final_query
-        return query
-
-    def __exit__(self, *args):
-        if config('CH_POOL', cast=bool, default=True):
-            CH_pool.release_connection(self.__client)
-        else:
-            self.__client.close()
-
-
-async def init():
-    logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
-    if config('CH_POOL', cast=bool, default=True):
-        make_pool()
-
-
-async def terminate():
-    global CH_pool
-    if CH_pool is not None:
-        try:
-            CH_pool.close_all()
-            logger.info("Closed all connexions to CH")
-        except Exception as error:
-            logger.error("Error while closing all connexions to CH", exc_info=error)
api/routers/subs/product_analytics.py  (new file, 28 lines)
@@ -0,0 +1,28 @@
+import schemas
+from chalicelib.core.product_analytics import events, properties
+from fastapi import Depends
+from or_dependencies import OR_context
+from routers.base import get_routers
+
+public_app, app, app_apikey = get_routers()
+
+
+@app.get('/{projectId}/properties/search', tags=["product_analytics"])
+def get_event_properties(projectId: int, event_name: str = None,
+                         context: schemas.CurrentContext = Depends(OR_context)):
+    if not event_name or len(event_name) == 0:
+        return {"data": []}
+    return {"data": properties.get_properties(project_id=projectId, event_name=event_name)}
+
+
+@app.get('/{projectId}/events/names', tags=["dashboard"])
+def get_all_events(projectId: int,
+                   context: schemas.CurrentContext = Depends(OR_context)):
+    return {"data": events.get_events(project_id=projectId)}
+
+
+@app.post('/{projectId}/events/search', tags=["dashboard"])
+def search_events(projectId: int,
+                  # data: schemas.CreateDashboardSchema = Body(...),
+                  context: schemas.CurrentContext = Depends(OR_context)):
+    return {"data": events.search_events(project_id=projectId, data={})}
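With the non-EE app mounting these routers under /pa (see the app.py hunk above), the request shapes would look roughly as follows; host, port and auth header are placeholders, and nothing in this snippet is part of the diff itself.

```python
import requests  # illustrative HTTP client

BASE = "http://localhost:8000/pa"            # placeholder host + the new /pa prefix
HEADERS = {"Authorization": "Bearer <jwt>"}  # placeholder; auth is resolved by OR_context


def example_calls(project_id: int = 1):
    # List event names for a project
    requests.get(f"{BASE}/{project_id}/events/names", headers=HEADERS)
    # List the properties attached to one event
    requests.get(f"{BASE}/{project_id}/properties/search",
                 params={"event_name": "click"}, headers=HEADERS)
    # Search events (the handler currently ignores the body and passes data={})
    requests.post(f"{BASE}/{project_id}/events/search", json={}, headers=HEADERS)
```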
@@ -1,15 +0,0 @@
-import schemas
-from chalicelib.core.metrics import product_anaytics2
-from fastapi import Depends
-from or_dependencies import OR_context
-from routers.base import get_routers
-
-
-public_app, app, app_apikey = get_routers()
-
-
-@app.post('/{projectId}/events/search', tags=["dashboard"])
-def search_events(projectId: int,
-                  # data: schemas.CreateDashboardSchema = Body(...),
-                  context: schemas.CurrentContext = Depends(OR_context)):
-    return product_anaytics2.search_events(project_id=projectId, data={})
@@ -545,6 +545,70 @@ class RequestGraphqlFilterSchema(BaseModel):
         return values


+class EventPredefinedPropertyType(str, Enum):
+    TIME = "$time"
+    SOURCE = "$source"
+    DURATION_S = "$duration_s"
+    DESCRIPTION = "description"
+    AUTO_CAPTURED = "$auto_captured"
+    SDK_EDITION = "$sdk_edition"
+    SDK_VERSION = "$sdk_version"
+    DEVICE_ID = "$device_id"
+    OS = "$os"
+    OS_VERSION = "$os_version"
+    BROWSER = "$browser"
+    BROWSER_VERSION = "$browser_version"
+    DEVICE = "$device"
+    SCREEN_HEIGHT = "$screen_height"
+    SCREEN_WIDTH = "$screen_width"
+    CURRENT_URL = "$current_url"
+    INITIAL_REFERRER = "$initial_referrer"
+    REFERRING_DOMAIN = "$referring_domain"
+    REFERRER = "$referrer"
+    INITIAL_REFERRING_DOMAIN = "$initial_referring_domain"
+    SEARCH_ENGINE = "$search_engine"
+    SEARCH_ENGINE_KEYWORD = "$search_engine_keyword"
+    UTM_SOURCE = "utm_source"
+    UTM_MEDIUM = "utm_medium"
+    UTM_CAMPAIGN = "utm_campaign"
+    COUNTRY = "$country"
+    STATE = "$state"
+    CITY = "$city"
+    ISSUE_TYPE = "issue_type"
+    TAGS = "$tags"
+    IMPORT = "$import"
+
+
+class PropertyFilterSchema(BaseModel):
+    name: Union[EventPredefinedPropertyType, str] = Field(...)
+    operator: Union[SearchEventOperator, MathOperator] = Field(...)
+    value: List[Union[int, str]] = Field(...)
+    property_type: Optional[Literal["string", "number", "date"]] = Field(default=None)
+
+    @computed_field
+    @property
+    def is_predefined(self) -> bool:
+        return EventPredefinedPropertyType.has_value(self.name)
+
+    @model_validator(mode="after")
+    def transform_name(self):
+        if isinstance(self.name, Enum):
+            self.name = self.name.value
+        return self
+
+
+class EventPropertiesSchema(BaseModel):
+    operators: List[Literal["and", "or"]] = Field(...)
+    filters: List[PropertyFilterSchema] = Field(...)
+
+    @model_validator(mode="after")
+    def event_filter_validator(self):
+        assert len(self.filters) == 0 \
+               or len(self.operators) == len(self.filters) - 1, \
+            "Number of operators must match the number of filter-1"
+        return self
+
+
 class SessionSearchEventSchema2(BaseModel):
     is_event: Literal[True] = True
     value: List[Union[str, int]] = Field(...)
@@ -553,6 +617,7 @@ class SessionSearchEventSchema2(BaseModel):
     source: Optional[List[Union[ErrorSource, int, str]]] = Field(default=None)
     sourceOperator: Optional[MathOperator] = Field(default=None)
     filters: Optional[List[RequestGraphqlFilterSchema]] = Field(default_factory=list)
+    properties: Optional[EventPropertiesSchema] = Field(default=None)

     _remove_duplicate_values = field_validator('value', mode='before')(remove_duplicate_values)
     _single_to_list_values = field_validator('value', mode='before')(single_to_list)
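To make the new fields concrete: a hypothetical search-event payload using `properties`. Only the `properties` part mirrors the schemas added above, the outer event fields are simplified, and the validator's invariant is one operator fewer than filters.

```python
# Hypothetical payload fragment; only "properties" mirrors the new schemas.
event_with_properties = {
    "type": "click",                      # outer fields simplified for illustration
    "operator": "is",
    "value": ["#checkout"],
    "properties": {
        "operators": ["and"],             # len(operators) == len(filters) - 1
        "filters": [
            {"name": "$browser", "operator": "is", "value": ["Chrome"]},
            {"name": "utm_source", "operator": "is", "value": ["newsletter"]},
        ],
    },
}

props = event_with_properties["properties"]
assert len(props["operators"]) == len(props["filters"]) - 1
```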
ee/api/.gitignore  (vendored, 1 addition)
@@ -230,6 +230,7 @@ Pipfile.lock
 /chalicelib/core/socket_ios.py
 /chalicelib/core/sourcemaps
 /chalicelib/core/tags.py
+/chalicelib/core/product_analytics
 /chalicelib/saml
 /chalicelib/utils/__init__.py
 /chalicelib/utils/args_transformer.py
@@ -150,9 +150,9 @@ app.include_router(spot.public_app)
 app.include_router(spot.app)
 app.include_router(spot.app_apikey)

-app.include_router(product_anaytics.public_app)
-app.include_router(product_anaytics.app)
-app.include_router(product_anaytics.app_apikey)
+app.include_router(product_anaytics.public_app, prefix="/ap")
+app.include_router(product_anaytics.app, prefix="/ap")
+app.include_router(product_anaytics.app_apikey, prefix="/ap")

 if config("ENABLE_SSO", cast=bool, default=True):
     app.include_router(saml.public_app)
@@ -52,6 +52,7 @@ rm -rf ./chalicelib/core/socket_ios.py
 rm -rf ./chalicelib/core/sourcemaps
 rm -rf ./chalicelib/core/user_testing.py
 rm -rf ./chalicelib/core/tags.py
+rm -rf ./chalicelib/core/product_analytics
 rm -rf ./chalicelib/saml
 rm -rf ./chalicelib/utils/__init__.py
 rm -rf ./chalicelib/utils/args_transformer.py