Added airflow worker, scheduler, trigger, webserver

This commit is contained in:
MauricioGarciaS 2022-11-29 10:38:54 +01:00
parent e8edb34b9b
commit dcc8ef5fb0
10 changed files with 611 additions and 21 deletions

View file

@ -0,0 +1,14 @@
FROM apache/airflow:2.4.3
COPY requirements.txt .
USER root
RUN apt-get update \
&& apt-get install -y \
vim \
&& apt-get install gcc libc-dev g++ -y \
&& apt-get install -y pkg-config libxml2-dev libxmlsec1-dev libxmlsec1-openssl
USER airflow
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

View file

@ -0,0 +1,42 @@
from datetime import datetime, timedelta
from textwrap import dedent
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import os
_work_dir = os.getcwd()
def my_function():
l = os.listdir('scripts')
print(l)
return l
dag = DAG(
"first_test",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="My first test",
schedule="@daily",
catchup=False,
)
#assigning the task for our dag to do
with dag:
first_world = PythonOperator(
task_id='FirstTest',
python_callable=my_function,
)
hello_world = BashOperator(
task_id='OneTest',
bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
# provide_context=True
)
first_world >> hello_world

View file

@ -0,0 +1,284 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.4.3
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
AIRFLOW__CODE_EDITOR__ENABLED: 'true'
AIRFLOW__CODE_EDITOR__GIT_ENABLED: 'false'
AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION: 'true'
AIRFLOW__CODE_EDITOR__MOUNT: '/opt/airflow/dags'
pg_user: airflow
pg_password: airflow
pg_dbname: airflow
pg_host: postgresql+psycopg2://airflow:airflow@postgres/airflow
pg_port: 5432
PG_TIMEOUT: 30
PG_POOL: 'true'
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./scripts:/opt/airflow/scripts
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
register-python-argcomplete airflow >> ~/.bashrc
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
postgres-db-volume:

View file

@ -3,21 +3,17 @@ urllib3==1.26.12
pyjwt==2.5.0
psycopg2-binary==2.9.3
numpy==1.23.4
numpy
threadpoolctl==3.1.0
joblib==1.2.0
scipy==1.9.3
scikit-learn==1.1.3
scipy
scikit-learn
apache-airflow==2.4.3
airflow-code-editor
fastapi==0.85.0
fastapi-utils
uvicorn[standard]==0.18.3
python-decouple==3.6
pydantic[email]==1.10.2
apscheduler==3.9.1
clickhouse-driver==0.2.4
python3-saml==1.14.0
python-multipart==0.0.5
python-decouple

View file

@ -2,6 +2,7 @@ from utils.ch_client import ClickHouseClient
from utils.pg_client import PostgresClient
def get_features_clickhouse(**kwargs):
"""Gets features from ClickHouse database"""
if 'limit' in kwargs:
limit = kwargs['limit']
else:
@ -16,6 +17,7 @@ ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
def query_funnels(*kwargs):
"""Gets Funnels (PG database)"""
# If public.funnel is empty
funnels_query = f"""SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel'
) as T1 LEFT JOIN (SELECT filter, metric_id FROM public.metric_series) as T2 ON T1.metric_id = T2.metric_id"""
@ -29,6 +31,7 @@ def query_funnels(*kwargs):
def query_metrics(*kwargs):
"""Gets Metrics (PG_database)"""
metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
with PostgresClient() as conn:
conn.execute(metrics_query)
@ -37,9 +40,10 @@ def query_metrics(*kwargs):
def query_with_filters(*kwargs):
"""Gets Metrics with filters (PG database)"""
filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
SELECT metric_id, project_id, name, metric_type, metric_of FROM metric_series WHERE filter != '{}') as T1 INNER JOIN
(SELECT metric_id, filter FROM metrics) as T2 ON T1.metric_id = T2.metric_id"""
SELECT metric_id, project_id, name, metric_type, metric_of FROM metrics) as T1 INNER JOIN
(SELECT metric_id, filter FROM metric_series WHERE filter != '{}') as T2 ON T1.metric_id = T2.metric_id"""
with PostgresClient() as conn:
conn.execute(filters_query)
res = conn.fetchall()
@ -104,6 +108,10 @@ def get_by_user(data, user_id):
return [data[k] for k in index_]
def test():
print('One test')
if __name__ == '__main__':
data = get_features_clickhouse()
print('Data length:', len(data))
print('Just a test')
#data = get_features_clickhouse()
#print('Data length:', len(data))

View file

@ -0,0 +1,23 @@
import argparse
from core import recommendation
from utils import pg_client
import asyncio
#TODO: remove this module
import pandas as pd
parser = argparse.ArgumentParser(description='Handle machine learning inputs.')
parser.add_argument('--mode', choices=['train', 'test'], required=True, help='--mode sets the model in train or test mode')
parser.add_argument('--kernel', default='linear', help='--kernel set the kernel to be used for SVM')
args = parser.parse_args()
if __name__ == '__main__':
asyncio.run(pg_client.init())
data1 = recommendation.query_funnels()
print(pd.DataFrame(data1))
data2 = recommendation.query_with_filters()
print(pd.DataFrame(data2))
data3 = recommendation.query_metrics()
print(pd.DataFrame(data3))
print(args)

View file

