openreplay/api/chalicelib/core/integration_base.py
Amirouche 66040ea9f5 wip
2024-02-06 14:18:50 +01:00

63 lines
1.5 KiB
Python

from abc import ABC, abstractmethod
from chalicelib.utils import pg_client, helper
class BaseIntegration(ABC):
def __init__(self, user_id, ISSUE_CLASS):
self._user_id = user_id
self._issue_handler = ISSUE_CLASS(self.integration_token)
self.integration = None
async def init():
integration = await self.get()
self.integration = integration
@property
@abstractmethod
def provider(self):
pass
@property
@abstractmethod
def issue_handler(self):
pass
@property
def integration_token(self):
if integration is None:
print("no token configured yet")
return None
return integration["token"]
async def get(self):
async with pg_client.cursor() as cur:
await cur.execute(
cur.mogrify(
"""SELECT *
FROM public.oauth_authentication
WHERE user_id=%(user_id)s AND provider=%(provider)s;""",
{"user_id": self._user_id, "provider": self.provider.lower()})
)
return helper.dict_to_camel_case(await cur.fetchone())
@abstractmethod
def get_obfuscated(self):
pass
@abstractmethod
async def update(self, changes, obfuscate=False):
pass
@abstractmethod
async def _add(self, data):
pass
@abstractmethod
async def delete(self):
pass
@abstractmethod
async def add_edit(self, data):
pass