Dev (#2829)
* fix(chalice): fixed Math-operators validation refactor(chalice): search for sessions that have events for heatmaps * refactor(chalice): search for sessions that have at least 1 location event for heatmaps * fix(chalice): fixed Math-operators validation refactor(chalice): search for sessions that have events for heatmaps * refactor(chalice): search for sessions that have at least 1 location event for heatmaps * feat(chalice): autocomplete return top 10 with stats * fix(chalice): fixed autocomplete top 10 meta-filters * refactor(chalice): refactored db-drivers refactor(scripts): defined ch-dataPort
This commit is contained in:
parent
f360961500
commit
aa8cebca7e
3 changed files with 23 additions and 34 deletions
|
|
@ -6,9 +6,18 @@ from queue import Queue, Empty
|
|||
|
||||
import clickhouse_connect
|
||||
from clickhouse_connect.driver.query import QueryContext
|
||||
from clickhouse_connect.driver.exceptions import DatabaseError
|
||||
from decouple import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_CH_CONFIG = {"host": config("ch_host"),
|
||||
"user": config("ch_user", default="default"),
|
||||
"password": config("ch_password", default=""),
|
||||
"port": config("ch_port_http", cast=int),
|
||||
"client_name": config("APP_NAME", default="PY")}
|
||||
CH_CONFIG = dict(_CH_CONFIG)
|
||||
|
||||
settings = {}
|
||||
if config('ch_timeout', cast=int, default=-1) > 0:
|
||||
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
|
||||
|
|
@ -18,14 +27,6 @@ if config('ch_receive_timeout', cast=int, default=-1) > 0:
|
|||
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
|
||||
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
|
||||
|
||||
logger.info("-- CH config --")
|
||||
logger.info(f'host={config("ch_host")}')
|
||||
logger.info(f'database={config("ch_database", default="default")}')
|
||||
logger.info(f'user={config("ch_user", default="default")}')
|
||||
logger.info(f'password={config("ch_password", default="")}')
|
||||
logger.info(f'port={config("ch_port_http", cast=int)}')
|
||||
logger.info(f'settings={settings}')
|
||||
|
||||
extra_args = {}
|
||||
if config("CH_COMPRESSION", cast=bool, default=True):
|
||||
extra_args["compression"] = "lz4"
|
||||
|
|
@ -47,21 +48,17 @@ def transform_result(original_function):
|
|||
|
||||
|
||||
class ClickHouseConnectionPool:
|
||||
def __init__(self, min_size, max_size, settings):
|
||||
def __init__(self, min_size, max_size):
|
||||
self.min_size = min_size
|
||||
self.max_size = max_size
|
||||
self.pool = Queue()
|
||||
self.lock = threading.Lock()
|
||||
self.total_connections = 0
|
||||
self.settings = settings
|
||||
|
||||
# Initialize the pool with min_size connections
|
||||
for _ in range(self.min_size):
|
||||
client = clickhouse_connect.get_client(host=config("ch_host"),
|
||||
client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||
database=config("ch_database", default="default"),
|
||||
user=config("ch_user", default="default"),
|
||||
password=config("ch_password", default=""),
|
||||
port=config("ch_port_http", cast=int),
|
||||
settings=settings,
|
||||
**extra_args)
|
||||
self.pool.put(client)
|
||||
|
|
@ -75,15 +72,10 @@ class ClickHouseConnectionPool:
|
|||
except Empty:
|
||||
with self.lock:
|
||||
if self.total_connections < self.max_size:
|
||||
client = clickhouse_connect.get_client(
|
||||
host=config("ch_host"),
|
||||
database=config("ch_database", default="default"),
|
||||
user=config("ch_user", default="default"),
|
||||
password=config("ch_password", default=""),
|
||||
port=config("ch_port_http", cast=int),
|
||||
settings=settings,
|
||||
**extra_args
|
||||
)
|
||||
client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||
database=config("ch_database", default="default"),
|
||||
settings=settings,
|
||||
**extra_args)
|
||||
self.total_connections += 1
|
||||
return client
|
||||
# If max_size reached, wait until a connection is available
|
||||
|
|
@ -119,12 +111,11 @@ def make_pool():
|
|||
except Exception as error:
|
||||
logger.error("Error while closing all connexions to CH", error)
|
||||
try:
|
||||
CH_pool = ClickHouseConnectionPool(min_size=config("PG_MINCONN", cast=int, default=4),
|
||||
max_size=config("PG_MAXCONN", cast=int, default=8),
|
||||
settings=settings)
|
||||
CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
|
||||
max_size=config("CH_MAXCONN", cast=int, default=8))
|
||||
if CH_pool is not None:
|
||||
logger.info("Connection pool created successfully for CH")
|
||||
except Exception as error:
|
||||
except ConnectionError as error:
|
||||
logger.error("Error while connecting to CH", error)
|
||||
if RETRY < RETRY_MAX:
|
||||
RETRY += 1
|
||||
|
|
@ -140,15 +131,12 @@ class ClickHouseClient:
|
|||
|
||||
def __init__(self, database=None):
|
||||
if self.__client is None:
|
||||
if config('CH_POOL', cast=bool, default=True):
|
||||
if database is None and config('CH_POOL', cast=bool, default=True):
|
||||
self.__client = CH_pool.get_connection()
|
||||
else:
|
||||
self.__client = clickhouse_connect.get_client(host=config("ch_host"),
|
||||
self.__client = clickhouse_connect.get_client(**CH_CONFIG,
|
||||
database=database if database else config("ch_database",
|
||||
default="default"),
|
||||
user=config("ch_user", default="default"),
|
||||
password=config("ch_password", default=""),
|
||||
port=config("ch_port_http", cast=int),
|
||||
settings=settings,
|
||||
**extra_args)
|
||||
self.__client.execute = transform_result(self.__client.query)
|
||||
|
|
@ -173,7 +161,7 @@ class ClickHouseClient:
|
|||
|
||||
|
||||
async def init():
|
||||
logger.info(f">CH_POOL:{config('CH_POOL', default=None)}")
|
||||
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
|
||||
if config('CH_POOL', cast=bool, default=True):
|
||||
make_pool()
|
||||
|
||||
|
|
|
|||
|
|
@ -166,7 +166,7 @@ class PostgresClient:
|
|||
|
||||
|
||||
async def init():
|
||||
logger.info(f">PG_POOL:{config('PG_POOL', default=None)}")
|
||||
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
|
||||
if config('PG_POOL', cast=bool, default=True):
|
||||
make_pool()
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ clickhouse: &clickhouse
|
|||
password: ""
|
||||
service:
|
||||
webPort: 9000
|
||||
dataPort: 8123
|
||||
|
||||
# For enterpriseEdition
|
||||
quickwit: &quickwit
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue