feat(api): assist stats (#1504)

* feat(api): assist stats - api - swagger

* feat(api): assist stats - api - swagger

* feat(api): assist stats - api - swagger

* feat(api): assist stats - sort options and date updates

* feat(api): assist stats - sort options and date updates

* feat(api): api assist stats

* feat(api): api assist stats - schema validation and sort options

* feat(api): assist stats - top members updated

* feat(api): assist stats - top members updated and avg

* feat(api): assist stats - averages

* feat(api): assist stats - averages

* feat(api): assist stats - unique team members

* feat(api): assist stats - aggregate assist stats periodically

* feat(api): assist stats - sql
Shekar Siri 2023-10-09 10:55:02 +02:00 committed by GitHub
parent 54286f906d
commit 860cb93b3d
5 changed files with 553 additions and 6 deletions

View file

@ -0,0 +1,386 @@
import logging
from datetime import datetime
from chalicelib.utils import pg_client, helper
from schemas import AssistStatsSessionsRequest, AssistStatsSessionsResponse, AssistStatsTopMembersResponse
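

# Aggregation flow (summary):
#   - insert_aggregated_data() runs periodically (see the cron job added in this commit); it buckets
#     raw assist_events rows into hourly per-project/per-agent rows in assist_events_aggregates.
#   - assist_events_aggregates_logs keeps the timestamp of the last processed event, so each run only
#     aggregates newer data; when no watermark and no events exist yet, the last 7 days are used.
#   - get_averages(), get_top_members() and get_sessions() below serve the /assist-stats endpoints.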
def insert_aggregated_data():
    try:
        logging.info("Assist Stats: Inserting aggregated data")
        end_timestamp = int(datetime.timestamp(datetime.now())) * 1000
        start_timestamp = __last_run_end_timestamp_from_aggregates()

        if start_timestamp is None:  # first run
            logging.info("Assist Stats: First run, inserting data for last 7 days")
            start_timestamp = end_timestamp - (7 * 24 * 60 * 60 * 1000)

        offset = 0
        chunk_size = 1000

        while True:
            constraints = [
                "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s"
            ]
            params = {
                "limit": chunk_size,
                "offset": offset,
                "start_timestamp": start_timestamp + 1,
                "end_timestamp": end_timestamp,
                "step_size": f"{60} seconds",
            }

            logging.info(f"Assist Stats: Fetching data from {start_timestamp} to {end_timestamp}")
            aggregated_data = __get_all_events_hourly_averages(constraints, params)

            if not aggregated_data:  # No more data to insert
                logging.info("Assist Stats: No more data to insert")
                break

            logging.info(f"Assist Stats: Inserting {len(aggregated_data)} rows")
            for data in aggregated_data:
                sql = """
                    INSERT INTO assist_events_aggregates
                    (timestamp, project_id, agent_id, assist_avg, call_avg, control_avg, assist_total, call_total, control_total)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                params = (
                    data['time'],
                    data['project_id'],
                    data['agent_id'],
                    data['assist_avg'],
                    data['call_avg'],
                    data['control_avg'],
                    data['assist_total'],
                    data['call_total'],
                    data['control_total']
                )

                with pg_client.PostgresClient() as cur:
                    cur.execute(sql, params)

            offset += chunk_size

        # get the latest processed event timestamp from assist_events after start_timestamp
        sql = """
            SELECT MAX(timestamp) as first_timestamp
            FROM assist_events
            WHERE timestamp > %(start_timestamp)s AND duration > 0
            GROUP BY timestamp
            ORDER BY timestamp DESC LIMIT 1
        """
        with pg_client.PostgresClient() as cur:
            cur.execute(sql, {"start_timestamp": start_timestamp})
            result = cur.fetchone()
            first_timestamp = result['first_timestamp'] if result else None

        # record it in assist_events_aggregates_logs as the watermark for the next run
        if first_timestamp is not None:
            sql = "INSERT INTO assist_events_aggregates_logs (time) VALUES (%s)"
            params = (first_timestamp,)
            with pg_client.PostgresClient() as cur:
                cur.execute(sql, params)
    except Exception as e:
        logging.error(f"Error inserting aggregated data: {e}")
def __last_run_end_timestamp_from_aggregates():
    sql = "SELECT MAX(time) as last_run_time FROM assist_events_aggregates_logs;"
    with pg_client.PostgresClient() as cur:
        cur.execute(sql)
        result = cur.fetchone()
        last_run_time = result['last_run_time'] if result else None

    if last_run_time is None:  # first run handle all data
        sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
        with pg_client.PostgresClient() as cur:
            cur.execute(sql)
            result = cur.fetchone()
            last_run_time = result['last_timestamp'] if result else None

    return last_run_time
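

# Buckets raw assist_events into hourly rows per project and agent by joining a generate_series
# time series on each event's truncated hour; averages and totals are computed per event_type and
# the caller pages through the result with limit/offset.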
def __get_all_events_hourly_averages(constraints, params):
    sql = f"""
        WITH time_series AS (
            SELECT
                EXTRACT(epoch FROM generate_series(
                    date_trunc('hour', to_timestamp(%(start_timestamp)s/1000)),
                    date_trunc('hour', to_timestamp(%(end_timestamp)s/1000)) + interval '1 hour',
                    interval %(step_size)s
                ))::bigint as unix_time
        )
        SELECT
            time_series.unix_time * 1000 as time,
            project_id,
            agent_id,
            ROUND(AVG(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END)) as assist_avg,
            ROUND(AVG(CASE WHEN event_type = 'call' THEN duration ELSE 0 END)) as call_avg,
            ROUND(AVG(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_avg,
            ROUND(SUM(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END)) as assist_total,
            ROUND(SUM(CASE WHEN event_type = 'call' THEN duration ELSE 0 END)) as call_total,
            ROUND(SUM(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_total
        FROM
            time_series
            LEFT JOIN assist_events ON time_series.unix_time = EXTRACT(epoch FROM DATE_TRUNC('hour', to_timestamp(assist_events.timestamp/1000)))
        WHERE
            {' AND '.join(f'{constraint}' for constraint in constraints)}
        GROUP BY time, project_id, agent_id
        ORDER BY time
        LIMIT %(limit)s OFFSET %(offset)s;
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    return rows
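

# Returns totals/averages for the requested window from the pre-aggregated table, the same totals
# for an equally long window immediately before it ("previousPeriod"), and the per-bucket rows
# used for charting.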
def get_averages(
        project_id: int,
        start_timestamp: int,
        end_timestamp: int,
        user_id: int = None,
):
    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
    ]
    params = {
        "project_id": project_id,
        "limit": 5,
        "offset": 0,
        "start_timestamp": start_timestamp,
        "end_timestamp": end_timestamp,
        "step_size": f"{60} seconds",
    }

    if user_id is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = user_id

    totals = __get_all_events_totals(constraints, params)
    rows = __get_all_events_averages(constraints, params)

    params["start_timestamp"] = start_timestamp - (end_timestamp - start_timestamp)
    params["end_timestamp"] = start_timestamp
    previous_totals = __get_all_events_totals(constraints, params)

    return {
        "currentPeriod": totals[0],
        "previousPeriod": previous_totals[0],
        "list": helper.list_to_camel_case(rows),
    }


def __get_all_events_totals(constraints, params):
    sql = f"""
        SELECT
            ROUND(SUM(assist_total)) as assist_total,
            ROUND(AVG(assist_avg)) as assist_avg,
            ROUND(SUM(call_total)) as call_total,
            ROUND(AVG(call_avg)) as call_avg,
            ROUND(SUM(control_total)) as control_total,
            ROUND(AVG(control_avg)) as control_avg
        FROM assist_events_aggregates
        WHERE {' AND '.join(f'{constraint}' for constraint in constraints)}
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    return helper.list_to_camel_case(rows)


def __get_all_events_averages(constraints, params):
    sql = f"""
        SELECT
            timestamp,
            assist_avg,
            call_avg,
            control_avg,
            assist_total,
            call_total,
            control_total
        FROM assist_events_aggregates
        WHERE
            {' AND '.join(f'{constraint}' for constraint in constraints)}
        ORDER BY timestamp;
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    return rows
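

# NOTE: the function below appears to be an unused minute-granularity variant of
# __get_all_events_hourly_averages, presumably left over from development.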
def __get_all_events_averagesx(constraints, params):
    sql = f"""
        WITH time_series AS (
            SELECT
                EXTRACT(epoch FROM generate_series(
                    date_trunc('minute', to_timestamp(%(start_timestamp)s/1000)),
                    date_trunc('minute', to_timestamp(%(end_timestamp)s/1000)),
                    interval %(step_size)s
                ))::bigint as unix_time
        )
        SELECT
            time_series.unix_time as time,
            project_id,
            ROUND(AVG(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END)) as assist_avg,
            ROUND(AVG(CASE WHEN event_type = 'call' THEN duration ELSE 0 END)) as call_avg,
            ROUND(AVG(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_avg,
            ROUND(SUM(CASE WHEN event_type = 'assist' THEN duration ELSE 0 END)) as assist_total,
            ROUND(SUM(CASE WHEN event_type = 'call' THEN duration ELSE 0 END)) as call_total,
            ROUND(SUM(CASE WHEN event_type = 'control' THEN duration ELSE 0 END)) as control_total
        FROM
            time_series
            LEFT JOIN assist_events ON time_series.unix_time = EXTRACT(epoch FROM DATE_TRUNC('minute', to_timestamp(assist_events.timestamp/1000)))
        WHERE
            {' AND '.join(f'{constraint}' for constraint in constraints)}
        GROUP BY time, project_id
        ORDER BY time;
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    return rows
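

# Ranks agents over the requested window: "sessionsAssisted" counts assist events per agent, while
# the duration options sum event durations; sort_by is validated against event_type_mapping before
# it is interpolated into the query.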
def get_top_members(
        project_id: int,
        start_timestamp: int,
        end_timestamp: int,
        sort_by: str,
        sort_order: str,
        user_id: int = None,
) -> AssistStatsTopMembersResponse:
    event_type_mapping = {
        "sessionsAssisted": "assist",
        "assistDuration": "assist",
        "callDuration": "call",
        "controlDuration": "control"
    }

    event_type = event_type_mapping.get(sort_by)
    if event_type is None:
        raise ValueError("Invalid sortBy option")

    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
        "duration > 0",
        # "event_type = %(event_type)s",
    ]
    params = {
        "project_id": project_id,
        "limit": 5,
        "offset": 0,
        "sort_by": sort_by,
        "sort_order": sort_order.upper(),
        "start_timestamp": start_timestamp,
        "end_timestamp": end_timestamp,
        # "event_type": event_type,
    }

    if user_id is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = user_id

    sql = f"""
        SELECT
            COUNT(1) OVER () AS total,
            ae.agent_id,
            u.name AS name,
            CASE WHEN '{sort_by}' = 'sessionsAssisted'
                THEN SUM(CASE WHEN ae.event_type = 'assist' THEN 1 ELSE 0 END)
                ELSE SUM(CASE WHEN ae.event_type <> 'assist' THEN ae.duration ELSE 0 END)
            END AS count
        FROM assist_events ae
            JOIN users u ON u.user_id = ae.agent_id
        WHERE {' AND '.join(f'ae.{constraint}' for constraint in constraints)}
            AND ae.event_type = '{event_type}'
        GROUP BY ae.agent_id, u.name
        ORDER BY count {params['sort_order']}
        LIMIT %(limit)s OFFSET %(offset)s
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    if len(rows) == 0:
        return AssistStatsTopMembersResponse(total=0, list=[])

    count = rows[0]["total"]
    rows = helper.list_to_camel_case(rows)
    for row in rows:
        row.pop("total")

    return AssistStatsTopMembersResponse(total=count, list=rows)
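

# Lists assisted sessions in the window with per-session call/control/assist durations and the team
# members involved (built with json_agg); sort/order come from the validated request schema and
# pagination is derived from page/limit.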
def get_sessions(
        project_id: int,
        data: AssistStatsSessionsRequest,
) -> AssistStatsSessionsResponse:
    constraints = [
        "project_id = %(project_id)s",
        "timestamp BETWEEN %(start_timestamp)s AND %(end_timestamp)s",
    ]
    params = {
        "project_id": project_id,
        "limit": data.limit,
        "offset": (data.page - 1) * data.limit,
        "sort_by": data.sort,
        "sort_order": data.order.upper(),
        "start_timestamp": data.startTimestamp,
        "end_timestamp": data.endTimestamp,
    }

    if data.userId is not None:
        constraints.append("agent_id = %(agent_id)s")
        params["agent_id"] = data.userId

    sql = f"""
        SELECT
            COUNT(1) OVER () AS count,
            ae.session_id,
            MIN(ae.timestamp) as timestamp,
            SUM(CASE WHEN ae.event_type = 'call' THEN ae.duration ELSE 0 END) AS call_duration,
            SUM(CASE WHEN ae.event_type = 'control' THEN ae.duration ELSE 0 END) AS control_duration,
            SUM(CASE WHEN ae.event_type = 'assist' THEN ae.duration ELSE 0 END) AS assist_duration,
            (SELECT json_agg(json_build_object('name', u.name, 'id', u.user_id))
             FROM users u
             WHERE u.user_id = ANY (array_agg(ae.agent_id))) AS team_members
        FROM assist_events ae
        WHERE {' AND '.join(f'ae.{constraint}' for constraint in constraints)}
        GROUP BY ae.session_id
        ORDER BY {params['sort_by']} {params['sort_order']}
        LIMIT %(limit)s OFFSET %(offset)s
    """
    with pg_client.PostgresClient() as cur:
        query = cur.mogrify(sql, params)
        cur.execute(query)
        rows = cur.fetchall()

    if len(rows) == 0:
        return AssistStatsSessionsResponse(total=0, page=1, list=[])

    count = rows[0]["count"]
    rows = helper.list_to_camel_case(rows)
    for row in rows:
        row.pop("count")

    return AssistStatsSessionsResponse(total=count, page=data.page, list=rows)

View file

@ -3,6 +3,7 @@ from apscheduler.triggers.interval import IntervalTrigger
from chalicelib.core import telemetry
from chalicelib.core import weekly_report, jobs, health
from chalicelib.core import assist_stats
async def run_scheduled_jobs() -> None:
@ -25,6 +26,10 @@ async def weekly_health_cron() -> None:
    health.weekly_cron()


async def assist_events_aggregates_cron() -> None:
    assist_stats.insert_aggregated_data()


cron_jobs = [
    {"func": telemetry_cron, "trigger": CronTrigger(day_of_week="*"),
     "misfire_grace_time": 60 * 60, "max_instances": 1},
@ -35,5 +40,7 @@ cron_jobs = [
{"func": health_cron, "trigger": IntervalTrigger(hours=0, minutes=30, start_date="2023-04-01 0:0:0", jitter=300),
"misfire_grace_time": 60 * 60, "max_instances": 1},
{"func": weekly_health_cron, "trigger": CronTrigger(day_of_week="sun", hour=5),
"misfire_grace_time": 60 * 60, "max_instances": 1}
"misfire_grace_time": 60 * 60, "max_instances": 1},
{"func": assist_events_aggregates_cron,
"trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10), }
]

View file

@ -1,7 +1,9 @@
import json
from datetime import datetime, timedelta
from typing import Union, List, Dict
from decouple import config
from fastapi import Depends, Body, Query
from starlette.responses import FileResponse
import schemas
from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assignments, projects, \
@ -10,7 +12,7 @@ from chalicelib.core import log_tool_rollbar, sourcemaps, events, sessions_assig
    log_tool_stackdriver, reset_password, log_tool_cloudwatch, log_tool_sentry, log_tool_sumologic, log_tools, sessions, \
    log_tool_newrelic, announcements, log_tool_bugsnag, weekly_report, integration_jira_cloud, integration_github, \
    assist, mobile, tenants, boarding, notifications, webhook, users, \
    custom_metrics, saved_search, integrations_global, assist_stats
from chalicelib.core.collaboration_msteams import MSTeams
from chalicelib.core.collaboration_slack import Slack
from or_dependencies import OR_context, OR_role
@ -862,3 +864,53 @@ async def check_recording_status(project_id: int):
@public_app.get('/', tags=["health"])
def health_check():
    return {}


@public_app.get('/{project_id}/assist-stats/avg', tags=["assist-stats"])
def get_assist_stats_avg(
        project_id: int,
        startTimestamp: int = None,
        endTimestamp: int = None,
        userId: str = None
):
    return assist_stats.get_averages(
        project_id=project_id,
        start_timestamp=startTimestamp,
        end_timestamp=endTimestamp,
        user_id=userId)


@public_app.get(
    '/{project_id}/assist-stats/top-members',
    tags=["assist-stats"],
    response_model=schemas.AssistStatsTopMembersResponse
)
def get_assist_stats_top_members(
        project_id: int,
        startTimestamp: int = None,
        endTimestamp: int = None,
        sortBy: str = "sessionsAssisted",
        sortOrder: str = "desc"
):
    return assist_stats.get_top_members(
        project_id=project_id,
        start_timestamp=startTimestamp,
        end_timestamp=endTimestamp,
        sort_by=sortBy,
        sort_order=sortOrder
    )


@public_app.post(
    '/{project_id}/assist-stats/sessions',
    tags=["assist-stats"],
    response_model=schemas.AssistStatsSessionsResponse
)
def get_assist_stats_sessions(
        project_id: int,
        data: schemas.AssistStatsSessionsRequest = Body(...),
):
    return assist_stats.get_sessions(
        project_id=project_id,
        data=data
    )
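
For reference, a minimal client sketch against the new endpoints; the base URL, project id and the absence of auth headers are illustrative assumptions:

import time
import requests

BASE_URL = "http://localhost:8080"  # assumption: a locally running API instance, auth omitted
PROJECT_ID = 1                      # assumption: an existing project id

now_ms = int(time.time() * 1000)
window = {"startTimestamp": now_ms - 24 * 60 * 60 * 1000, "endTimestamp": now_ms}

# Averages over the last 24 hours
print(requests.get(f"{BASE_URL}/{PROJECT_ID}/assist-stats/avg", params=window).json())

# Top members, ranked by sessions assisted
print(requests.get(f"{BASE_URL}/{PROJECT_ID}/assist-stats/top-members",
                   params={**window, "sortBy": "sessionsAssisted", "sortOrder": "desc"}).json())

# Paginated session list
print(requests.post(f"{BASE_URL}/{PROJECT_ID}/assist-stats/sessions",
                    json={**window, "limit": 10, "page": 1, "sort": "timestamp", "order": "desc"}).json())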

View file

@ -1,7 +1,7 @@
from typing import Annotated, Any
from typing import Optional, List, Union, Literal
from pydantic import Field, EmailStr, HttpUrl, SecretStr, AnyHttpUrl, validator
from pydantic import field_validator, model_validator, computed_field
from chalicelib.utils.TimeUTC import TimeUTC
@ -1609,3 +1609,72 @@ class ModuleStatus(BaseModel):
    module: Literal["assist", "notes", "bug-reports",
                    "offline-recordings", "alerts"] = Field(..., description="Possible values: notes, bugs, live")
    status: bool = Field(...)


class AssistStatsAverage(BaseModel):
    key: str = Field(...)
    avg: float = Field(...)
    chartData: List[dict] = Field(...)


class AssistStatsMember(BaseModel):
    name: str
    count: int


class AssistStatsSessionAgent(BaseModel):
    name: str
    id: int


class AssistStatsTopMembersResponse(BaseModel):
    total: int
    list: List[AssistStatsMember]


class AssistStatsSessionRecording(BaseModel):
    recordId: int = Field(...)
    name: str = Field(...)
    duration: int = Field(...)


class AssistStatsSession(BaseModel):
    sessionId: str = Field(...)
    timestamp: int = Field(...)
    teamMembers: List[AssistStatsSessionAgent] = Field(...)
    assistDuration: Optional[int] = Field(default=0)
    callDuration: Optional[int] = Field(default=0)
    controlDuration: Optional[int] = Field(default=0)
    # recordings: list[AssistStatsSessionRecording] = Field(default=[])


assist_sort_options = ["timestamp", "assist_duration", "call_duration", "control_duration"]


class AssistStatsSessionsRequest(BaseModel):
    startTimestamp: int = Field(...)
    endTimestamp: int = Field(...)
    limit: Optional[int] = Field(default=10)
    page: Optional[int] = Field(default=1)
    sort: Optional[str] = Field(default="timestamp",
                                enum=assist_sort_options)
    order: Optional[str] = Field(default="desc", choices=["desc", "asc"])
    userId: Optional[int] = Field(default=None)

    @field_validator("sort")
    def validate_sort(cls, v):
        if v not in assist_sort_options:
            raise ValueError(f"Invalid sort option. Allowed options: {', '.join(assist_sort_options)}")
        return v

    @field_validator("order")
    def validate_order(cls, v):
        if v not in ["desc", "asc"]:
            raise ValueError("Invalid order option. Must be 'desc' or 'asc'.")
        return v


class AssistStatsSessionsResponse(BaseModel):
    total: int = Field(...)
    page: int = Field(...)
    list: List[AssistStatsSession] = Field(default=[])
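
A quick sketch of how the request schema behaves, with illustrative values; pydantic surfaces the validators' ValueError as a ValidationError (itself a ValueError subclass):

ok = AssistStatsSessionsRequest(startTimestamp=1696204800000, endTimestamp=1696291200000,
                                sort="call_duration", order="asc")

try:
    AssistStatsSessionsRequest(startTimestamp=0, endTimestamp=1, sort="not_a_column")
except ValueError as e:
    print(e)  # includes "Invalid sort option. Allowed options: timestamp, assist_duration, ..."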

View file

@ -118,6 +118,39 @@ ALTER TABLE IF EXISTS events.clicks
ALTER TABLE IF EXISTS public.metrics
    ADD COLUMN IF NOT EXISTS card_info jsonb NULL;
CREATE TABLE IF NOT EXISTS assist_events
(
    event_id    varchar NOT NULL PRIMARY KEY,
    project_id  integer NOT NULL,
    session_id  varchar NOT NULL,
    event_type  varchar NOT NULL,
    event_state varchar NOT NULL,
    timestamp   bigint  NOT NULL, -- millisecond epoch; bigint so values do not overflow int4
    user_id     varchar,
    agent_id    varchar
);
CREATE TABLE IF NOT EXISTS assist_events_aggregates
(
    timestamp     BIGINT  not null,
    project_id    integer not null,
    agent_id      integer not null,
    assist_avg    BIGINT,
    call_avg      BIGINT,
    control_avg   BIGINT,
    assist_total  BIGINT,
    call_total    BIGINT,
    control_total BIGINT
);
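
-- Single-column watermark table: the aggregation job records the timestamp of the last processed
-- assist_events row here, so the next run only aggregates newer data.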
CREATE TABLE IF NOT EXISTS assist_events_aggregates_logs
(
    time BIGINT not null
);
COMMIT;
\elif :is_next