feat(chalice): refactored insights
This commit is contained in:
parent
523dfccfe7
commit
31d35723ec
3 changed files with 81 additions and 52 deletions
|
|
@ -1,13 +1,14 @@
|
|||
import schemas_ee
|
||||
from chalicelib.utils import ch_client
|
||||
from datetime import datetime, timedelta
|
||||
from chalicelib.utils.TimeUTC import TimeUTC
|
||||
|
||||
|
||||
def _table_slice(table, index):
|
||||
col = list()
|
||||
for row in table:
|
||||
col.append(row[index])
|
||||
return col
|
||||
|
||||
|
||||
def _table_where(table, index, value):
|
||||
new_table = list()
|
||||
for row in table:
|
||||
|
|
@ -15,6 +16,7 @@ def _table_where(table, index, value):
|
|||
new_table.append(row)
|
||||
return new_table
|
||||
|
||||
|
||||
def _sum_table_index(table, index):
|
||||
s = 0
|
||||
count = 0
|
||||
|
|
@ -26,13 +28,16 @@ def _sum_table_index(table, index):
|
|||
count += 1
|
||||
return s
|
||||
|
||||
|
||||
def _mean_table_index(table, index):
|
||||
s = _sum_table_index(table, index)
|
||||
c = len(table)
|
||||
return s/c
|
||||
return s / c
|
||||
|
||||
|
||||
def _sort_table_index(table, index, reverse=False):
|
||||
return sorted(table, key= lambda k: k[index], reverse=reverse)
|
||||
return sorted(table, key=lambda k: k[index], reverse=reverse)
|
||||
|
||||
|
||||
def _select_rec(l, selector):
|
||||
print('selector:', selector)
|
||||
|
|
@ -49,7 +54,7 @@ def _select_rec(l, selector):
|
|||
return [_select_rec(L, selector[1:])]
|
||||
|
||||
|
||||
#TODO Deal with None values
|
||||
# TODO Deal with None values
|
||||
|
||||
def __get_two_values(response, time_index='hh', name_index='name'):
|
||||
columns = list(response[0].keys())
|
||||
|
|
@ -84,16 +89,16 @@ def __handle_timestep(time_step):
|
|||
if time_step == 'hour':
|
||||
return f"toStartOfHour({base})", 3600
|
||||
elif time_step == 'day':
|
||||
return f"toStartOfDay({base})", 24*3600
|
||||
return f"toStartOfDay({base})", 24 * 3600
|
||||
elif time_step == 'week':
|
||||
return f"toStartOfWeek({base})", 7*24*3600
|
||||
return f"toStartOfWeek({base})", 7 * 24 * 3600
|
||||
else:
|
||||
assert type(time_step) == int, "time_step must be {'hour', 'day', 'week'} or an integer representing the time step in minutes"
|
||||
return f"toStartOfInterval({base}, INTERVAL {time_step} minute)", int(time_step)*60
|
||||
assert type(
|
||||
time_step) == int, "time_step must be {'hour', 'day', 'week'} or an integer representing the time step in minutes"
|
||||
return f"toStartOfInterval({base}, INTERVAL {time_step} minute)", int(time_step) * 60
|
||||
|
||||
|
||||
def query_requests_by_period(project_id, start_time=(datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d'),
|
||||
end_time=datetime.now().strftime('%Y-%m-%d'), time_step=3600, conn=None):
|
||||
def query_requests_by_period(project_id, start_time, end_time, time_step, conn=None):
|
||||
function, steps = __handle_timestep(time_step)
|
||||
query = f"""WITH
|
||||
{function.format(f"toDateTime64('{start_time}', 0)")} as start,
|
||||
|
|
@ -106,7 +111,8 @@ SELECT T1.hh, count(T2.session_id) as sessions, avg(T2.success) as success_rate,
|
|||
res = conn.execute(query=query)
|
||||
else:
|
||||
res = conn.execute(query=query)
|
||||
table_hh1, table_hh2, columns, this_period_hosts, last_period_hosts = __get_two_values(res, time_index='hh', name_index='source')
|
||||
table_hh1, table_hh2, columns, this_period_hosts, last_period_hosts = __get_two_values(res, time_index='hh',
|
||||
name_index='source')
|
||||
del res
|
||||
|
||||
new_hosts = [x for x in this_period_hosts if x not in last_period_hosts]
|
||||
|
|
@ -132,12 +138,13 @@ SELECT T1.hh, count(T2.session_id) as sessions, avg(T2.success) as success_rate,
|
|||
# d1_tmp = d1_tmp[d1_tmp[:, success_idx].argsort()]
|
||||
return {'ratio': list(zip(_table_slice(d1_tmp, source_idx), _table_slice(d1_tmp, success_idx))),
|
||||
'increase': sorted(delta_success.items(), key=lambda k: k[1], reverse=False),
|
||||
'new_events': new_hosts}
|
||||
'newEvents': new_hosts}
|
||||
|
||||
|
||||
def query_most_errors_by_period(project_id, start_time=(datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d'),
|
||||
end_time=datetime.now().strftime('%Y-%m-%d'), time_step=3600, conn=None):
|
||||
def query_most_errors_by_period(project_id, start_time, end_time, time_step, conn=None):
|
||||
function, steps = __handle_timestep(time_step)
|
||||
print(function)
|
||||
print(steps)
|
||||
query = f"""WITH {function.format(f"toDateTime64('{start_time}', 0)")} as start,
|
||||
{function.format(f"toDateTime64('{end_time}', 0)")} as end
|
||||
SELECT T1.hh, count(T2.session_id) as sessions, T2.name as names,
|
||||
|
|
@ -148,6 +155,9 @@ def query_most_errors_by_period(project_id, start_time=(datetime.now()-timedelta
|
|||
WHERE project_id = {project_id} AND event_type = 'ERROR') AS T2 ON T2.dtime = T1.hh
|
||||
GROUP BY T1.hh, T2.name
|
||||
ORDER BY T1.hh DESC;"""
|
||||
# print("----------------------------------")
|
||||
# print(query)
|
||||
# print("----------------------------------")
|
||||
if conn is None:
|
||||
with ch_client.ClickHouseClient() as conn:
|
||||
res = conn.execute(query=query)
|
||||
|
|
@ -155,7 +165,7 @@ def query_most_errors_by_period(project_id, start_time=(datetime.now()-timedelta
|
|||
res = conn.execute(query=query)
|
||||
|
||||
table_hh1, table_hh2, columns, this_period_errors, last_period_errors = __get_two_values(res, time_index='hh',
|
||||
name_index='names')
|
||||
name_index='names')
|
||||
del res
|
||||
|
||||
new_errors = [x for x in this_period_errors if x not in last_period_errors]
|
||||
|
|
@ -168,18 +178,18 @@ def query_most_errors_by_period(project_id, start_time=(datetime.now()-timedelta
|
|||
# total = table_hh1[:, sessions_idx].sum()
|
||||
error_increase = dict()
|
||||
for n in this_period_errors:
|
||||
percentage_errors[n] = _sum_table_index(_table_where(table_hh1, names_idx, n), sessions_idx)/total
|
||||
percentage_errors[n] = _sum_table_index(_table_where(table_hh1, names_idx, n), sessions_idx) / total
|
||||
# percentage_errors[n] = (table_hh1[table_hh1[:, names_idx] == n][:, sessions_idx].sum())/total
|
||||
for n in common_errors:
|
||||
error_increase[n] = _sum_table_index(_table_where(table_hh1, names_idx, n), names_idx) - _sum_table_index(_table_where(table_hh2, names_idx, n), names_idx)
|
||||
error_increase[n] = _sum_table_index(_table_where(table_hh1, names_idx, n), names_idx) - _sum_table_index(
|
||||
_table_where(table_hh2, names_idx, n), names_idx)
|
||||
# error_increase[n] = table_hh1[table_hh1[:, names_idx] == n][:, names_idx].sum() - table_hh2[table_hh2[:, names_idx] == n][:, names_idx].sum()
|
||||
return {'ratio': sorted(percentage_errors.items(), key=lambda k: k[1], reverse=True),
|
||||
'increase': sorted(error_increase.items(), key=lambda k: k[1], reverse=True),
|
||||
'new_events': new_errors}
|
||||
'newEvents': new_errors}
|
||||
|
||||
|
||||
def query_cpu_memory_by_period(project_id, start_time=(datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d'),
|
||||
end_time=datetime.now().strftime('%Y-%m-%d'), time_step=3600, conn=None):
|
||||
def query_cpu_memory_by_period(project_id, start_time, end_time, time_step, conn=None):
|
||||
function, steps = __handle_timestep(time_step)
|
||||
query = f"""WITH
|
||||
{function.format(f"toDateTime64('{start_time}', 0)")} as start,
|
||||
|
|
@ -192,7 +202,7 @@ SELECT T1.hh, count(T2.session_id) as sessions, avg(T2.avg_cpu) as cpu_used, avg
|
|||
else:
|
||||
res = conn.execute(query=query)
|
||||
table_hh1, table_hh2, columns, this_period_resources, last_period_resources = __get_two_values(res, time_index='hh',
|
||||
name_index='names')
|
||||
name_index='names')
|
||||
del res
|
||||
|
||||
memory_idx = columns.index('memory_used')
|
||||
|
|
@ -201,11 +211,10 @@ SELECT T1.hh, count(T2.session_id) as sessions, avg(T2.avg_cpu) as cpu_used, avg
|
|||
_tmp = _mean_table_index(table_hh2, memory_idx)
|
||||
# _tmp = table_hh2[:, memory_idx].mean()
|
||||
return {'cpu_increase': _mean_table_index(table_hh1, cpu_idx) - _mean_table_index(table_hh2, cpu_idx),
|
||||
'memory_increase': (_mean_table_index(table_hh1, memory_idx) - _tmp)/_tmp}
|
||||
'memory_increase': (_mean_table_index(table_hh1, memory_idx) - _tmp) / _tmp}
|
||||
|
||||
|
||||
def query_click_rage_by_period(project_id, start_time=(datetime.now()-timedelta(days=1)).strftime('%Y-%m-%d'),
|
||||
end_time=datetime.now().strftime('%Y-%m-%d'), time_step=3600, conn=None):
|
||||
def query_click_rage_by_period(project_id, start_time, end_time, time_step, conn=None):
|
||||
function, steps = __handle_timestep(time_step)
|
||||
click_rage_condition = "issue_type = 'click_rage'"
|
||||
query = f"""WITH
|
||||
|
|
@ -214,13 +223,13 @@ def query_click_rage_by_period(project_id, start_time=(datetime.now()-timedelta(
|
|||
SELECT T1.hh, count(T2.session_id) as sessions, T2.url_host as names, groupUniqArray(T2.url_path) as sources FROM (SELECT arrayJoin(arrayMap(x -> toDateTime(x), range(toUInt32(start), toUInt32(end), {steps}))) as hh) AS T1
|
||||
LEFT JOIN (SELECT session_id, url_host, url_path, {function.format('datetime')} as dtime FROM experimental.events WHERE project_id = {project_id} AND event_type = 'ISSUE' AND {click_rage_condition}) AS T2 ON T2.dtime = T1.hh GROUP BY T1.hh, T2.url_host ORDER BY T1.hh DESC;"""
|
||||
if conn is None:
|
||||
with ch_client.ClickHouseClient(database='experimental') as conn:
|
||||
with ch_client.ClickHouseClient() as conn:
|
||||
res = conn.execute(query=query)
|
||||
else:
|
||||
res = conn.execute(query=query)
|
||||
|
||||
table_hh1, table_hh2, columns, this_period_rage, last_period_rage = __get_two_values(res, time_index='hh',
|
||||
name_index='names')
|
||||
name_index='names')
|
||||
del res
|
||||
|
||||
new_names = [x for x in this_period_rage if x not in last_period_rage]
|
||||
|
|
@ -236,33 +245,46 @@ def query_click_rage_by_period(project_id, start_time=(datetime.now()-timedelta(
|
|||
continue
|
||||
_tmp = _sum_table_index(_table_where(table_hh2, names_idx, n), sessions_idx)
|
||||
# _tmp = table_hh2[:, sessions_idx][n].sum()
|
||||
raged_increment[n] = (_sum_table_index(_table_where(table_hh1, names_idx, n), sessions_idx)-_tmp)/_tmp
|
||||
raged_increment[n] = (_sum_table_index(_table_where(table_hh1, names_idx, n), sessions_idx) - _tmp) / _tmp
|
||||
# raged_increment[n] = (table_hh1[:, sessions_idx][n].sum()-_tmp)/_tmp
|
||||
|
||||
total = _sum_table_index(table_hh1, sessions_idx)
|
||||
# total = table_hh1[:, sessions_idx].sum()
|
||||
return {'ratio': list(zip(_table_slice(table_hh1, names_idx), map(lambda k: k/total, _table_slice(table_hh1, sessions_idx)))),
|
||||
'increase': sorted(raged_increment.items(), key=lambda k: k[1], reverse=True),
|
||||
'new_events': new_names,
|
||||
}
|
||||
return {'ratio': list(
|
||||
zip(_table_slice(table_hh1, names_idx), map(lambda k: k / total, _table_slice(table_hh1, sessions_idx)))),
|
||||
'increase': sorted(raged_increment.items(), key=lambda k: k[1], reverse=True),
|
||||
'newEvents': new_names,
|
||||
}
|
||||
|
||||
|
||||
def fetch_selected(project_id, data: schemas_ee.GetInsightsPayloadSchema, time_step=3600, ):
|
||||
# NO need for assertion here, you should validate it in the schema definition
|
||||
# assert len(selectedEvents) > 0, """'list of selected events must be non empty. Available events are 'errors', 'network', 'rage' and 'resources''"""
|
||||
def fetch_selected(project_id, data: schemas_ee.GetInsightsPayloadSchema):
|
||||
output = {}
|
||||
with ch_client.ClickHouseClient() as conn:
|
||||
if 'errors' in data.selected_events:
|
||||
output['errors'] = query_most_errors_by_period(project_id, data.startDate, end_time, time_step, conn=conn)
|
||||
if 'network' in data.selected_events:
|
||||
output['network'] = query_requests_by_period(project_id, start_time, end_time, time_step, conn=conn)
|
||||
if 'rage' in data.selected_events:
|
||||
output['rage'] = query_click_rage_by_period(project_id, start_time, end_time, time_step, conn=conn)
|
||||
if 'resources' in data.selected_events:
|
||||
output['resources'] = query_cpu_memory_by_period(project_id, start_time, end_time, time_step, conn=conn)
|
||||
if schemas_ee.InsightEvents.errors in data.selected_events:
|
||||
output[schemas_ee.InsightEvents.errors] = query_most_errors_by_period(project_id=project_id,
|
||||
start_time=data.startTimestamp,
|
||||
end_time=data.endTimestamp,
|
||||
time_step=data.time_step,
|
||||
conn=conn)
|
||||
if schemas_ee.InsightEvents.network in data.selected_events:
|
||||
output[schemas_ee.InsightEvents.network] = query_requests_by_period(project_id=project_id,
|
||||
start_time=data.startTimestamp,
|
||||
end_time=data.endTimestamp,
|
||||
time_step=data.time_step,
|
||||
conn=conn)
|
||||
if schemas_ee.InsightEvents.rage in data.selected_events:
|
||||
output[schemas_ee.InsightEvents.rage] = query_click_rage_by_period(project_id=project_id,
|
||||
start_time=data.startTimestamp,
|
||||
end_time=data.endTimestamp,
|
||||
time_step=data.time_step, conn=conn)
|
||||
if schemas_ee.InsightEvents.resources in data.selected_events:
|
||||
output[schemas_ee.InsightEvents.resources] = query_cpu_memory_by_period(project_id=project_id,
|
||||
start_time=data.startTimestamp,
|
||||
end_time=data.endTimestamp,
|
||||
time_step=data.time_step,
|
||||
conn=conn)
|
||||
return output
|
||||
|
||||
|
||||
# if __name__ == '__main__':
|
||||
# # configs
|
||||
# start = '2022-04-19'
|
||||
|
|
|
|||
|
|
@ -132,5 +132,6 @@ def send_interactions(projectId: int, data: schemas_ee.SignalsSchema = Body(...)
|
|||
def sessions_search(projectId: int, data: schemas_ee.GetInsightsPayloadSchema = Body(...),
|
||||
context: schemas_ee.CurrentContext = Depends(OR_context)):
|
||||
return {'data': sessions_insights.fetch_selected(data=data, project_id=projectId,
|
||||
start_time=data.startDate,
|
||||
end_time=data.endDate, time_step=data.timestep)}
|
||||
# start_time=data.startDate,
|
||||
# end_time=data.endDate
|
||||
)}
|
||||
|
|
|
|||
|
|
@ -39,17 +39,23 @@ class SignalsSchema(BaseModel):
|
|||
data: dict = Field(default={})
|
||||
|
||||
|
||||
class InsightEvents(str, Enum):
|
||||
errors = "errors"
|
||||
network = "network"
|
||||
rage = "rage"
|
||||
resources = "resources"
|
||||
|
||||
|
||||
class GetInsightsPayloadSchema(BaseModel):
|
||||
startDate: int = Field(TimeUTC.now(delta_days=-1))
|
||||
endDate: int = Field(TimeUTC.now())
|
||||
timestep: str = Field(...)
|
||||
# list of selected events must be non empty. Available events are 'errors', 'network', 'rage' and 'resources'
|
||||
selected_events: List[str] = Field(..., min_items=1)
|
||||
startTimestamp: int = Field(TimeUTC.now(-7))
|
||||
endTimestamp: int = Field(TimeUTC.now())
|
||||
time_step: int = Field(default=3600)
|
||||
selected_events: List[InsightEvents] = Field(..., min_items=1)
|
||||
|
||||
class Config:
|
||||
alias_generator = schemas.attribute_to_camel_case
|
||||
|
||||
|
||||
|
||||
class CreateMemberSchema(schemas.CreateMemberSchema):
|
||||
roleId: Optional[int] = Field(None)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue