import json
import queue
import re
from typing import Optional, List

from apscheduler.triggers.interval import IntervalTrigger
from decouple import config
from fastapi import Request, Response, BackgroundTasks
from pydantic import BaseModel, Field
from starlette.background import BackgroundTask

import app as main_app
import schemas
from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
from schemas import CurrentContext

# Routes excluded from the audit trail. "path" is compared against the route's
# path_format (placeholders like {projectId} are kept verbatim), either as an
# exact string or as a regex; "{projectId}" inside a compiled pattern is not a
# valid quantifier, so re matches it as literal text.
IGNORE_ROUTES = [
    {"method": ["*"], "path": "/notifications"},
    {"method": ["*"], "path": "/announcements"},
    {"method": ["*"], "path": "/client"},
    {"method": ["*"], "path": "/account"},
    {"method": ["GET"], "path": "/projects"},
    {"method": ["*"], "path": "/{projectId}/sessions/search2"},
    {"method": ["GET"], "path": "/{projectId}/sessions2/favorite"},
    {"method": ["GET"], "path": re.compile("^/{projectId}/sessions2/{sessionId}/.*")},
    {"method": ["GET"], "path": "/{projectId}/sample_rate"},
    {"method": ["GET"], "path": "/boarding"},
    {"method": ["GET"], "path": "/{projectId}/metadata"},
    {"method": ["GET"], "path": "/{projectId}/integration/sources"},
    {"method": ["GET"], "path": "/{projectId}/funnels"},
    {"method": ["GET"], "path": "/integrations/slack/channels"},
    {"method": ["GET"], "path": "/webhooks"},
    {"method": ["GET"], "path": "/{projectId}/alerts"},
    {"method": ["GET"], "path": "/client/members"},
    {"method": ["GET"], "path": "/client/roles"},
    {"method": ["GET"], "path": "/announcements/view"},
    {"method": ["GET"], "path": "/config/weekly_report"},
    {"method": ["GET"], "path": "/{projectId}/events/search"},
    {"method": ["POST"], "path": "/{projectId}/errors/search"},
    {"method": ["GET"], "path": "/{projectId}/errors/stats"},
    {"method": ["GET"], "path": re.compile("^/{projectId}/errors/{errorId}/.*")},
    {"method": ["GET"], "path": re.compile("^/integrations/.*")},
    {"method": ["*"], "path": re.compile("^/{projectId}/dashboard/.*")},
    {"method": ["*"], "path": re.compile("^/{projectId}/funnels$")},
    {"method": ["*"], "path": re.compile("^/{projectId}/funnels/.*")},
]

# Sensitive request-body attributes that are masked before a trace is stored.
IGNORE_IN_PAYLOAD = ["token", "password", "authorizationToken", "authHeader", "xQueryKey",
                     "awsSecretAccessKey", "serviceAccountCredentials", "accessKey",
                     "applicationKey", "apiKey"]


class TraceSchema(BaseModel):
    user_id: Optional[int] = Field(None)
    tenant_id: int = Field(...)
    auth: Optional[str] = Field(None)
    action: str = Field(...)
    method: str = Field(...)
    path_format: str = Field(...)
    endpoint: str = Field(...)
    payload: Optional[dict] = Field(None)
    parameters: Optional[dict] = Field(None)
    status: Optional[int] = Field(None)
    created_at: int = Field(...)
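# A hedged, illustrative example (kept in a comment; not executed by this
# module, and all values below are hypothetical): sensitive keys listed in
# IGNORE_IN_PAYLOAD are replaced with "HIDDEN" before a trace is queued.
#
#   body = {"name": "teams-hook", "authHeader": "Basic xyz"}  # incoming payload
#   # after masking in process_trace(): {"name": "teams-hook", "authHeader": "HIDDEN"}
#   example = TraceSchema(tenant_id=1, user_id=42, auth="jwt",
#                         action="add_webhook", method="POST",
#                         path_format="/webhooks", endpoint="/webhooks",
#                         payload={"name": "teams-hook", "authHeader": "HIDDEN"},
#                         parameters={}, status=200, created_at=TimeUTC.now())
#   example.model_dump()  # -> dict ready for __process_trace()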
def __process_trace(trace: TraceSchema):
    data = trace.model_dump()
    # Serialize non-empty dicts to JSON strings; empty or missing ones become NULL.
    data["parameters"] = (json.dumps(trace.parameters)
                          if trace.parameters is not None and len(trace.parameters) > 0 else None)
    data["payload"] = (json.dumps(trace.payload)
                       if trace.payload is not None and len(trace.payload) > 0 else None)
    return data


async def write_trace(trace: TraceSchema):
    data = __process_trace(trace)
    with pg_client.PostgresClient() as cur:
        cur.execute(
            cur.mogrify(
                """INSERT INTO traces(user_id, tenant_id, created_at, auth, action, method,
                                      path_format, endpoint, payload, parameters, status)
                   VALUES (%(user_id)s, %(tenant_id)s, %(created_at)s, %(auth)s, %(action)s, %(method)s,
                           %(path_format)s, %(endpoint)s, %(payload)s::jsonb, %(parameters)s::jsonb, %(status)s);""",
                data)
        )


async def write_traces_batch(traces: List[TraceSchema]):
    if len(traces) == 0:
        return
    # Build a single multi-row INSERT; parameter names are suffixed with the row index.
    params = {}
    values = []
    for i, t in enumerate(traces):
        data = __process_trace(t)
        for key in data.keys():
            params[f"{key}_{i}"] = data[key]
        values.append(f"(%(user_id_{i})s, %(tenant_id_{i})s, %(created_at_{i})s, %(auth_{i})s, "
                      f"%(action_{i})s, %(method_{i})s, %(path_format_{i})s, %(endpoint_{i})s, "
                      f"%(payload_{i})s::jsonb, %(parameters_{i})s::jsonb, %(status_{i})s)")
    with pg_client.PostgresClient() as cur:
        cur.execute(
            cur.mogrify(
                f"""INSERT INTO traces(user_id, tenant_id, created_at, auth, action, method,
                                       path_format, endpoint, payload, parameters, status)
                    VALUES {",".join(values)};""",
                params)
        )


async def process_trace(action: str, path_format: str, request: Request, response: Response):
    if not hasattr(request.state, "currentContext"):
        return
    # currentContext is a CurrentContext for JWT-authenticated calls; API-key
    # calls attach a different context type, hence the isinstance checks below.
    current_context: CurrentContext = request.state.currentContext
    body: Optional[dict] = None
    if request.method in ["POST", "PUT", "DELETE"]:
        try:
            body = await request.json()
        except Exception:
            pass
    if body:
        # Mask sensitive attributes before the payload leaves this process.
        intersect = list(set(body.keys()) & set(IGNORE_IN_PAYLOAD))
        for attribute in intersect:
            body[attribute] = "HIDDEN"
    current_trace = TraceSchema(tenant_id=current_context.tenant_id,
                                user_id=current_context.user_id
                                if isinstance(current_context, CurrentContext) else None,
                                auth="jwt" if isinstance(current_context, CurrentContext) else "apiKey",
                                action=action,
                                endpoint=str(request.url.path),
                                method=request.method,
                                payload=body,
                                parameters=dict(request.query_params),
                                status=response.status_code,
                                path_format=path_format,
                                created_at=TimeUTC.now())
    # Traces are buffered in an in-memory queue (created lazily) and flushed
    # in batches by the process_traces_queue cron job.
    if not hasattr(main_app.app, "queue_system"):
        main_app.app.queue_system = queue.Queue()
    q: queue.Queue = main_app.app.queue_system
    q.put(current_trace)


def trace(action: str, path_format: str, request: Request, response: Response):
    # Skip routes that are explicitly excluded from the audit trail.
    for p in IGNORE_ROUTES:
        if (isinstance(p["path"], str) and p["path"] == path_format
            or isinstance(p["path"], re.Pattern) and re.search(p["path"], path_format)) \
                and (p["method"][0] == "*" or request.method in p["method"]):
            return
    # Record the trace after the response is sent, without blocking the request.
    background_task: BackgroundTask = BackgroundTask(process_trace, action=action, path_format=path_format,
                                                     request=request, response=response)
    if response.background is None:
        response.background = BackgroundTasks()
    response.background.add_task(background_task)


async def process_traces_queue():
    # Guard against the scheduler firing before any trace has been queued
    # (the queue is created lazily in process_trace).
    if not hasattr(main_app.app, "queue_system"):
        return
    # Drain the in-memory queue and persist everything in a single batch insert.
    queue_system: queue.Queue = main_app.app.queue_system
    traces = []
    while not queue_system.empty():
        traces.append(queue_system.get_nowait())
    if len(traces) > 0:
        await write_traces_batch(traces)
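# --- Hedged wiring sketch (comment only) --------------------------------------
# This module only exposes trace(); attaching it to endpoints happens elsewhere.
# A minimal sketch of one plausible wiring, assuming a custom FastAPI route
# class — the AuditRoute name and this mechanism are hypothetical, not this
# codebase's confirmed approach:
#
# from fastapi import APIRouter
# from fastapi.routing import APIRoute
#
# class AuditRoute(APIRoute):
#     def get_route_handler(self):
#         original = super().get_route_handler()
#
#         async def handler(request: Request) -> Response:
#             response = await original(request)
#             # self.name / self.path map to the action / path_format arguments
#             trace(action=self.name, path_format=self.path,
#                   request=request, response=response)
#             return response
#
#         return handler
#
# router = APIRouter(route_class=AuditRoute)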
{"tenant_id": tenant_id, "startDate": data.startDate, "endDate": data.endDate, "p_start": (data.page - 1) * data.limit, "p_end": data.page * data.limit, **data.model_dump()} if data.user_id is not None: conditions.append("user_id=%(user_id)s") if data.action is not None: conditions.append("action=%(action)s") if data.query is not None and len(data.query) > 0: conditions.append("users.name ILIKE %(query)s") conditions.append("users.tenant_id = %(tenant_id)s") params["query"] = helper.values_for_operator(value=data.query, op=schemas.SearchEventOperator.CONTAINS) cur.execute( cur.mogrify( f"""SELECT COUNT(*) AS count, COALESCE(JSONB_AGG(full_traces ORDER BY rn) FILTER (WHERE rn > %(p_start)s AND rn <= %(p_end)s), '[]'::JSONB) AS sessions FROM (SELECT traces.*,users.email,users.name AS username, ROW_NUMBER() OVER (ORDER BY traces.created_at {data.order}) AS rn FROM traces LEFT JOIN users USING (user_id) WHERE {" AND ".join(conditions)} ORDER BY traces.created_at {data.order}) AS full_traces;""", params) ) rows = cur.fetchone() return helper.dict_to_camel_case(rows) def get_available_actions(tenant_id): with pg_client.PostgresClient() as cur: cur.execute(cur.mogrify( f"""SELECT DISTINCT action FROM traces WHERE tenant_id=%(tenant_id)s ORDER BY 1""", {"tenant_id": tenant_id})) rows = cur.fetchall() return [r["action"] for r in rows] cron_jobs = [ {"func": process_traces_queue, "trigger": IntervalTrigger(seconds=config("TRACE_PERIOD", cast=int, default=60)), "misfire_grace_time": 20, "max_instances": 1} ]