Airflow setup and workflow templates for training in the scripts folder

MauricioGarciaS 2022-12-12 14:35:41 +01:00
parent 77536c3153
commit 574af2588d
8 changed files with 183 additions and 38 deletions

View file

@@ -36,7 +36,11 @@ with dag:
     )
     hello_world = BashOperator(
         task_id='OneTest',
-        bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
+        bash_command=f'python {_work_dir}/scripts/processing.py --batch_size 500',
         # provide_context=True
     )
-    first_world >> hello_world
+    this_world = BashOperator(
+        task_id='ThisTest',
+        bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
+    )
+    first_world >> hello_world >> this_world
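For context, a minimal sketch of how these operators could sit in the full DAG file. The dag_id, schedule, start_date, and first_world's definition are assumptions; _work_dir, the task ids, and the bash commands come from the diff above.

    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Assumption: the working directory defaults to the Airflow home.
    _work_dir = os.getenv('AIRFLOW_HOME', '/opt/airflow')

    with DAG(dag_id='training_template',
             start_date=datetime(2022, 12, 1),
             schedule_interval=None) as dag:
        first_world = BashOperator(
            task_id='FirstTest',
            bash_command='echo "starting pipeline"',  # placeholder first task
        )
        hello_world = BashOperator(
            task_id='OneTest',
            bash_command=f'python {_work_dir}/scripts/processing.py --batch_size 500',
        )
        this_world = BashOperator(
            task_id='ThisTest',
            bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
        )
        # Fetch features first, then run the training task.
        first_world >> hello_world >> this_world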

View file

@@ -58,23 +58,24 @@ x-airflow-common:
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
     AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
-    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+    _PIP_ADDITIONAL_REQUIREMENTS: 'argcomplete'
     AIRFLOW__CODE_EDITOR__ENABLED: 'true'
     AIRFLOW__CODE_EDITOR__GIT_ENABLED: 'false'
     AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION: 'true'
     AIRFLOW__CODE_EDITOR__MOUNT: '/opt/airflow/dags'
-    pg_user: airflow
-    pg_password: airflow
-    pg_dbname: airflow
-    pg_host: postgresql+psycopg2://airflow:airflow@postgres/airflow
-    pg_port: 5432
-    PG_TIMEOUT: 30
-    PG_POOL: 'true'
+    pg_user: "${pg_user}"
+    pg_password: "${pg_password}"
+    pg_dbname: "${pg_dbname}"
+    pg_host: "${pg_host}"
+    pg_port: "${pg_port}"
+    PG_TIMEOUT: "${PG_TIMEOUT}"
+    PG_POOL: "${PG_POOL}"
   volumes:
     - ./dags:/opt/airflow/dags
     - ./logs:/opt/airflow/logs
     - ./plugins:/opt/airflow/plugins
+    - ./scripts:/opt/airflow/scripts
+    - ./cache:/opt/airflow/cache
   user: "${AIRFLOW_UID:-50000}:0"
   depends_on:
     &airflow-common-depends-on
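Since the Postgres settings are now interpolated, docker compose has to find pg_user, pg_host, and the rest in the shell environment or in an .env file next to the compose file. A minimal sketch of such a file, reusing the literal values that were just removed:

    # .env (sketch; values copied from the previously hard-coded settings)
    pg_user=airflow
    pg_password=airflow
    pg_dbname=airflow
    pg_host=postgresql+psycopg2://airflow:airflow@postgres/airflow
    pg_port=5432
    PG_TIMEOUT=30
    PG_POOL=true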

View file

@@ -8,6 +8,7 @@ threadpoolctl==3.1.0
 joblib==1.2.0
 scipy
 scikit-learn
+mlflow
 airflow-code-editor
@@ -17,3 +18,5 @@ clickhouse-driver==0.2.4
 python3-saml==1.14.0
 python-multipart==0.0.5
 python-decouple
+argcomplete

View file

@@ -16,7 +16,16 @@ ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
     return res
-def query_funnels(*kwargs):
+
+
+def get_features_postgres(**kwargs):
+    with PostgresClient() as conn:
+        funnels = query_funnels(conn, **kwargs)
+        metrics = query_metrics(conn, **kwargs)
+        filters = query_with_filters(conn, **kwargs)
+    return funnels, metrics, filters
+
+
+def query_funnels(conn, **kwargs):
     """Gets Funnels (PG database)"""
     # If public.funnel is empty
     funnels_query = f"""SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel'
@@ -24,29 +33,26 @@ def query_funnels(*kwargs):
     # Else
     # funnels_query = "SELECT project_id, user_id, filter FROM public.funnels"
-    with PostgresClient() as conn:
-        conn.execute(funnels_query)
-        res = conn.fetchall()
+    conn.execute(funnels_query)
+    res = conn.fetchall()
     return res
-def query_metrics(*kwargs):
+
+
+def query_metrics(conn, **kwargs):
     """Gets Metrics (PG database)"""
     metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
-    with PostgresClient() as conn:
-        conn.execute(metrics_query)
-        res = conn.fetchall()
+    conn.execute(metrics_query)
+    res = conn.fetchall()
     return res
-def query_with_filters(*kwargs):
+
+
+def query_with_filters(conn, **kwargs):
     """Gets Metrics with filters (PG database)"""
     filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
 SELECT metric_id, project_id, name, metric_type, metric_of FROM metrics) as T1 INNER JOIN
 (SELECT metric_id, filter FROM metric_series WHERE filter != '{}') as T2 ON T1.metric_id = T2.metric_id"""
-    with PostgresClient() as conn:
-        conn.execute(filters_query)
-        res = conn.fetchall()
+    conn.execute(filters_query)
+    res = conn.fetchall()
     return res
@@ -83,10 +89,6 @@ def transform_with_filter(data, *kwargs):
     return res
-
-def transform_data():
-    pass
-
 def transform(element):
     key_ = element.pop('user_id')
     secondary_key_ = element.pop('session_id')
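A hedged usage sketch of the refactored entry point: the point of the change is that one PostgresClient connection is now shared by all three queries instead of each helper opening its own. The module path core.features and the pg_client.init() bootstrap are inferred from scripts/processing.py below.

    import asyncio

    from core import features  # assumed module path for this file
    from utils import pg_client

    # Initialize the connection pool before querying (as processing.py does).
    asyncio.run(pg_client.init())
    funnels, metrics, filters = features.get_features_postgres()
    print(len(funnels), len(metrics), len(filters))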

View file

@@ -0,0 +1,15 @@
+from sklearn.svm import SVC
+
+
+class SVM_recommendation:
+    """Thin wrapper around sklearn's SVC; see SVC.__doc__ for accepted parameters."""
+
+    def __init__(self, **params):
+        self.svm = SVC(**params)
+
+    def fit(self, X=None, y=None):
+        # SVC is supervised: both the feature matrix and the labels are required.
+        assert X is not None and y is not None, 'X and y must be given'
+        self.svm.fit(X, y)
+        return self
+
+    def predict(self, X):
+        return self.svm.predict(X)
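A hypothetical smoke test for the wrapper; the toy data is made up for illustration.

    import numpy as np

    # Tiny XOR-style dataset, purely illustrative.
    X = np.array([[0., 0.], [1., 1.], [0., 1.], [1., 0.]])
    y = np.array([0, 0, 1, 1])

    model = SVM_recommendation(kernel='rbf', C=1.0)
    model.fit(X, y)
    print(model.predict(X))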

View file

@@ -0,0 +1,60 @@
+import mlflow
+##
+import numpy as np
+import pickle
+
+from sklearn import datasets, linear_model
+from sklearn.metrics import mean_squared_error, r2_score
+
+# source: https://scikit-learn.org/stable/auto_examples/linear_model/plot_ols.html
+# Load the diabetes dataset
+diabetes_X, diabetes_y = datasets.load_diabetes(return_X_y=True)
+
+# Use only one feature
+diabetes_X = diabetes_X[:, np.newaxis, 2]
+
+# Split the data into training/testing sets
+diabetes_X_train = diabetes_X[:-20]
+diabetes_X_test = diabetes_X[-20:]
+
+# Split the targets into training/testing sets
+diabetes_y_train = diabetes_y[:-20]
+diabetes_y_test = diabetes_y[-20:]
+
+
+def print_predictions(m, y_pred):
+    # The coefficients
+    print('Coefficients: \n', m.coef_)
+    # The mean squared error
+    print('Mean squared error: %.2f'
+          % mean_squared_error(diabetes_y_test, y_pred))
+    # The coefficient of determination: 1 is perfect prediction
+    print('Coefficient of determination: %.2f'
+          % r2_score(diabetes_y_test, y_pred))
+
+
+# Create linear regression object
+lr_model = linear_model.LinearRegression()
+
+# Train the model using the training sets
+lr_model.fit(diabetes_X_train, diabetes_y_train)
+
+# Make predictions using the testing set
+diabetes_y_pred = lr_model.predict(diabetes_X_test)
+
+print_predictions(lr_model, diabetes_y_pred)
+
+# save the model in the native sklearn format
+filename = 'lr_model.pkl'
+pickle.dump(lr_model, open(filename, 'wb'))
+
+##
+# load the model into memory
+loaded_model = pickle.load(open(filename, 'rb'))
+
+# log and register the model using MLflow scikit-learn API
+mlflow.set_tracking_uri("postgresql+psycopg2://airflow:airflow@postgres/mlruns")
+reg_model_name = "SklearnLinearRegression"
+print("--")
+mlflow.sklearn.log_model(loaded_model, "sk_learn",
+                         serialization_format="cloudpickle",
+                         registered_model_name=reg_model_name)
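Once registered, the model can be read back through the MLflow model registry. A sketch; model version 1 is an assumption, and the tracking URI must match the one used when logging.

    import mlflow.sklearn

    mlflow.set_tracking_uri("postgresql+psycopg2://airflow:airflow@postgres/mlruns")
    # Registry URI format: models:/<registered model name>/<version>
    model = mlflow.sklearn.load_model("models:/SklearnLinearRegression/1")
    print(model.predict(diabetes_X_test[:3]))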

View file

@@ -0,0 +1,42 @@
+import time
+import argparse
+from core import features
+from utils import pg_client
+import multiprocessing as mp
+from decouple import config
+import asyncio
+import pandas
+
+
+def features_ch(q):
+    q.put(features.get_features_clickhouse())
+
+
+def features_pg(q):
+    q.put(features.get_features_postgres())
+
+
+def get_features():
+    # mp.set_start_method('spawn')
+    # q = mp.Queue()
+    # p1 = mp.Process(target=features_ch, args=(q,))
+    # p1.start()
+    pg_features = features.get_features_postgres()
+    ch_features = []  # p1.join()
+    return [pg_features, ch_features]
+
+
+parser = argparse.ArgumentParser(description='Gets and processes data from Postgres and ClickHouse.')
+parser.add_argument('--batch_size', type=int, required=True,
+                    help='--batch_size max size of columns per file to be saved in /opt/airflow/cache')
+args = parser.parse_args()
+
+if __name__ == '__main__':
+    asyncio.run(pg_client.init())
+    print(args)
+    t1 = time.time()
+    data = get_features()
+    # print(data)
+    cache_dir = config("data_dir", default="/opt/airflow/cache")
+    for d in data[0]:
+        pandas.DataFrame(d).to_csv(f'{cache_dir}/tmp-{hash(time.time())}', sep=',')
+    t2 = time.time()
+    print(f'DONE! information retrieved in {t2 - t1: .2f} seconds')
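This is the script the DAG's OneTest task invokes as python {_work_dir}/scripts/processing.py --batch_size 500; the CSVs it writes land in the ./cache volume mounted in the compose file above. Note that --batch_size is parsed but not yet used when the cache files are written.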

View file

@@ -1,10 +1,29 @@
 import time
 import argparse
-from core import recommendation
 from utils import pg_client
 import asyncio
+from decouple import config
+from core import recommendation_model
+import pandas
+import json
+import os
+
+
+def transform_dict_string(s_dicts):
+    data = list()
+    for s_dict in s_dicts:
+        data.append(json.loads(s_dict.replace("'", '"')
+                               .replace('None', 'null')
+                               .replace('True', 'true')
+                               .replace('False', 'false')))
+    return data
+
+
+def process_file(file_name):
+    return pandas.read_csv(file_name, sep=",")
+
+
+def read_batches():
+    base_dir = config('dir_path', default='/opt/airflow/cache')
+    files = os.listdir(base_dir)
+    for file in files:
+        yield process_file(f'{base_dir}/{file}')
+
 # TODO: remove this module
 import pandas as pd
 parser = argparse.ArgumentParser(description='Handle machine learning inputs.')
 parser.add_argument('--mode', choices=['train', 'test'], required=True, help='--mode sets the model in train or test mode')
@@ -13,11 +32,10 @@ parser.add_argument('--kernel', default='linear', help='--kernel set the kernel
 args = parser.parse_args()
 
 if __name__ == '__main__':
     asyncio.run(pg_client.init())
-    data1 = recommendation.query_funnels()
-    print(pd.DataFrame(data1))
-    data2 = recommendation.query_with_filters()
-    print(pd.DataFrame(data2))
-    data3 = recommendation.query_metrics()
-    print(pd.DataFrame(data3))
+    print(args)
+    t1 = time.time()
+    buff = read_batches()
+    for b in buff:
+        print(b.head())
+    t2 = time.time()
+    print(f'DONE! information retrieved in {t2 - t1: .2f} seconds')
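A quick illustration of what transform_dict_string handles: Python-repr'd dicts stored as strings (for example, the filter column queried above) are rewritten into valid JSON before parsing. The sample row is made up.

    # Hypothetical input row; single quotes and Python literals would break json.loads as-is.
    rows = ["{'type': 'click', 'value': None, 'strict': False}"]
    print(transform_dict_string(rows))
    # -> [{'type': 'click', 'value': None, 'strict': False}]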