@ -0,0 +1,54 @@
import logging
import clickhouse_driver
from decouple import config
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
settings = {}
if config('ch_timeout', cast=int, default=-1) > 0:
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
if config('ch_receive_timeout', cast=int, default=-1) > 0:
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
class ClickHouseClient:
__client = None
def __init__(self):
self.__client = clickhouse_driver.Client(host=config("ch_host"),
database="default",
port=config("ch_port", cast=int),
settings=settings) \
if self.__client is None else self.__client
def __enter__(self):
return self
def execute(self, query, params=None, **args):
try:
results = self.__client.execute(query=query, params=params, with_column_types=True, **args)
keys = tuple(x for x, y in results[1])
return [dict(zip(keys, i)) for i in results[0]]
except Exception as err:
logging.error("--------- CH QUERY EXCEPTION -----------")
logging.error(self.format(query=query, params=params))
logging.error("--------------------")
raise err
def insert(self, query, params=None, **args):
return self.__client.execute(query=query, params=params, **args)
def client(self):
return self.__client
def format(self, query, params):
if params is None:
return query
return self.__client.substitute_params(query, params, self.__client.connection.context)
def __exit__(self, *args):
pass

View file

@ -0,0 +1,166 @@
import logging
import time
from threading import Semaphore
import psycopg2
import psycopg2.extras
from decouple import config
from psycopg2 import pool
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
_PG_CONFIG = {"host": config("pg_host"),
"database": config("pg_dbname"),
"user": config("pg_user"),
"password": config("pg_password"),
"port": config("pg_port", cast=int),
"application_name": config("APP_NAME", default="PY")}
PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
self._semaphore = Semaphore(maxconn)
super().__init__(minconn, maxconn, *args, **kwargs)
def getconn(self, *args, **kwargs):
self._semaphore.acquire()
try:
return super().getconn(*args, **kwargs)
except psycopg2.pool.PoolError as e:
if str(e) == "connection pool is closed":
make_pool()
raise e
def putconn(self, *args, **kwargs):
try:
super().putconn(*args, **kwargs)
self._semaphore.release()
except psycopg2.pool.PoolError as e:
if str(e) == "trying to put unkeyed connection":
print("!!! trying to put unkeyed connection")
print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
return
raise e
postgreSQL_pool: ORThreadedConnectionPool = None
RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0
def make_pool():
if not config('PG_POOL', cast=bool, default=True):
return
global postgreSQL_pool
global RETRY
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)
try:
postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
config("PG_MAXCONN", cast=int, default=80),
**PG_CONFIG)
if (postgreSQL_pool):
logging.info("Connection pool created successfully")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while connecting to PostgreSQL", error)
if RETRY < RETRY_MAX:
RETRY += 1
logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
time.sleep(RETRY_INTERVAL)
make_pool()
else:
raise error
class PostgresClient:
connection = None
cursor = None
long_query = False
unlimited_query = False
def __init__(self, long_query=False, unlimited_query=False):
self.long_query = long_query
self.unlimited_query = unlimited_query
if unlimited_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-UNLIMITED"
self.connection = psycopg2.connect(**long_config)
elif long_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-LONG"
long_config["options"] = f"-c statement_timeout=" \
f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
self.connection = psycopg2.connect(**long_config)
elif not config('PG_POOL', cast=bool, default=True):
single_config = dict(_PG_CONFIG)
single_config["application_name"] += "-NOPOOL"
single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
self.connection = psycopg2.connect(**single_config)
else:
self.connection = postgreSQL_pool.getconn()
def __enter__(self):
if self.cursor is None:
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
self.cursor.recreate = self.recreate_cursor
return self.cursor
def __exit__(self, *args):
try:
self.connection.commit()
self.cursor.close()
if self.long_query or self.unlimited_query:
self.connection.close()
except Exception as error:
logging.error("Error while committing/closing PG-connection", error)
if str(error) == "connection already closed" \
and not self.long_query \
and not self.unlimited_query \
and config('PG_POOL', cast=bool, default=True):
logging.info("Recreating the connexion pool")
make_pool()
else:
raise error
finally:
if config('PG_POOL', cast=bool, default=True) \
and not self.long_query \
and not self.unlimited_query:
postgreSQL_pool.putconn(self.connection)
def recreate_cursor(self, rollback=False):
if rollback:
try:
self.connection.rollback()
except Exception as error:
logging.error("Error while rollbacking connection for recreation", error)
try:
self.cursor.close()
except Exception as error:
logging.error("Error while closing cursor for recreation", error)
self.cursor = None
return self.__enter__()
async def init():
logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
async def terminate():
global postgreSQL_pool
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
logging.info("Closed all connexions to PostgreSQL")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)

View file

@ -1,8 +0,0 @@
mkdir tmp
cp ../api/chalicelib/utils/ch_client.py tmp
cp ../api/chalicelib/utils/events_queue.py tmp
cp ../../api/chalicelib/utils/pg_client.py tmp
docker build -t my_test .
rm tmp/*.py
rmdir tmp
docker run -d -p 8080:8080 my_test

View file

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS frontend_signals
(
project_id bigint NOT NULL,
user_id text NOT NULL,
timestamp bigint NOT NULL,
action text NOT NULL,
source text NOT NULL,
category text NOT NULL,
data json
);
CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);