* feat(api): dynamic-api 1/2
* feat(api): dynamic-api 2/2
feat(api): core-api 1/2
* feat(api): changed schemas
feat(api): aipkey authorizer
feat(api): jwt authorizer payload
feat(api): core-api 2/3
* feat(api): apikey authorizer
feat(api): shared context
feat(api): response editor
feat(api): middleware
feat(api): custom router
feat(api): fix auth double call
* feat(api): dashboard
feat(api): insights
feat(api): public api v1
* feat(api): allow full CORS
* feat(api): use decouple-config instead of env
feat(api): fixed conflict slack endpoint
feat(api): fixed favorite errors param
* feat(api): migration fixes
* feat(api): changes
* feat(api): crons
* feat(api): changes and fixes
* feat(api): added new endpoints
feat(api): applied new changes
feat(api): Docker image
* feat(api): EE 1/4
* feat(api): EE core_dynamic
* feat(api): global routers generator
* feat(api): project authorizer
feat(api): docker image
feat(api): crons
* feat(api): EE trace activity
* feat(api): changed ORRouter
* feat(api): EE trace activity parameters&payload
* feat(api): EE trace activity action name & path_format
* feat(db): user trace
* feat(api): EE trace activity ignore routes and hide attribute
feat(api): fix funnel payload schema
* feat(api): mobile support
* feat(api): changed build script
* feat(api): changed mobile sign endpoint
feat(api): changed requirements.txt
* feat(api): changed dockerfile
* feat(api): changed mobile-env-var
* feat(api): removed insights
* feat(api): changed EE Dockerfile
* feat(api): cast session_id to str for signing
* feat(api): fixed error_id type
* feat(api): fixed /errors priority conflict
* feat(api): fixed /errors/{errorId} default params
* feat(api): fixed change password after invitation
* feat(api): use background task for emails instead of low-timeout-api
feat(api): EE fixed missing required params
* feat(api): funnel-insights payload change
* feat(api): funnel-insights payload change
* feat(api): changed edit user payload schema
* feat(api): changed metrics payload schema
* feat(api): changed metrics payload schema
* feat(api): changed edit user default values
feat(api): fixed change error status route
* feat(api): changed edit user
* feat(api): stop user from changing his own role
* feat(api): changed add slack
* feat(api): changed get funnel
* feat(api): changed get funnel on the fly payload
feat(api): changed update payload
* feat(api): changed get funnel on the fly payload
* feat(api): changed update funnel payload
* feat(api): changed get funnel-sessions/issues on the fly payload
* feat(api): fixed funnel missing rangeValue
* feat(api): fixes
* feat(api): iceServers configuration
* feat(api): fix issueId casting
* feat(api): changed issues-sessions endpoint payload-schema
* feat(api): EE changed traces-ignored-routes
* feat(api): EE include core sessions.py
* feat(api): EE check licence on every request if expired
* feat(api): move general stats to dynamic
* feat(api): code cleanup
feat(api): removed sentry
* feat(api): changed traces-ignore-routes
* feat(api): changed dependencies
* feat(api): changed jwt-auth-response code
* feat(api): changed traces-ignore-routes
* feat(api): changed traces-ignore-routes
* feat(api): removed PyTZ
feat(api): migrated time-helper to zoneinfo
* feat(api): EE added missing dependency
feat(api): changed base docker image
* feat(api): merge after roles
* feat(api): EE roles fastapi
* feat(db): handel HTTPExceptions
* feat(db): changed payload schema
* feat(db): changed payload schema
* feat(api): included insights
* feat(api): removed unused helper
* feat(api): merge from dev to fatsapi
* feat(api): merge fixes
feat(api): SAML migration
* feat(api): changed GET /signup response
feat(api): changed EE Dockerfile
* feat(api): changed edition detection
* feat(api): include ee endpoints
* feat(api): add/edit member changes
* feat(api): saml changed redirect
* feat(api): track session's replay
feat(api): track error's details
* feat(api): ignore tracking for read roles
* feat(api): define global queue
feat(api): define global scheduler
feat(api): traces use queue
feat(api): traces batch insert
feat(DB): changed traces schema
* feat(api): fix signup captcha
* feat(api): fix signup captcha
* feat(api): optional roleId
feat(api): set roleId to member if None
* feat(api): fixed edit role
* feat(api): return role details when creating a new member
* feat(api): trace: use BackgroundTasks instead of BackgroundTask to not override previous tasks
* feat(api): trace: use BackgroundTask if no other background task is defined
* feat(api): optimised delete metadata
* feat(api): Notification optional message
* feat(api): fix background-task reference
* feat(api): fix trace-background-task
* feat(api): fixed g-captcha for reset password
* feat(api): fix edit self-user
* feat(api): fixed create github-issue
* feat(api): set misfire_grace_time for crons
* feat(api): removed chalice
feat(api): freeze dependencies
* feat(api): refactored blueprints
* feat(api): /metadata/session_search allow projectId=None
* feat(api): public API, changed userId type
* feat(api): fix upload sourcemaps
* feat(api): user-trace support ApiKey endpoints
* feat(api): fixed user-trace foreign key type
* feat(api): fixed trace schema
* feat(api): trace save auth-method
* feat(api): trace fixed auth-method
* feat(api): trace changed schema
157 lines
6.8 KiB
Python
157 lines
6.8 KiB
Python
import json
|
|
import queue
|
|
import re
|
|
from typing import Optional, List
|
|
|
|
from decouple import config
|
|
from fastapi import Request, Response
|
|
from pydantic import BaseModel, Field
|
|
from starlette.background import BackgroundTask
|
|
|
|
import app as main_app
|
|
from chalicelib.utils import pg_client
|
|
from chalicelib.utils.TimeUTC import TimeUTC
|
|
from schemas import CurrentContext
|
|
|
|
IGNORE_ROUTES = [
|
|
{"method": ["*"], "path": "/notifications"},
|
|
{"method": ["*"], "path": "/announcements"},
|
|
{"method": ["*"], "path": "/client"},
|
|
{"method": ["*"], "path": "/account"},
|
|
{"method": ["GET"], "path": "/projects"},
|
|
{"method": ["*"], "path": "/{projectId}/sessions/search2"},
|
|
{"method": ["GET"], "path": "/{projectId}/sessions2/favorite"},
|
|
{"method": ["GET"], "path": re.compile("^/{projectId}/sessions2/{sessionId}/.*")},
|
|
{"method": ["GET"], "path": "/{projectId}/sample_rate"},
|
|
{"method": ["GET"], "path": "/boarding"},
|
|
{"method": ["GET"], "path": "/{projectId}/metadata"},
|
|
{"method": ["GET"], "path": "/{projectId}/integration/sources"},
|
|
{"method": ["GET"], "path": "/{projectId}/funnels"},
|
|
{"method": ["GET"], "path": "/integrations/slack/channels"},
|
|
{"method": ["GET"], "path": "/webhooks"},
|
|
{"method": ["GET"], "path": "/{projectId}/alerts"},
|
|
{"method": ["GET"], "path": "/client/members"},
|
|
{"method": ["GET"], "path": "/client/roles"},
|
|
{"method": ["GET"], "path": "/announcements/view"},
|
|
{"method": ["GET"], "path": "/config/weekly_report"},
|
|
{"method": ["GET"], "path": "/{projectId}/events/search"},
|
|
{"method": ["POST"], "path": "/{projectId}/errors/search"},
|
|
{"method": ["GET"], "path": "/{projectId}/errors/stats"},
|
|
{"method": ["GET"], "path": re.compile("^/{projectId}/errors/{errorId}/.*")},
|
|
{"method": ["GET"], "path": re.compile("^/integrations/.*")},
|
|
{"method": ["*"], "path": re.compile("^/{projectId}/dashboard/.*")},
|
|
{"method": ["*"], "path": re.compile("^/{projectId}/funnels$")},
|
|
{"method": ["*"], "path": re.compile("^/{projectId}/funnels/.*")},
|
|
]
|
|
IGNORE_IN_PAYLOAD = ["token", "password", "authorizationToken", "authHeader", "xQueryKey", "awsSecretAccessKey",
|
|
"serviceAccountCredentials", "accessKey", "applicationKey", "apiKey"]
|
|
|
|
|
|
class TraceSchema(BaseModel):
|
|
user_id: Optional[int] = Field(None)
|
|
tenant_id: int = Field(...)
|
|
auth: Optional[str] = Field(None)
|
|
action: str = Field(...)
|
|
method: str = Field(...)
|
|
path_format: str = Field(...)
|
|
endpoint: str = Field(...)
|
|
payload: Optional[dict] = Field(None)
|
|
parameters: Optional[dict] = Field(None)
|
|
status: Optional[int] = Field(None)
|
|
created_at: int = Field(...)
|
|
|
|
|
|
def __process_trace(trace: TraceSchema):
|
|
data = trace.dict()
|
|
data["parameters"] = json.dumps(trace.parameters) if trace.parameters is not None and len(
|
|
trace.parameters.keys()) > 0 else None
|
|
data["payload"] = json.dumps(trace.payload) if trace.payload is not None and len(trace.payload.keys()) > 0 else None
|
|
return data
|
|
|
|
|
|
async def write_trace(trace: TraceSchema):
|
|
data = __process_trace(trace)
|
|
with pg_client.PostgresClient() as cur:
|
|
cur.execute(
|
|
cur.mogrify(
|
|
f"""INSERT INTO traces(user_id, tenant_id, created_at, auth, action, method, path_format, endpoint, payload, parameters, status)
|
|
VALUES (%(user_id)s, %(tenant_id)s, %(created_at)s, %(auth)s, %(action)s, %(method)s, %(path_format)s, %(endpoint)s, %(payload)s::jsonb, %(parameters)s::jsonb, %(status)s);""",
|
|
data)
|
|
)
|
|
|
|
|
|
async def write_traces_batch(traces: List[TraceSchema]):
|
|
if len(traces) == 0:
|
|
return
|
|
params = {}
|
|
values = []
|
|
for i, t in enumerate(traces):
|
|
data = __process_trace(t)
|
|
for key in data.keys():
|
|
params[f"{key}_{i}"] = data[key]
|
|
values.append(
|
|
f"(%(user_id_{i})s, %(tenant_id_{i})s, %(created_at_{i})s, %(auth_{i})s, %(action_{i})s, %(method_{i})s, %(path_format_{i})s, %(endpoint_{i})s, %(payload_{i})s::jsonb, %(parameters_{i})s::jsonb, %(status_{i})s)")
|
|
|
|
with pg_client.PostgresClient() as cur:
|
|
cur.execute(
|
|
cur.mogrify(
|
|
f"""INSERT INTO traces(user_id, tenant_id, created_at, auth, action, method, path_format, endpoint, payload, parameters, status)
|
|
VALUES {" , ".join(values)};""",
|
|
params)
|
|
)
|
|
|
|
|
|
async def process_trace(action: str, path_format: str, request: Request, response: Response):
|
|
if not hasattr(request.state, "currentContext"):
|
|
return
|
|
current_context: CurrentContext = request.state.currentContext
|
|
body: json = None
|
|
if request.method in ["POST", "PUT", "DELETE"]:
|
|
body = await request.json()
|
|
intersect = list(set(body.keys()) & set(IGNORE_IN_PAYLOAD))
|
|
for attribute in intersect:
|
|
body[attribute] = "HIDDEN"
|
|
current_trace = TraceSchema(tenant_id=current_context.tenant_id,
|
|
user_id=current_context.user_id if isinstance(current_context, CurrentContext) \
|
|
else None,
|
|
auth="jwt" if isinstance(current_context, CurrentContext) else "apiKey",
|
|
action=action,
|
|
endpoint=str(request.url.path), method=request.method,
|
|
payload=body,
|
|
parameters=dict(request.query_params),
|
|
status=response.status_code,
|
|
path_format=path_format,
|
|
created_at=TimeUTC.now())
|
|
if not hasattr(main_app.app, "queue_system"):
|
|
main_app.app.queue_system = queue.Queue()
|
|
q: queue.Queue = main_app.app.queue_system
|
|
q.put(current_trace)
|
|
|
|
|
|
def trace(action: str, path_format: str, request: Request, response: Response):
|
|
for p in IGNORE_ROUTES:
|
|
if (isinstance(p["path"], str) and p["path"] == path_format \
|
|
or isinstance(p["path"], re.Pattern) and re.search(p["path"], path_format)) \
|
|
and (p["method"][0] == "*" or request.method in p["method"]):
|
|
return
|
|
background_task: BackgroundTask = BackgroundTask(process_trace, action, path_format, request, response)
|
|
if response.background is None:
|
|
response.background = background_task
|
|
else:
|
|
response.background.add_task(background_task.func, *background_task.args, *background_task.kwargs)
|
|
|
|
|
|
async def process_traces_queue():
|
|
queue_system: queue.Queue = main_app.app.queue_system
|
|
traces = []
|
|
while not queue_system.empty():
|
|
obj = queue_system.get_nowait()
|
|
traces.append(obj)
|
|
if len(traces) > 0:
|
|
await write_traces_batch(traces)
|
|
|
|
|
|
cron_jobs = [
|
|
{"func": process_traces_queue, "trigger": "interval", "seconds": config("traces_period", cast=int, default=60),
|
|
"misfire_grace_time": 20}
|
|
]
|