diff --git a/ee/recommendation/airflow-local/Dockerfile b/ee/recommendation/airflow-local/Dockerfile
new file mode 100644
index 000000000..992bcf89a
--- /dev/null
+++ b/ee/recommendation/airflow-local/Dockerfile
@@ -0,0 +1,14 @@
+FROM apache/airflow:2.4.3
+COPY requirements.txt .
+
+USER root
+RUN apt-get update \
+    && apt-get install -y \
+    vim \
+    && apt-get install gcc libc-dev g++ -y \
+    && apt-get install -y pkg-config libxml2-dev libxmlsec1-dev libxmlsec1-openssl
+
+
+USER airflow
+RUN pip install --upgrade pip
+RUN pip install -r requirements.txt
diff --git a/ee/recommendation/airflow-local/dags/training_dag.py b/ee/recommendation/airflow-local/dags/training_dag.py
new file mode 100644
index 000000000..73c98b4ef
--- /dev/null
+++ b/ee/recommendation/airflow-local/dags/training_dag.py
@@ -0,0 +1,42 @@
+from datetime import datetime, timedelta
+from textwrap import dedent
+
+import pendulum
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+import os
+_work_dir = os.getcwd()
+
+def my_function():
+    files = os.listdir('scripts')
+    print(files)
+    return files
+
+dag = DAG(
+    "first_test",
+    default_args={
+        "depends_on_past": True,
+        "retries": 1,
+        "retry_delay": timedelta(minutes=3),
+    },
+    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
+    description="My first test",
+    schedule="@daily",
+    catchup=False,
+)
+
+
+# Assign the tasks this DAG should run
+with dag:
+    first_world = PythonOperator(
+        task_id='FirstTest',
+        python_callable=my_function,
+    )
+    hello_world = BashOperator(
+        task_id='OneTest',
+        bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
+        # provide_context=True
+    )
+    first_world >> hello_world
diff --git a/ee/recommendation/airflow-local/docker-compose.yaml b/ee/recommendation/airflow-local/docker-compose.yaml
new file mode 100644
index 000000000..bc860831f
--- /dev/null
+++ b/ee/recommendation/airflow-local/docker-compose.yaml
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
+#
+# WARNING: This configuration is for local development. Do not use it in a production deployment.
+#
+# This configuration supports basic configuration using environment variables or an .env file
+# The following variables are supported:
+#
+# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
+#                                Default: apache/airflow:2.4.3
+# AIRFLOW_UID                  - User ID in Airflow containers
+#                                Default: 50000
+# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
+#
+# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
+#                                Default: airflow
+# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
+#                                Default: airflow
+# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
+#                                Default: ''
+#
+# Feel free to modify this file to suit your needs.
+---
+version: '3'
+x-airflow-common:
+  &airflow-common
+  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
+  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
+  # and uncomment the "build" line below, then run `docker-compose build` to build the images.
+  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
+  build: .
+  environment:
+    &airflow-common-env
+    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
+    # For backward compatibility, with Airflow <2.3
+    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+    AIRFLOW__CORE__FERNET_KEY: ''
+    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
+    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
+    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+    AIRFLOW__CODE_EDITOR__ENABLED: 'true'
+    AIRFLOW__CODE_EDITOR__GIT_ENABLED: 'false'
+    AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION: 'true'
+    AIRFLOW__CODE_EDITOR__MOUNT: '/opt/airflow/dags'
+    pg_user: airflow
+    pg_password: airflow
+    pg_dbname: airflow
+    pg_host: postgres  # hostname only; the scripts' pg_client passes this value straight to psycopg2
+    pg_port: 5432
+    PG_TIMEOUT: 30
+    PG_POOL: 'true'
+  volumes:
+    - ./dags:/opt/airflow/dags
+    - ./logs:/opt/airflow/logs
+    - ./plugins:/opt/airflow/plugins
+    - ./scripts:/opt/airflow/scripts
+  user: "${AIRFLOW_UID:-50000}:0"
+  depends_on:
+    &airflow-common-depends-on
+    redis:
+      condition: service_healthy
+    postgres:
+      condition: service_healthy
+
+services:
+  postgres:
+    image: postgres:13
+    environment:
+      POSTGRES_USER: airflow
+      POSTGRES_PASSWORD: airflow
+      POSTGRES_DB: airflow
+    volumes:
+      - postgres-db-volume:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "airflow"]
+      interval: 5s
+      retries: 5
+    restart: always
+
+  redis:
+    image: redis:latest
+    expose:
+      - 6379
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 5s
+      timeout: 30s
+      retries: 50
+    restart: always
+
+  airflow-webserver:
+    <<: *airflow-common
+    command: webserver
+    ports:
+      - 8080:8080
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-scheduler:
+    <<: *airflow-common
+    command: scheduler
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-worker:
+    <<: *airflow-common
+    command: celery worker
+    healthcheck:
+      test:
+        - "CMD-SHELL"
+        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    environment:
+      <<: *airflow-common-env
+      # Required to handle warm shutdown of the celery workers properly
+      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+      DUMB_INIT_SETSID: "0"
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-triggerer:
+    <<: *airflow-common
+    command: triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-init:
+    <<: *airflow-common
+    entrypoint: /bin/bash
+    # yamllint disable rule:line-length
+    command:
+      - -c
+      - |
+        function ver() {
+          printf "%04d%04d%04d%04d" $${1//./ }
+        }
+        register-python-argcomplete airflow >> ~/.bashrc
+        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
+        airflow_version_comparable=$$(ver $${airflow_version})
+        min_airflow_version=2.2.0
+        min_airflow_version_comparable=$$(ver $${min_airflow_version})
+        if [[ -z "${AIRFLOW_UID}" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+          echo "If you are on Linux, you SHOULD follow the instructions below to set "
+          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
+          echo "For other operating systems you can get rid of the warning with manually created .env file:"
+          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+          echo
+        fi
+        one_meg=1048576
+        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
+        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+        disk_available=$$(df / | tail -1 | awk '{print $$4}')
+        warning_resources="false"
+        if (( mem_available < 4000 )) ; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
+          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
+          echo
+          warning_resources="true"
+        fi
+        if (( cpus_available < 2 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
+          echo "At least 2 CPUs recommended. You have $${cpus_available}"
+          echo
+          warning_resources="true"
+        fi
+        if (( disk_available < one_meg * 10 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
+          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources available:"
+          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+          echo
+        fi
+        mkdir -p /sources/logs /sources/dags /sources/plugins
+        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
+    environment:
+      <<: *airflow-common-env
+      _AIRFLOW_DB_UPGRADE: 'true'
+      _AIRFLOW_WWW_USER_CREATE: 'true'
+      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      _PIP_ADDITIONAL_REQUIREMENTS: ''
+    user: "0:0"
+    volumes:
+      - .:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
+    command:
+      - bash
+      - -c
+      - airflow
+
+  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
+  # or by explicitly targeted on the command line e.g. docker-compose up flower.
+  # See: https://docs.docker.com/compose/profiles/
+  flower:
+    <<: *airflow-common
+    command: celery flower
+    profiles:
+      - flower
+    ports:
+      - 5555:5555
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+volumes:
+  postgres-db-volume:
diff --git a/ee/recommendation/requirements.txt b/ee/recommendation/airflow-local/requirements.txt
similarity index 55%
rename from ee/recommendation/requirements.txt
rename to ee/recommendation/airflow-local/requirements.txt
index 8f3f9e958..46fe26aa1 100644
--- a/ee/recommendation/requirements.txt
+++ b/ee/recommendation/airflow-local/requirements.txt
@@ -3,21 +3,17 @@
 urllib3==1.26.12
 pyjwt==2.5.0
 psycopg2-binary==2.9.3
-numpy==1.23.4
+numpy
 threadpoolctl==3.1.0
 joblib==1.2.0
-scipy==1.9.3
-scikit-learn==1.1.3
+scipy
+scikit-learn
 
-apache-airflow==2.4.3
+airflow-code-editor
 
-fastapi==0.85.0
-fastapi-utils
-uvicorn[standard]==0.18.3
-python-decouple==3.6
 pydantic[email]==1.10.2
-apscheduler==3.9.1
 
 clickhouse-driver==0.2.4
 python3-saml==1.14.0
 python-multipart==0.0.5
+python-decouple
diff --git a/ee/recommendation/recommendation.py b/ee/recommendation/airflow-local/scripts/core/recommendation.py
similarity index 89%
rename from ee/recommendation/recommendation.py
rename to ee/recommendation/airflow-local/scripts/core/recommendation.py
index f525bdc98..b808c20d5 100644
--- a/ee/recommendation/recommendation.py
+++ b/ee/recommendation/airflow-local/scripts/core/recommendation.py
@@ -2,6 +2,7 @@ from utils.ch_client import ClickHouseClient
 from utils.pg_client import PostgresClient
 
 def get_features_clickhouse(**kwargs):
+    """Gets features from ClickHouse database"""
     if 'limit' in kwargs:
         limit = kwargs['limit']
     else:
@@ -16,6 +17,7 @@ ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
 
 
 def query_funnels(*kwargs):
+    """Gets Funnels (PG database)"""
     # If public.funnel is empty
     funnels_query = f"""SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel' ) as T1
 LEFT JOIN (SELECT filter, metric_id FROM public.metric_series) as T2 ON T1.metric_id = T2.metric_id"""
@@ -29,6 +31,7 @@ def query_funnels(*kwargs):
 
 
 def query_metrics(*kwargs):
+    """Gets Metrics (PG database)"""
     metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
     with PostgresClient() as conn:
         conn.execute(metrics_query)
@@ -37,9 +40,10 @@ def query_metrics(*kwargs):
 
 
 def query_with_filters(*kwargs):
+    """Gets Metrics with filters (PG database)"""
     filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
-    SELECT metric_id, project_id, name, metric_type, metric_of FROM metric_series WHERE filter != '{}') as T1 INNER JOIN
-    (SELECT metric_id, filter FROM metrics) as T2 ON T1.metric_id = T2.metric_id"""
+    SELECT metric_id, project_id, name, metric_type, metric_of FROM metrics) as T1 INNER JOIN
+    (SELECT metric_id, filter FROM metric_series WHERE filter != '{}') as T2 ON T1.metric_id = T2.metric_id"""
     with PostgresClient() as conn:
         conn.execute(filters_query)
         res = conn.fetchall()
@@ -104,6 +108,10 @@ def get_by_user(data, user_id):
     return [data[k] for k in index_]
 
 
+def test():
+    print('One test')
+
 if __name__ == '__main__':
-    data = get_features_clickhouse()
-    print('Data length:', len(data))
+    print('Just a test')
+    #data = get_features_clickhouse()
+    #print('Data length:', len(data))
diff --git a/ee/recommendation/airflow-local/scripts/task.py b/ee/recommendation/airflow-local/scripts/task.py
new file mode 100644
index 000000000..b06ebfd0e
--- /dev/null
+++ b/ee/recommendation/airflow-local/scripts/task.py
@@ -0,0 +1,23 @@
+import argparse
+from core import recommendation
+from utils import pg_client
+import asyncio
+
+#TODO: remove this module
+import pandas as pd
+
+parser = argparse.ArgumentParser(description='Handle machine learning inputs.')
+parser.add_argument('--mode', choices=['train', 'test'], required=True, help='--mode sets the model to train or test mode')
+parser.add_argument('--kernel', default='linear', help='--kernel sets the kernel to be used for SVM')
+
+args = parser.parse_args()
+
+if __name__ == '__main__':
+    asyncio.run(pg_client.init())
+    data1 = recommendation.query_funnels()
+    print(pd.DataFrame(data1))
+    data2 = recommendation.query_with_filters()
+    print(pd.DataFrame(data2))
+    data3 = recommendation.query_metrics()
+    print(pd.DataFrame(data3))
+    print(args)
diff --git a/ee/recommendation/airflow-local/scripts/utils/ch_client.py b/ee/recommendation/airflow-local/scripts/utils/ch_client.py
new file mode 100644
index 000000000..514820212
--- /dev/null
+++ b/ee/recommendation/airflow-local/scripts/utils/ch_client.py
@@ -0,0 +1,54 @@
+import logging
+
+import clickhouse_driver
+from decouple import config
+
+logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
+
+settings = {}
+if config('ch_timeout', cast=int, default=-1) > 0:
+    logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
+    settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
+
+if config('ch_receive_timeout', cast=int, default=-1) > 0:
+    logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
+    settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
+
+
+class ClickHouseClient:
+    __client = None
+
+    def __init__(self):
+        self.__client = clickhouse_driver.Client(host=config("ch_host"),
+                                                 database="default",
+                                                 port=config("ch_port", cast=int),
+                                                 settings=settings) \
+            if self.__client is None else self.__client
+
+    def __enter__(self):
+        return self
+
+    def execute(self, query, params=None, **args):
+        try:
+            results = self.__client.execute(query=query, params=params, with_column_types=True, **args)
+            keys = tuple(x for x, y in results[1])
+            return [dict(zip(keys, i)) for i in results[0]]
+        except Exception as err:
+            logging.error("--------- CH QUERY EXCEPTION -----------")
+            logging.error(self.format(query=query, params=params))
+            logging.error("--------------------")
+            raise err
+
+    def insert(self, query, params=None, **args):
+        return self.__client.execute(query=query, params=params, **args)
+
+    def client(self):
+        return self.__client
+
+    def format(self, query, params):
+        if params is None:
+            return query
+        return self.__client.substitute_params(query, params, self.__client.connection.context)
+
+    def __exit__(self, *args):
+        pass
diff --git a/ee/recommendation/airflow-local/scripts/utils/pg_client.py b/ee/recommendation/airflow-local/scripts/utils/pg_client.py
new file mode 100644
index 000000000..69a5b5a8b
--- /dev/null
+++ b/ee/recommendation/airflow-local/scripts/utils/pg_client.py
@@ -0,0 +1,166 @@
+import logging
+import time
+from threading import Semaphore
+
+import psycopg2
+import psycopg2.extras
+from decouple import config
+from psycopg2 import pool
+
+logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
+logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
+
+_PG_CONFIG = {"host": config("pg_host"),
+              "database": config("pg_dbname"),
+              "user": config("pg_user"),
+              "password": config("pg_password"),
+              "port": config("pg_port", cast=int),
+              "application_name": config("APP_NAME", default="PY")}
+PG_CONFIG = dict(_PG_CONFIG)
+if config("PG_TIMEOUT", cast=int, default=0) > 0:
+    PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
+
+
+class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        self._semaphore = Semaphore(maxconn)
+        super().__init__(minconn, maxconn, *args, **kwargs)
+
+    def getconn(self, *args, **kwargs):
+        self._semaphore.acquire()
+        try:
+            return super().getconn(*args, **kwargs)
+        except psycopg2.pool.PoolError as e:
+            if str(e) == "connection pool is closed":
+                make_pool()
+            raise e
+
+    def putconn(self, *args, **kwargs):
+        try:
+            super().putconn(*args, **kwargs)
+            self._semaphore.release()
+        except psycopg2.pool.PoolError as e:
+            if str(e) == "trying to put unkeyed connection":
+                print("!!! trying to put unkeyed connection")
+                print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
+                return
+            raise e
+
+
+postgreSQL_pool: ORThreadedConnectionPool = None
+
+RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
+RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
+RETRY = 0
+
+
+def make_pool():
+    if not config('PG_POOL', cast=bool, default=True):
+        return
+    global postgreSQL_pool
+    global RETRY
+    if postgreSQL_pool is not None:
+        try:
+            postgreSQL_pool.closeall()
+        except (Exception, psycopg2.DatabaseError) as error:
+            logging.error("Error while closing all connections to PostgreSQL: %s", error)
+    try:
+        postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
+                                                   config("PG_MAXCONN", cast=int, default=80),
+                                                   **PG_CONFIG)
+        if (postgreSQL_pool):
+            logging.info("Connection pool created successfully")
+    except (Exception, psycopg2.DatabaseError) as error:
+        logging.error("Error while connecting to PostgreSQL: %s", error)
+        if RETRY < RETRY_MAX:
+            RETRY += 1
+            logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
+            time.sleep(RETRY_INTERVAL)
+            make_pool()
+        else:
+            raise error
+
+
+class PostgresClient:
+    connection = None
+    cursor = None
+    long_query = False
+    unlimited_query = False
+
+    def __init__(self, long_query=False, unlimited_query=False):
+        self.long_query = long_query
+        self.unlimited_query = unlimited_query
+        if unlimited_query:
+            long_config = dict(_PG_CONFIG)
+            long_config["application_name"] += "-UNLIMITED"
+            self.connection = psycopg2.connect(**long_config)
+        elif long_query:
+            long_config = dict(_PG_CONFIG)
+            long_config["application_name"] += "-LONG"
+            long_config["options"] = f"-c statement_timeout=" \
+                                     f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
+            self.connection = psycopg2.connect(**long_config)
+        elif not config('PG_POOL', cast=bool, default=True):
+            single_config = dict(_PG_CONFIG)
+            single_config["application_name"] += "-NOPOOL"
+            single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
+            self.connection = psycopg2.connect(**single_config)
+        else:
+            self.connection = postgreSQL_pool.getconn()
+
+    def __enter__(self):
+        if self.cursor is None:
+            self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+            self.cursor.recreate = self.recreate_cursor
+        return self.cursor
+
+    def __exit__(self, *args):
+        try:
+            self.connection.commit()
+            self.cursor.close()
+            if self.long_query or self.unlimited_query:
+                self.connection.close()
+        except Exception as error:
+            logging.error("Error while committing/closing PG-connection: %s", error)
+            if str(error) == "connection already closed" \
+                    and not self.long_query \
+                    and not self.unlimited_query \
+                    and config('PG_POOL', cast=bool, default=True):
+                logging.info("Recreating the connection pool")
+                make_pool()
+            else:
+                raise error
+        finally:
+            if config('PG_POOL', cast=bool, default=True) \
+                    and not self.long_query \
+                    and not self.unlimited_query:
+                postgreSQL_pool.putconn(self.connection)
+
+    def recreate_cursor(self, rollback=False):
+        if rollback:
+            try:
+                self.connection.rollback()
+            except Exception as error:
+                logging.error("Error while rolling back connection for recreation: %s", error)
+        try:
+            self.cursor.close()
+        except Exception as error:
+            logging.error("Error while closing cursor for recreation: %s", error)
+        self.cursor = None
+        return self.__enter__()
+
+
+async def init():
+    logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
+    if config('PG_POOL', cast=bool, default=True):
+        make_pool()
+
+
+async def terminate():
+    global postgreSQL_pool
+    if postgreSQL_pool is not None:
+        try:
+            postgreSQL_pool.closeall()
+            logging.info("Closed all connections to PostgreSQL")
+        except (Exception, psycopg2.DatabaseError) as error:
+            logging.error("Error while closing all connections to PostgreSQL: %s", error)
diff --git a/ee/recommendation/run.sh b/ee/recommendation/run.sh
deleted file mode 100644
index e5f70a5a4..000000000
--- a/ee/recommendation/run.sh
+++ /dev/null
@@ -1,8 +0,0 @@
-mkdir tmp
-cp ../api/chalicelib/utils/ch_client.py tmp
-cp ../api/chalicelib/utils/events_queue.py tmp
-cp ../../api/chalicelib/utils/pg_client.py tmp
-docker build -t my_test .
-rm tmp/*.py
-rmdir tmp
-docker run -d -p 8080:8080 my_test
diff --git a/ee/recommendation/signals.sql b/ee/recommendation/signals.sql
new file mode 100644
index 000000000..5500969ed
--- /dev/null
+++ b/ee/recommendation/signals.sql
@@ -0,0 +1,11 @@
+CREATE TABLE IF NOT EXISTS frontend_signals
+(
+    project_id bigint NOT NULL,
+    user_id    text   NOT NULL,
+    timestamp  bigint NOT NULL,
+    action     text   NOT NULL,
+    source     text   NOT NULL,
+    category   text   NOT NULL,
+    data       json
+);
+CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);
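
A minimal usage sketch, not part of the patch above: it shows how the new frontend_signals table could be read through the PostgresClient helper added in scripts/utils/pg_client.py, following the same pattern task.py uses for the recommendation queries. The function name fetch_recent_signals and the example project id are placeholders, and the pg_* settings are assumed to come from the environment defined in docker-compose.yaml.

import asyncio

from utils import pg_client
from utils.pg_client import PostgresClient


def fetch_recent_signals(project_id, limit=100):
    # PostgresClient yields a RealDictCursor, so rows come back as dicts
    query = """SELECT user_id, "timestamp", action, source, category, data
               FROM frontend_signals
               WHERE project_id = %(project_id)s
               ORDER BY "timestamp" DESC
               LIMIT %(limit)s"""
    with PostgresClient() as conn:
        conn.execute(query, {"project_id": project_id, "limit": limit})
        return conn.fetchall()


if __name__ == '__main__':
    asyncio.run(pg_client.init())  # build the connection pool before any query
    print(fetch_recent_signals(project_id=1))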