41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
from decouple import config, Csv
|
|
import asyncio
|
|
from db.api import DBConnection
|
|
from utils import pg_client
|
|
from utils.worker import WorkerPool
|
|
|
|
|
|
def main():
|
|
DATABASE = config('CLOUD_SERVICE')
|
|
database_api = DBConnection(DATABASE)
|
|
|
|
allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int))
|
|
w_pool = WorkerPool(n_workers=60,
|
|
project_filter=allowed_projects)
|
|
try:
|
|
w_pool.load_checkpoint(database_api)
|
|
except Exception as e:
|
|
print('[WORKER WARN] Checkpoint not found')
|
|
print(repr(e))
|
|
# ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool)
|
|
# consumer_settings = {
|
|
# "bootstrap.servers": config('KAFKA_SERVERS'),
|
|
# "group.id": f"connector_{DATABASE}",
|
|
# "auto.offset.reset": "earliest",
|
|
# "enable.auto.commit": False
|
|
# }
|
|
# if ssl_protocol:
|
|
# consumer_settings['security.protocol'] = 'SSL'
|
|
# consumer = Consumer(consumer_settings)
|
|
|
|
# consumer.subscribe(config("TOPICS", default="saas-raw").split(','))
|
|
print("[WORKER INFO] Kafka consumer subscribed")
|
|
|
|
# w_pool.run_workers(kafka_consumer=consumer, database_api=database_api)
|
|
w_pool.run_workers(database_api=database_api)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(pg_client.init())
|
|
main()
|
|
|