Merge pull request #882 from openreplay/recommendation

Recommendation
Kraiem Taha Yassine 2022-12-13 18:12:30 +01:00 committed by GitHub
commit 20539254b1
24 changed files with 1093 additions and 5 deletions

View file

@ -12,9 +12,11 @@ from starlette.responses import StreamingResponse, JSONResponse
from chalicelib.core import traces
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from chalicelib.utils import events_queue
from routers import core, core_dynamic, ee, saml
from routers.crons import core_crons
from routers.crons import core_dynamic_crons
from routers.crons import ee_crons
from routers.subs import dashboard, insights, metrics, v1_api_ee
from routers.subs import v1_api
@ -81,9 +83,10 @@ app.queue_system = queue.Queue()
async def startup():
logging.info(">>>>> starting up <<<<<")
await pg_client.init()
await events_queue.init()
app.schedule.start()
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs:
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)
ap_logger.info(">Scheduled jobs:")
@ -96,6 +99,7 @@ async def shutdown():
logging.info(">>>>> shutting down <<<<<")
app.schedule.shutdown(wait=True)
await traces.process_traces_queue()
await events_queue.terminate()
await pg_client.terminate()

View file

@ -0,0 +1,12 @@
import schemas_ee
import logging
from chalicelib.utils import events_queue
def handle_frontend_signals_queued(project_id: int, user_id: int, data: schemas_ee.SignalsSchema):
try:
events_queue.global_queue.put((project_id, user_id, data))
return {'data': 'insertion succeeded'}
except Exception as e:
logging.info(f'Error while inserting: {e}')
return {'errors': [e]}

View file

@ -0,0 +1,80 @@
import json
import queue
import logging
from chalicelib.utils import pg_client
global_queue = None
class EventQueue():
def __init__(self, test=False, queue_max_length=100):
self.events = queue.Queue()
self.events.maxsize = queue_max_length
self.test = test
def flush(self, conn):
events = list()
params = dict()
# while not self.events.empty():
# project_id, user_id, element = self.events.get()
# events.append("({project_id}, {user_id}, {timestamp}, '{action}', '{source}', '{category}', '{data}')".format(
# project_id=project_id, user_id=user_id, timestamp=element.timestamp, action=element.action, source=element.source, category=element.category, data=json.dumps(element.data)))
i = 0
while not self.events.empty():
project_id, user_id, element = self.events.get()
params[f'project_id_{i}'] = project_id
params[f'user_id_{i}'] = user_id
for _key, _val in element.dict().items():
if _key == 'data':
params[f'{_key}_{i}'] = json.dumps(_val)
else:
params[f'{_key}_{i}'] = _val
events.append(f"(%(project_id_{i})s, %(user_id_{i})s, %(timestamp_{i})s, %(action_{i})s, %(source_{i})s, %(category_{i})s, %(data_{i})s::jsonb)")
i += 1
if i == 0:
return 0
if self.test:
print(events)
return 1
conn.execute(
conn.mogrify(f"""INSERT INTO public.frontend_signals (project_id, user_id, timestamp, action, source, category, data)
VALUES {' , '.join(events)}""", params)
)
return 1
def force_flush(self):
if not self.events.empty():
try:
with pg_client.PostgresClient() as conn:
self.flush(conn)
except Exception as e:
logging.info(f'Error: {e}')
def put(self, element):
if self.events.full():
try:
with pg_client.PostgresClient() as conn:
self.flush(conn)
except Exception as e:
logging.info(f'Error: {e}')
self.events.put(element)
self.events.task_done()
async def init(test=False):
global global_queue
global_queue = EventQueue(test=test)
logging.info("> queue initialized")
async def terminate():
global global_queue
if global_queue is not None:
global_queue.force_flush()
logging.info('> queue flushed')
# def __process_schema(trace):
# data = trace.dict()
# data["parameters"] = json.dumps(trace.parameters) if trace.parameters is not None and len(
# trace.parameters.keys()) > 0 else None
# data["payload"] = json.dumps(trace.payload) if trace.payload is not None and len(trace.payload.keys()) > 0 else None
# return data
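For reference, a minimal end-to-end sketch of the queue above. It assumes the usual pg_* settings are configured and that SignalsSchema lives in schemas_ee, as the imports in signals.py suggest; in test mode, flush() only prints the parameterized VALUES templates instead of executing the INSERT.

import asyncio
import schemas_ee
from chalicelib.utils import events_queue, pg_client

async def demo():
    await pg_client.init()                    # force_flush() opens a PostgresClient even in test mode
    await events_queue.init(test=True)        # creates the module-level global_queue
    signal = schemas_ee.SignalsSchema(timestamp=1670946750000, action="click",
                                      source="player", category="ui", data={"k": "v"})   # placeholder values
    events_queue.global_queue.put((1, 7, signal))   # (project_id, user_id, payload); placeholder ids
    await events_queue.terminate()            # force-flushes: prints the batched VALUES templates, no INSERT in test mode
    await pg_client.terminate()

asyncio.run(demo())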

View file

@ -435,3 +435,4 @@ def get_all_notes(projectId: int, data: schemas.SearchNoteSchema = Body(...),
if "errors" in data:
return data
return {'data': data}

View file

@ -23,13 +23,14 @@ def unlock_cron() -> None:
cron_jobs = [
{"func": unlock_cron, "trigger": "cron", "hour": "*"}
{"func": unlock_cron, "trigger": "cron", "hour": "*"},
]
SINGLE_CRONS = [{"func": telemetry_cron, "trigger": "cron", "day_of_week": "*"},
{"func": run_scheduled_jobs, "trigger": "interval", "seconds": 60, "misfire_grace_time": 20},
{"func": weekly_report, "trigger": "cron", "day_of_week": "mon", "hour": 5,
"misfire_grace_time": 60 * 60}]
"misfire_grace_time": 60 * 60}
]
if config("LOCAL_CRONS", default=False, cast=bool):
cron_jobs += SINGLE_CRONS

View file

@ -0,0 +1,10 @@
from chalicelib.utils import events_queue
def pg_events_queue() -> None:
events_queue.global_queue.force_flush()
ee_cron_jobs = [
{"func": pg_events_queue, "trigger": "interval", "seconds": 60*5, "misfire_grace_time": 20},
]

View file

@ -1,5 +1,5 @@
from chalicelib.core import roles, traces
from chalicelib.core import unlock
from chalicelib.core import unlock, signals
from chalicelib.utils import assist_helper
unlock.check()
@ -71,3 +71,13 @@ def get_trails(data: schemas_ee.TrailSearchPayloadSchema = Body(...),
@app.post('/trails/actions', tags=["traces", "trails"])
def get_available_trail_actions(context: schemas.CurrentContext = Depends(OR_context)):
return {'data': traces.get_available_actions(tenant_id=context.tenant_id)}
@app.post('/{projectId}/signals', tags=['signals'])
def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...),
context: schemas.CurrentContext = Depends(OR_context)):
data = signals.handle_frontend_signals_queued(project_id=projectId, user_id=context.user_id, data=data)
if "errors" in data:
return data
return {'data': data}
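For illustration, a hedged sketch of calling the new endpoint over HTTP. Host, port, project id and token are placeholders, and any route prefix configured for the EE API would come before /{projectId}/signals.

import requests

resp = requests.post(
    "http://localhost:8000/1/signals",                 # placeholder host/port and project id
    headers={"Authorization": "Bearer <jwt>"},         # placeholder token
    json={"timestamp": 1670946750000, "action": "click",
          "source": "player", "category": "ui", "data": {"button": "share"}},
)
print(resp.json())   # {'data': {'data': 'insertion succeeded'}} once the signal is queued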

View file

@ -31,6 +31,14 @@ class RolePayloadSchema(BaseModel):
alias_generator = schemas.attribute_to_camel_case
class SignalsSchema(BaseModel):
timestamp: int = Field(...)
action: str = Field(...)
source: str = Field(...)
category: str = Field(...)
data: dict = Field(default={})
class CreateMemberSchema(schemas.CreateMemberSchema):
roleId: Optional[int] = Field(None)
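A quick sketch of what SignalsSchema accepts; the field values are placeholders and the module path is assumed from the imports above.

from schemas_ee import SignalsSchema

s = SignalsSchema(timestamp=1670946750000, action="click", source="player", category="ui")
print(s.dict())   # {'timestamp': 1670946750000, 'action': 'click', 'source': 'player', 'category': 'ui', 'data': {}}
# Omitting any required field (e.g. timestamp) raises pydantic.ValidationError.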

View file

@ -0,0 +1,14 @@
FROM apache/airflow:2.4.3
COPY requirements.txt .
USER root
RUN apt-get update \
&& apt-get install -y \
vim \
&& apt-get install gcc libc-dev g++ -y \
&& apt-get install -y pkg-config libxml2-dev libxmlsec1-dev libxmlsec1-openssl
USER airflow
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

View file

@ -0,0 +1 @@
docker-compose down --volumes --rmi all

View file

@ -0,0 +1,46 @@
from datetime import datetime, timedelta
from textwrap import dedent
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import os
_work_dir = os.getcwd()
def my_function():
l = os.listdir('scripts')
print(l)
return l
dag = DAG(
"first_test",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="My first test",
schedule="@daily",
catchup=False,
)
#assigning the task for our dag to do
with dag:
first_world = PythonOperator(
task_id='FirstTest',
python_callable=my_function,
)
hello_world = BashOperator(
task_id='OneTest',
bash_command=f'python {_work_dir}/scripts/processing.py --batch_size 500',
# provide_context=True
)
this_world = BashOperator(
task_id='ThisTest',
bash_command=f'python {_work_dir}/scripts/task.py --mode train --kernel linear',
)
first_world >> hello_world >> this_world
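A small sanity check of the DAG wiring outside the scheduler. The module name is hypothetical (whatever this file is saved as under dags/); running `airflow dags test first_test 2022-12-01` inside the container exercises the same graph end to end.

# assumes the DAG file above is saved as dags/recommendation_dag.py
from recommendation_dag import dag

print(dag.task_ids)                                   # ['FirstTest', 'OneTest', 'ThisTest']
print(dag.get_task("OneTest").downstream_task_ids)    # {'ThisTest'}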

View file

@ -0,0 +1,285 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.4.3
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: 'argcomplete'
AIRFLOW__CODE_EDITOR__ENABLED: 'true'
AIRFLOW__CODE_EDITOR__GIT_ENABLED: 'false'
AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION: 'true'
AIRFLOW__CODE_EDITOR__MOUNT: '/opt/airflow/dags'
pg_user: "${pg_user}"
pg_password: "${pg_password}"
pg_dbname: "${pg_dbname}"
pg_host: "${pg_host}"
pg_port: "${pg_port}"
PG_TIMEOUT: "${PG_TIMEOUT}"
PG_POOL: "${PG_POOL}"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./scripts:/opt/airflow/scripts
- ./cache:/opt/airflow/cache
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
register-python-argcomplete airflow >> ~/.bashrc
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- .:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
postgres-db-volume:

View file

@ -0,0 +1,22 @@
requests==2.28.1
urllib3==1.26.12
pyjwt==2.5.0
psycopg2-binary==2.9.3
numpy
threadpoolctl==3.1.0
joblib==1.2.0
scipy
scikit-learn
mlflow
airflow-code-editor
pydantic[email]==1.10.2
clickhouse-driver==0.2.4
python3-saml==1.14.0
python-multipart==0.0.5
python-decouple
argcomplete

ee/recommendation/run.sh Normal file (11 lines)
View file

@ -0,0 +1,11 @@
echo 'Setting up required modules..'
mkdir scripts
mkdir plugins
mkdir logs
mkdir scripts/utils
cp ../../api/chalicelib/utils/pg_client.py scripts/utils
cp ../api/chalicelib/utils/ch_client.py scripts/utils
echo 'Building containers...'
docker-compose up airflow-init
echo 'Running containers...'
docker-compose up

View file

@ -0,0 +1,161 @@
from utils.ch_client import ClickHouseClient
from utils.pg_client import PostgresClient
def get_features_clickhouse(**kwargs):
"""Gets features from ClickHouse database"""
if 'limit' in kwargs:
limit = kwargs['limit']
else:
limit = 500
query = f"""SELECT session_id, project_id, user_id, events_count, errors_count, duration, country, issue_score, device_type, rage, jsexception, badrequest FROM (
SELECT session_id, project_id, user_id, events_count, errors_count, duration, toInt8(user_country) as country, issue_score, toInt8(user_device_type) as device_type FROM experimental.sessions WHERE user_id IS NOT NULL) as T1
INNER JOIN (SELECT session_id, project_id, sum(issue_type = 'click_rage') as rage, sum(issue_type = 'js_exception') as jsexception, sum(issue_type = 'bad_request') as badrequest FROM experimental.events WHERE event_type = 'ISSUE' AND session_id > 0 GROUP BY session_id, project_id LIMIT {limit}) as T2
ON T1.session_id = T2.session_id AND T1.project_id = T2.project_id;"""
with ClickHouseClient() as conn:
res = conn.execute(query)
return res
def get_features_postgres(**kwargs):
with PostgresClient() as conn:
funnels = query_funnels(conn, **kwargs)
metrics = query_metrics(conn, **kwargs)
filters = query_with_filters(conn, **kwargs)
#clean_filters(funnels)
#clean_filters(filters)
return clean_filters_split(funnels, isfunnel=True), metrics, clean_filters_split(filters)
def query_funnels(conn, **kwargs):
"""Gets Funnels (PG database)"""
# If public.funnel is empty
funnels_query = f"""SELECT project_id, user_id, filter FROM (SELECT project_id, user_id, metric_id FROM public.metrics WHERE metric_type='funnel'
) as T1 LEFT JOIN (SELECT filter, metric_id FROM public.metric_series) as T2 ON T1.metric_id = T2.metric_id"""
# Else
# funnels_query = "SELECT project_id, user_id, filter FROM public.funnels"
conn.execute(funnels_query)
res = conn.fetchall()
return res
def query_metrics(conn, **kwargs):
"""Gets Metrics (PG_database)"""
metrics_query = """SELECT metric_type, metric_of, metric_value, metric_format FROM public.metrics"""
conn.execute(metrics_query)
res = conn.fetchall()
return res
def query_with_filters(conn, **kwargs):
"""Gets Metrics with filters (PG database)"""
filters_query = """SELECT T1.metric_id as metric_id, project_id, name, metric_type, metric_of, filter FROM (
SELECT metric_id, project_id, name, metric_type, metric_of FROM metrics) as T1 INNER JOIN
(SELECT metric_id, filter FROM metric_series WHERE filter != '{}') as T2 ON T1.metric_id = T2.metric_id"""
conn.execute(filters_query)
res = conn.fetchall()
return res
def transform_funnel(project_id, user_id, data):
res = list()
for k in range(len(data)):
_tmp = data[k]
if _tmp['project_id'] != project_id or _tmp['user_id'] != user_id:
continue
else:
_tmp = _tmp['filter']['events']
res.append(_tmp)
return res
def transform_with_filter(data, **kwargs):
res = list()
for k in range(len(data)):
_tmp = data[k]
jump = False
for _key in kwargs.keys():
if _tmp[_key] != kwargs[_key]:
jump = True
break
if jump:
continue
_type = _tmp['metric_type']
if _type == 'funnel':
res.append(['funnel', _tmp['filter']['events']])
elif _type == 'timeseries':
res.append(['timeseries', _tmp['filter']['filters'], _tmp['filter']['events']])
elif _type == 'table':
res.append(['table', _tmp['metric_of'], _tmp['filter']['events']])
return res
def transform(element):
key_ = element.pop('user_id')
secondary_key_ = element.pop('session_id')
context_ = element.pop('project_id')
features_ = element
del element
return {(key_, context_): {secondary_key_: list(features_.values())}}
def get_by_project(data, project_id):
head_ = [list(d.keys())[0][1] for d in data]
index_ = [k for k in range(len(head_)) if head_[k] == project_id]
return [data[k] for k in index_]
def get_by_user(data, user_id):
head_ = [list(d.keys())[0][0] for d in data]
index_ = [k for k in range(len(head_)) if head_[k] == user_id]
return [data[k] for k in index_]
def clean_filters(data):
for j in range(len(data)):
_filter = data[j]['filter']
_tmp = list()
for i in range(len(_filter['filters'])):
if 'value' in _filter['filters'][i].keys():
_tmp.append({'type': _filter['filters'][i]['type'],
'value': _filter['filters'][i]['value'],
'operator': _filter['filters'][i]['operator']})
data[j]['filter'] = _tmp
def clean_filters_split(data, isfunnel=False):
_data = list()
for j in range(len(data)):
_filter = data[j]['filter']
_tmp = list()
for i in range(len(_filter['filters'])):
if 'value' in _filter['filters'][i].keys():
_type = _filter['filters'][i]['type']
_value = _filter['filters'][i]['value']
if isinstance(_value, str):
_value = [_value]
_operator = _filter['filters'][i]['operator']
if isfunnel:
_data.append({'project_id': data[j]['project_id'], 'user_id': data[j]['user_id'],
'type': _type,
'value': _value,
'operator': _operator
})
else:
_data.append({'metric_id': data[j]['metric_id'], 'project_id': data[j]['project_id'],
'name': data[j]['name'], 'metric_type': data[j]['metric_type'],
'metric_of': data[j]['metric_of'],
'type': _type,
'value': _value,
'operator': _operator
})
return _data
def test():
print('One test')
if __name__ == '__main__':
print('Just a test')
#data = get_features_clickhouse()
#print('Data length:', len(data))
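A tiny worked example of clean_filters_split on a hand-written funnel row; the row shape mirrors what query_funnels returns, the values are made up, and the module path is assumed from processing.py.

from core.features import clean_filters_split

row = {"project_id": 1, "user_id": 7,
       "filter": {"filters": [{"type": "userCountry", "value": "FR", "operator": "is"},
                              {"type": "duration", "operator": ">"}]}}   # second entry has no 'value' -> skipped
print(clean_filters_split([row], isfunnel=True))
# [{'project_id': 1, 'user_id': 7, 'type': 'userCountry', 'value': ['FR'], 'operator': 'is'}]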

View file

@ -0,0 +1,15 @@
from sklearn.svm import SVC
class SVM_recommendation():
def __init__(self, **params):
"""SVM recommendation stub; keyword parameters are passed straight to sklearn.svm.SVC."""
self.svm = SVC(**params)
def fit(self, X1=None, X2=None):
assert X1 is not None or X2 is not None, 'X1 or X2 must be given'
self.svm.fit(X1)
self.svm.fit(X2)
def predict(self, X):
return self.svm.predict(X)

View file

@ -0,0 +1,60 @@
import mlflow
##
import numpy as np
import pickle
from sklearn import datasets, linear_model
from sklearn.metrics import mean_squared_error, r2_score
# source: https://scikit-learn.org/stable/auto_examples/linear_model/plot_ols.html
# Load the diabetes dataset
diabetes_X, diabetes_y = datasets.load_diabetes(return_X_y=True)
# Use only one feature
diabetes_X = diabetes_X[:, np.newaxis, 2]
# Split the data into training/testing sets
diabetes_X_train = diabetes_X[:-20]
diabetes_X_test = diabetes_X[-20:]
# Split the targets into training/testing sets
diabetes_y_train = diabetes_y[:-20]
diabetes_y_test = diabetes_y[-20:]
def print_predictions(m, y_pred):
# The coefficients
print('Coefficients: \n', m.coef_)
# The mean squared error
print('Mean squared error: %.2f'
% mean_squared_error(diabetes_y_test, y_pred))
# The coefficient of determination: 1 is perfect prediction
print('Coefficient of determination: %.2f'
% r2_score(diabetes_y_test, y_pred))
# Create linear regression object
lr_model = linear_model.LinearRegression()
# Train the model using the training sets
lr_model.fit(diabetes_X_train, diabetes_y_train)
# Make predictions using the testing set
diabetes_y_pred = lr_model.predict(diabetes_X_test)
print_predictions(lr_model, diabetes_y_pred)
# save the model in the native sklearn format
filename = 'lr_model.pkl'
pickle.dump(lr_model, open(filename, 'wb'))
##
# load the model into memory
loaded_model = pickle.load(open(filename, 'rb'))
# log and register the model using MLflow scikit-learn API
mlflow.set_tracking_uri("postgresql+psycopg2://airflow:airflow@postgres/mlruns")
reg_model_name = "SklearnLinearRegression"
print("--")
mlflow.sklearn.log_model(loaded_model, "sk_learn",
serialization_format="cloudpickle",
registered_model_name=reg_model_name)
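As a hedged counterpart, loading the registered model back from the same registry; this assumes the script above produced version 1 of "SklearnLinearRegression".

import mlflow
import mlflow.sklearn

mlflow.set_tracking_uri("postgresql+psycopg2://airflow:airflow@postgres/mlruns")
restored = mlflow.sklearn.load_model("models:/SklearnLinearRegression/1")   # first registered version
print(restored.coef_)   # same coefficients as the pickled lr_model above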

View file

@ -0,0 +1,42 @@
import time
import argparse
from core import features
from utils import pg_client
import multiprocessing as mp
from decouple import config
import asyncio
import pandas
def features_ch(q):
q.put(features.get_features_clickhouse())
def features_pg(q):
q.put(features.get_features_postgres())
def get_features():
#mp.set_start_method('spawn')
#q = mp.Queue()
#p1 = mp.Process(target=features_ch, args=(q,))
#p1.start()
pg_features = features.get_features_postgres()
ch_features = []#p1.join()
return [pg_features, ch_features]
parser = argparse.ArgumentParser(description='Gets and process data from Postgres and ClickHouse.')
parser.add_argument('--batch_size', type=int, required=True, help='--batch_size max size of columns per file to be saved in opt/airflow/cache')
args = parser.parse_args()
if __name__ == '__main__':
asyncio.run(pg_client.init())
print(args)
t1 = time.time()
data = get_features()
#print(data)
cache_dir = config("data_dir", default=f"/opt/airflow/cache")
for d in data[0]:
pandas.DataFrame(d).to_csv(f'{cache_dir}/tmp-{hash(time.time())}', sep=',')
t2 = time.time()
print(f'DONE! information retrieved in {t2-t1: .2f} seconds')

View file

@ -0,0 +1,41 @@
import time
import argparse
from decouple import config
from core import recommendation_model
import pandas
import json
import os
def transform_dict_string(s_dicts):
data = list()
for s_dict in s_dicts:
data.append(json.loads(s_dict.replace("'", '"').replace('None','null').replace('False','false')))
return data
def process_file(file_name):
return pandas.read_csv(file_name, sep=",")
def read_batches():
base_dir = config('dir_path', default='/opt/airflow/cache')
files = os.listdir(base_dir)
for file in files:
yield process_file(f'{base_dir}/{file}')
parser = argparse.ArgumentParser(description='Handle machine learning inputs.')
parser.add_argument('--mode', choices=['train', 'test'], required=True, help='--mode sets the model in train or test mode')
parser.add_argument('--kernel', default='linear', help='--kernel set the kernel to be used for SVM')
args = parser.parse_args()
if __name__ == '__main__':
print(args)
t1 = time.time()
buff = read_batches()
for b in buff:
print(b.head())
t2 = time.time()
print(f'DONE! information retrieved in {t2-t1: .2f} seconds')

View file

@ -0,0 +1,54 @@
import logging
import clickhouse_driver
from decouple import config
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
settings = {}
if config('ch_timeout', cast=int, default=-1) > 0:
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
settings = {**settings, "max_execution_time": config('ch_timeout', cast=int)}
if config('ch_receive_timeout', cast=int, default=-1) > 0:
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}
class ClickHouseClient:
__client = None
def __init__(self):
self.__client = clickhouse_driver.Client(host=config("ch_host"),
database="default",
port=config("ch_port", cast=int),
settings=settings) \
if self.__client is None else self.__client
def __enter__(self):
return self
def execute(self, query, params=None, **args):
try:
results = self.__client.execute(query=query, params=params, with_column_types=True, **args)
keys = tuple(x for x, y in results[1])
return [dict(zip(keys, i)) for i in results[0]]
except Exception as err:
logging.error("--------- CH QUERY EXCEPTION -----------")
logging.error(self.format(query=query, params=params))
logging.error("--------------------")
raise err
def insert(self, query, params=None, **args):
return self.__client.execute(query=query, params=params, **args)
def client(self):
return self.__client
def format(self, query, params):
if params is None:
return query
return self.__client.substitute_params(query, params, self.__client.connection.context)
def __exit__(self, *args):
pass
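Minimal usage sketch of the client above, assuming ch_host and ch_port are set for python-decouple; the query itself is only illustrative.

from utils.ch_client import ClickHouseClient

with ClickHouseClient() as ch:
    rows = ch.execute("SELECT count() AS n FROM experimental.sessions WHERE project_id = %(pid)s",
                      params={"pid": 1})
    print(rows)   # execute() zips column names with values, e.g. [{'n': 42}]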

View file

@ -0,0 +1,166 @@
import logging
import time
from threading import Semaphore
import psycopg2
import psycopg2.extras
from decouple import config
from psycopg2 import pool
logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO))
logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO))
_PG_CONFIG = {"host": config("pg_host"),
"database": config("pg_dbname"),
"user": config("pg_user"),
"password": config("pg_password"),
"port": config("pg_port", cast=int),
"application_name": config("APP_NAME", default="PY")}
PG_CONFIG = dict(_PG_CONFIG)
if config("PG_TIMEOUT", cast=int, default=0) > 0:
PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}"
class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
self._semaphore = Semaphore(maxconn)
super().__init__(minconn, maxconn, *args, **kwargs)
def getconn(self, *args, **kwargs):
self._semaphore.acquire()
try:
return super().getconn(*args, **kwargs)
except psycopg2.pool.PoolError as e:
if str(e) == "connection pool is closed":
make_pool()
raise e
def putconn(self, *args, **kwargs):
try:
super().putconn(*args, **kwargs)
self._semaphore.release()
except psycopg2.pool.PoolError as e:
if str(e) == "trying to put unkeyed connection":
print("!!! trying to put unkeyed connection")
print(f"env-PG_POOL:{config('PG_POOL', default=None)}")
return
raise e
postgreSQL_pool: ORThreadedConnectionPool = None
RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50)
RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2)
RETRY = 0
def make_pool():
if not config('PG_POOL', cast=bool, default=True):
return
global postgreSQL_pool
global RETRY
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)
try:
postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20),
config("PG_MAXCONN", cast=int, default=80),
**PG_CONFIG)
if (postgreSQL_pool):
logging.info("Connection pool created successfully")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while connecting to PostgreSQL", error)
if RETRY < RETRY_MAX:
RETRY += 1
logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}")
time.sleep(RETRY_INTERVAL)
make_pool()
else:
raise error
class PostgresClient:
connection = None
cursor = None
long_query = False
unlimited_query = False
def __init__(self, long_query=False, unlimited_query=False):
self.long_query = long_query
self.unlimited_query = unlimited_query
if unlimited_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-UNLIMITED"
self.connection = psycopg2.connect(**long_config)
elif long_query:
long_config = dict(_PG_CONFIG)
long_config["application_name"] += "-LONG"
long_config["options"] = f"-c statement_timeout=" \
f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}"
self.connection = psycopg2.connect(**long_config)
elif not config('PG_POOL', cast=bool, default=True):
single_config = dict(_PG_CONFIG)
single_config["application_name"] += "-NOPOOL"
single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}"
self.connection = psycopg2.connect(**single_config)
else:
self.connection = postgreSQL_pool.getconn()
def __enter__(self):
if self.cursor is None:
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
self.cursor.recreate = self.recreate_cursor
return self.cursor
def __exit__(self, *args):
try:
self.connection.commit()
self.cursor.close()
if self.long_query or self.unlimited_query:
self.connection.close()
except Exception as error:
logging.error("Error while committing/closing PG-connection", error)
if str(error) == "connection already closed" \
and not self.long_query \
and not self.unlimited_query \
and config('PG_POOL', cast=bool, default=True):
logging.info("Recreating the connexion pool")
make_pool()
else:
raise error
finally:
if config('PG_POOL', cast=bool, default=True) \
and not self.long_query \
and not self.unlimited_query:
postgreSQL_pool.putconn(self.connection)
def recreate_cursor(self, rollback=False):
if rollback:
try:
self.connection.rollback()
except Exception as error:
logging.error("Error while rollbacking connection for recreation", error)
try:
self.cursor.close()
except Exception as error:
logging.error("Error while closing cursor for recreation", error)
self.cursor = None
return self.__enter__()
async def init():
logging.info(f">PG_POOL:{config('PG_POOL', default=None)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()
async def terminate():
global postgreSQL_pool
if postgreSQL_pool is not None:
try:
postgreSQL_pool.closeall()
logging.info("Closed all connexions to PostgreSQL")
except (Exception, psycopg2.DatabaseError) as error:
logging.error("Error while closing all connexions to PostgreSQL", error)

View file

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS frontend_signals
(
project_id bigint NOT NULL,
user_id text NOT NULL,
timestamp bigint NOT NULL,
action text NOT NULL,
source text NOT NULL,
category text NOT NULL,
data json
);
CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);

View file

@ -0,0 +1,20 @@
BEGIN;
CREATE OR REPLACE FUNCTION openreplay_version()
RETURNS text AS
$$
SELECT 'v1.10.0-ee'
$$ LANGUAGE sql IMMUTABLE;
CREATE TABLE IF NOT EXISTS frontend_signals
(
project_id bigint NOT NULL,
user_id integer NOT NULL references users (user_id) ON DELETE CASCADE,
timestamp bigint NOT NULL,
action text NOT NULL,
source text NOT NULL,
category text NOT NULL,
data jsonb
);
CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);
COMMIT;

View file

@ -646,6 +646,19 @@ $$
CREATE INDEX IF NOT EXISTS user_favorite_sessions_user_id_session_id_idx ON user_favorite_sessions (user_id, session_id);
CREATE TABLE IF NOT EXISTS frontend_signals
(
project_id bigint NOT NULL,
user_id text NOT NULL,
timestamp bigint NOT NULL,
action text NOT NULL,
source text NOT NULL,
category text NOT NULL,
data json
);
CREATE INDEX IF NOT EXISTS frontend_signals_user_id_idx ON frontend_signals (user_id);
CREATE TABLE IF NOT EXISTS assigned_sessions
(
session_id bigint NOT NULL REFERENCES sessions (session_id) ON DELETE CASCADE,
@ -1482,4 +1495,4 @@ ON CONFLICT (predefined_key) DO UPDATE
metric_type=excluded.metric_type,
view_type=excluded.view_type;
COMMIT;
COMMIT;