style(connectors): different prints for each script (fill, worker) (#1388)
Parent: 1bdc79d6b4
Commit: 3479060d02
3 changed files with 19 additions and 20 deletions
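The change is mechanical: every log print in the worker script gains a [WORKER ...] prefix and every print in the fill script gains a [FILL ...] prefix, so interleaved output from the two connector scripts can be told apart. Below is a minimal sketch of the same tagging convention written as a shared helper rather than edited into each call site; the tagged_print helper and SCRIPT_TAG constant are illustrative, not part of this commit.

# Hypothetical helper, not part of this commit: defines the per-script tag once
# instead of repeating it in every print call.
SCRIPT_TAG = 'WORKER'  # the fill script would set this to 'FILL'

def tagged_print(level: str, *args) -> None:
    # Produces the same "[<SCRIPT> <LEVEL>] message" shape used in the diff below.
    print(f'[{SCRIPT_TAG} {level}]', *args)

# Example calls matching the updated print statements:
# tagged_print('WARN', 'Checkpoint not found')      -> [WORKER WARN] Checkpoint not found
# tagged_print('INFO', 'Kafka consumer subscribed') -> [WORKER INFO] Kafka consumer subscribed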
@@ -15,7 +15,7 @@ def main():
     try:
         w_pool.load_checkpoint(database_api)
     except Exception as e:
-        print('[WARN] Checkpoint not found')
+        print('[WORKER WARN] Checkpoint not found')
         print(repr(e))
     # ssl_protocol = config('KAFKA_USE_SSL', default=True, cast=bool)
     # consumer_settings = {
@@ -29,7 +29,7 @@ def main():
     # consumer = Consumer(consumer_settings)

     # consumer.subscribe(config("TOPICS", default="saas-raw").split(','))
-    print("[INFO] Kafka consumer subscribed")
+    print("[WORKER INFO] Kafka consumer subscribed")

     # w_pool.run_workers(kafka_consumer=consumer, database_api=database_api)
     w_pool.run_workers(database_api=database_api)
@@ -44,7 +44,7 @@ async def main():
     try:
         res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
     except Exception as e:
-        print(repr(e))
+        print('[FILL Exception]',repr(e))
         res = list()
     if res is None:
         return
@@ -75,11 +75,10 @@ async def main():
     base_query += f"\nEND WHERE sessionid IN ({','.join(all_ids)})"
     if len(all_ids) == 0:
         return
-    print(base_query[:200])
     try:
         pdredshift.exec_commit(base_query)
     except Exception as e:
-        print(repr(e))
+        print('[FILL Exception]',repr(e))
     print(f'[FILL-INFO] {time()-t} - for {len(sessionids)} elements')

@@ -162,7 +162,7 @@ def read_from_kafka(pipe: Connection, params: dict):
         try:
             msg = kafka_consumer.poll(5.0)
         except Exception as e:
-            print(e)
+            print('[WORKER Exception]', e)
         if msg is None:
             continue
         n_messages += 1
@@ -190,21 +190,21 @@ def read_from_kafka(pipe: Connection, params: dict):
         to_decode.append(msg)
         if n_messages != 0:
             print(
-                f'[INFO-bg] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
+                f'[WORKER INFO-bg] Found {broken_batchs} broken batch over {n_messages} read messages ({100 * broken_batchs / n_messages:.2f}%)')
         else:
-            print('[WARN-bg] No messages read')
+            print('[WORKER WARN-bg] No messages read')
         non_valid_updated = project_filter.non_valid_sessions_cache
         pipe.send((non_valid_updated, sessionIds, to_decode))
         continue_signal = pipe.recv()
         if continue_signal == 'CLOSE':
-            print('[SHUTDOWN-reader] Reader shutting down')
+            print('[WORKER SHUTDOWN-reader] Reader shutting down')
             break
         kafka_consumer.commit()
-    print('[INFO] Closing consumer')
+    print('[WORKER INFO] Closing consumer')
     close_consumer(kafka_consumer)
-    print('[INFO] Closing pg connection')
+    print('[WORKER INFO] Closing pg connection')
     asyncio.run(pg_client.terminate())
-    print('[INFO] Successfully closed reader task')
+    print('[WORKER INFO] Successfully closed reader task')
     # except Exception as e:
     #     print('[WARN]', repr(e))

@@ -226,7 +226,7 @@ def project_from_session(sessionId: int):
         )
         res = conn.fetchone()
     if res is None:
-        print(f'[WARN] sessionid {sessionId} not found in sessions table')
+        print(f'[WORKER WARN] sessionid {sessionId} not found in sessions table')
         return None
     return res['project_id']

@@ -245,10 +245,10 @@ def project_from_sessions(sessionIds: list[int]):
             )
             res = conn.fetchall()
         except Exception as e:
-            print('[project_from_sessions]', repr(e))
+            print('[WORKER project_from_sessions]', repr(e))
             raise e
         if res is None:
-            print(f'[WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table')
+            print(f'[WORKER WARN] sessionids {",".join([str(sessId) for sessId in sessIds])} not found in sessions table')
         else:
             response += res
         sessionIds = sessionIds[:-1000]
@@ -341,7 +341,7 @@ class WorkerPool:
                 elif old_status == 'OPEN':
                     sessions_insert_batch.append(deepcopy(self.sessions[session_id]))
                 else:
-                    print('[WARN] Closed session should not be closed again')
+                    print('[WORKER-WARN] Closed session should not be closed again')
             elif flag == 'reader':
                 count += 1
                 if count > 1:
@@ -374,7 +374,7 @@ class WorkerPool:
         while signal_handler.KEEP_PROCESSING:
             # Setup of parameters for workers
             if not kafka_reader_process.is_alive():
-                print('[INFO] Restarting reader task')
+                print('[WORKER-INFO] Restarting reader task')
                 del kafka_reader_process
                 kafka_reader_process = Process(target=read_from_kafka, args=(reader_conn, kafka_task_params))
                 kafka_reader_process.start()
@@ -404,7 +404,7 @@ class WorkerPool:
                 try:
                     results.append(async_result.get(timeout=32 * UPLOAD_RATE))
                 except TimeoutError as e:
-                    print('[TimeoutError] Decoding of messages is taking longer than expected')
+                    print('[WORKER-TimeoutError] Decoding of messages is taking longer than expected')
                     raise e
             events_batch, sessions_insert_batch, sessions_update_batch, session_ids, messages = self._pool_response_handler(
                 pool_results=results)
@@ -412,11 +412,11 @@ class WorkerPool:
                                              table_name, EVENT_TYPE)
             self.save_snapshot(database_api)
             main_conn.send('CONTINUE')
-        print('[INFO] Sending close signal')
+        print('[WORKER-INFO] Sending close signal')
         main_conn.send('CLOSE')
         self.terminate()
         kafka_reader_process.terminate()
-        print('[SHUTDOWN] Process terminated')
+        print('[WORKER-SHUTDOWN] Process terminated')

     def load_checkpoint(self, database_api):
         file = database_api.load_binary(name='checkpoint')
